Compare commits

..

13 Commits

Author SHA1 Message Date
Naman Verma
d7d907f687 test: max parametrising of the unit tests 2026-03-10 16:31:25 +05:30
Naman Verma
76b4549504 test: unit tests 2026-03-10 16:16:53 +05:30
Naman Verma
968a5089ff fix: check for tic when finding remaining keys 2026-03-10 16:16:29 +05:30
Naman Verma
c082bc3d76 chore: also sort by remaining group by keys 2026-03-10 15:43:45 +05:30
Naman Verma
59e0dcc865 chore: move order by ts asc out of all if branches 2026-03-10 15:40:26 +05:30
Naman Verma
89840189ef chore: remove comment 2026-03-10 15:16:09 +05:30
Naman Verma
b64a07db02 fix: limit in where subclause was coming as ? 2026-03-10 15:15:46 +05:30
Naman Verma
38d971b3c9 fix: add partition window when ordering by sum 2026-03-10 15:06:29 +05:30
Naman Verma
f8b266ce05 chore: first draft before testing 2026-03-10 14:55:38 +05:30
Naman Verma
20f7562cbc revert: wrong changes in stmt builder (vv wrong) 2026-03-10 13:36:33 +05:30
Naman Verma
29713964ce Merge branch 'main' into nv/6204 2026-03-10 13:33:26 +05:30
Naman Verma
afb252b4f9 Merge branch 'main' into nv/6204 2026-03-06 08:26:09 +05:30
Naman Verma
c808b4d759 fix: consume order by and limit for metrics in clickhouse query 2026-03-05 09:29:23 +05:30
24 changed files with 492 additions and 610 deletions

2
go.mod
View File

@@ -11,6 +11,7 @@ require (
github.com/SigNoz/signoz-otel-collector v0.144.2
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/bytedance/sonic v1.14.1
github.com/cespare/xxhash/v2 v2.3.0
github.com/coreos/go-oidc/v3 v3.17.0
github.com/dgraph-io/ristretto/v2 v2.3.0
@@ -105,7 +106,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect
github.com/aws/smithy-go v1.24.0 // indirect
github.com/bytedance/gopkg v0.1.3 // indirect
github.com/bytedance/sonic v1.14.1 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect

View File

@@ -78,7 +78,7 @@ func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetype
// add the paths that are not promoted but have indexes
for path, indexes := range aggr {
path := strings.TrimPrefix(path, telemetrylogs.BodyV2ColumnPrefix)
path := strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
path = telemetrytypes.BodyJSONStringSearchPrefix + path
response = append(response, promotetypes.PromotePath{
Path: path,
@@ -163,7 +163,7 @@ func (m *module) PromoteAndIndexPaths(
}
}
if len(it.Indexes) > 0 {
parentColumn := telemetrylogs.LogsV2BodyV2Column
parentColumn := telemetrylogs.LogsV2BodyJSONColumn
// if the path is already promoted or is being promoted, add it to the promoted column
if _, promoted := existingPromotedPaths[it.Path]; promoted || it.Promote {
parentColumn = telemetrylogs.LogsV2BodyPromotedColumn

View File

@@ -10,11 +10,13 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/bytedance/sonic"
)
type builderQuery[T any] struct {
@@ -260,6 +262,40 @@ func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string,
return nil, err
}
// merge body_json and promoted into body
if q.spec.Signal == telemetrytypes.SignalLogs {
switch typedPayload := payload.(type) {
case *qbtypes.RawData:
for _, rr := range typedPayload.Rows {
seeder := func() error {
body, ok := rr.Data[telemetrylogs.LogsV2BodyJSONColumn].(map[string]any)
if !ok {
return nil
}
promoted, ok := rr.Data[telemetrylogs.LogsV2BodyPromotedColumn].(map[string]any)
if !ok {
return nil
}
seed(promoted, body)
str, err := sonic.MarshalString(body)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal body")
}
rr.Data["body"] = str
return nil
}
err := seeder()
if err != nil {
return nil, err
}
delete(rr.Data, telemetrylogs.LogsV2BodyJSONColumn)
delete(rr.Data, telemetrylogs.LogsV2BodyPromotedColumn)
}
payload = typedPayload
}
}
return &qbtypes.Result{
Type: q.kind,
Value: payload,
@@ -387,3 +423,18 @@ func decodeCursor(cur string) (int64, error) {
}
return strconv.ParseInt(string(b), 10, 64)
}
func seed(promoted map[string]any, body map[string]any) {
for key, fromValue := range promoted {
if toValue, ok := body[key]; !ok {
body[key] = fromValue
} else {
if fromValue, ok := fromValue.(map[string]any); ok {
if toValue, ok := toValue.(map[string]any); ok {
seed(fromValue, toValue)
body[key] = toValue
}
}
}
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/bytedance/sonic"
)
var (
@@ -393,11 +394,17 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
// de-reference the typed pointer to any
val := reflect.ValueOf(cellPtr).Elem().Interface()
// Post-process JSON columns: normalize into String value
// Post-process JSON columns: normalize into structured values
if strings.HasPrefix(strings.ToUpper(colTypes[i].DatabaseTypeName()), "JSON") {
switch x := val.(type) {
case []byte:
val = string(x)
if len(x) > 0 {
var v any
if err := sonic.Unmarshal(x, &v); err == nil {
val = v
}
}
default:
// already a structured type (map[string]any, []any, etc.)
}

View File

@@ -185,22 +185,6 @@ func postProcessMetricQuery(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
config := query.Aggregations[0]
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
timeSpaceAggOrderBy := fmt.Sprintf("%s(%s(%s))", config.SpaceAggregation.StringValue(), config.TimeAggregation.StringValue(), config.MetricName)
for idx := range query.Order {
if query.Order[idx].Key.Name == spaceAggOrderBy ||
query.Order[idx].Key.Name == timeAggOrderBy ||
query.Order[idx].Key.Name == timeSpaceAggOrderBy {
query.Order[idx].Key.Name = qbtypes.DefaultOrderByKey
}
}
result = q.applySeriesLimit(result, query.Limit, query.Order)
if len(query.Functions) > 0 {
step := query.StepInterval.Duration.Milliseconds()
functions := q.prepareFillZeroArgsWithStep(query.Functions, req, step)

View File

@@ -132,6 +132,14 @@ func GroupByKeys(keys []qbtypes.GroupByKey) []string {
return k
}
func OrderByKeys(keys []qbtypes.OrderBy) []string {
k := []string{}
for _, key := range keys {
k = append(k, "`"+key.Key.Name+"`")
}
return k
}
func FormatValueForContains(value any) string {
if value == nil {
return ""
@@ -204,10 +212,7 @@ func DataTypeCollisionHandledFieldName(key *telemetrytypes.TelemetryFieldKey, va
// While we expect user not to send the mixed data types, it inevitably happens
// So we handle the data type collisions here
switch key.FieldDataType {
case telemetrytypes.FieldDataTypeString, telemetrytypes.FieldDataTypeArrayString, telemetrytypes.FieldDataTypeJSON:
if key.FieldDataType == telemetrytypes.FieldDataTypeJSON {
tblFieldName = fmt.Sprintf("toString(%s)", tblFieldName)
}
case telemetrytypes.FieldDataTypeString, telemetrytypes.FieldDataTypeArrayString:
switch v := value.(type) {
case float64:
// try to convert the string value to to number
@@ -222,6 +227,7 @@ func DataTypeCollisionHandledFieldName(key *telemetrytypes.TelemetryFieldKey, va
// we don't have a toBoolOrNull in ClickHouse, so we need to convert the bool to a string
value = fmt.Sprintf("%t", v)
}
case telemetrytypes.FieldDataTypeInt64,
telemetrytypes.FieldDataTypeArrayInt64,
telemetrytypes.FieldDataTypeNumber,

View File

@@ -313,30 +313,37 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
return ""
}
child := ctx.GetChild(0)
var searchText string
if keyCtx, ok := child.(*grammar.KeyContext); ok {
// create a full text search condition on the body field
searchText = keyCtx.GetText()
keyText := keyCtx.GetText()
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(keyText), v.builder, v.startNs, v.endNs)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
} else if valCtx, ok := child.(*grammar.ValueContext); ok {
var text string
if valCtx.QUOTED_TEXT() != nil {
searchText = trimQuotes(valCtx.QUOTED_TEXT().GetText())
text = trimQuotes(valCtx.QUOTED_TEXT().GetText())
} else if valCtx.NUMBER() != nil {
searchText = valCtx.NUMBER().GetText()
text = valCtx.NUMBER().GetText()
} else if valCtx.BOOL() != nil {
searchText = valCtx.BOOL().GetText()
text = valCtx.BOOL().GetText()
} else if valCtx.KEY() != nil {
searchText = valCtx.KEY().GetText()
text = valCtx.KEY().GetText()
} else {
v.errors = append(v.errors, fmt.Sprintf("unsupported value type: %s", valCtx.GetText()))
return ""
}
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(text), v.builder, v.startNs, v.endNs)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
}
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(searchText), v.builder, v.startNs, v.endNs)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
}
return "" // Should not happen with valid input
@@ -376,7 +383,6 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
for _, key := range keys {
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, nil, v.builder, v.startNs, v.endNs)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to build condition: %s", err.Error()))
return ""
}
conds = append(conds, condition)
@@ -642,6 +648,7 @@ func (v *filterExpressionVisitor) VisitValueList(ctx *grammar.ValueListContext)
// VisitFullText handles standalone quoted strings for full-text search
func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) any {
if v.skipFullTextFilter {
return ""
}
@@ -663,7 +670,6 @@ func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) an
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
}

View File

@@ -3,12 +3,14 @@ package telemetrylogs
import (
"context"
"fmt"
"slices"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"golang.org/x/exp/maps"
"github.com/huandu/go-sqlbuilder"
)
@@ -33,7 +35,7 @@ func (c *conditionBuilder) conditionFor(
return "", err
}
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled && key.FieldContext != telemetrytypes.FieldContextLog {
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled {
valueType, value := InferDataType(value, operator, key)
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
if err != nil {
@@ -106,6 +108,7 @@ func (c *conditionBuilder) conditionFor(
return sb.ILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorNotContains:
return sb.NotILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorRegexp:
// Note: Escape $$ to $$$$ to avoid sqlbuilder interpreting materialized $ signs
// Only needed because we are using sprintf instead of sb.Match (not implemented in sqlbuilder)
@@ -173,16 +176,9 @@ func (c *conditionBuilder) conditionFor(
var value any
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
switch key.FieldDataType {
case telemetrytypes.FieldDataTypeJSON:
if operator == qbtypes.FilterOperatorExists {
return sb.EQ(fmt.Sprintf("empty(%s)", tblFieldName), false), nil
}
return sb.EQ(fmt.Sprintf("empty(%s)", tblFieldName), true), nil
default:
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(tblFieldName), nil
}
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(tblFieldName), nil
} else {
return sb.IsNull(tblFieldName), nil
}
case schema.ColumnTypeEnumLowCardinality:
@@ -251,32 +247,19 @@ func (c *conditionBuilder) ConditionFor(
return "", err
}
// Skip adding exists filter for intrinsic fields i.e. Table level log context fields
buildExistCondition := operator.AddDefaultExistsFilter()
switch key.FieldContext {
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextScope:
// pass; No need to build exist condition for top level columns
// immidiately return
return condition, nil
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextAttribute:
buildExistCondition = buildExistCondition && true
case telemetrytypes.FieldContextBody:
// Querying JSON fields already account for Nullability of fields
// so additional exists checks are not needed
if querybuilder.BodyJSONQueryEnabled {
if !(key.FieldContext == telemetrytypes.FieldContextBody && querybuilder.BodyJSONQueryEnabled) && operator.AddDefaultExistsFilter() {
// skip adding exists filter for intrinsic fields
// with an exception for body json search
field, _ := c.fm.FieldFor(ctx, key)
if slices.Contains(maps.Keys(IntrinsicFields), field) && key.FieldContext != telemetrytypes.FieldContextBody {
return condition, nil
}
buildExistCondition = buildExistCondition && true
}
if buildExistCondition {
existsCondition, err := c.conditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb)
if err != nil {
return "", err
}
return sb.And(condition, existsCondition), nil
}
return condition, nil
}

View File

@@ -127,8 +127,7 @@ func TestConditionFor(t *testing.T) {
{
name: "Contains operator - body",
key: telemetrytypes.TelemetryFieldKey{
Name: "body",
FieldContext: telemetrytypes.FieldContextLog,
Name: "body",
},
operator: qbtypes.FilterOperatorContains,
value: 521509198310,

View File

@@ -1,10 +1,7 @@
package telemetrylogs
import (
"fmt"
"github.com/SigNoz/signoz-otel-collector/constants"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -20,7 +17,7 @@ const (
LogsV2TimestampColumn = "timestamp"
LogsV2ObservedTimestampColumn = "observed_timestamp"
LogsV2BodyColumn = "body"
LogsV2BodyV2Column = constants.BodyV2Column
LogsV2BodyJSONColumn = constants.BodyV2Column
LogsV2BodyPromotedColumn = constants.BodyPromotedColumn
LogsV2TraceIDColumn = "trace_id"
LogsV2SpanIDColumn = "span_id"
@@ -37,23 +34,11 @@ const (
LogsV2ResourcesStringColumn = "resources_string"
LogsV2ScopeStringColumn = "scope_string"
BodyV2ColumnPrefix = constants.BodyV2ColumnPrefix
BodyJSONColumnPrefix = constants.BodyV2ColumnPrefix
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
MessageSubColumn = "body_v2.message"
bodySearchDefaultWarning = "body searches default to `body.message:string`. Use `body.<key>` to search a different field inside body"
)
var (
// mapping for body logical field to message sub column
// Here field context is log since message is a type hint and
// in a sense it is a direct column of log table and doesn't need any lambda expressions
// TODO(Piyush): Add description for detailed explanation of remapping of body to message
BodyLogicalFieldJSONMapping = &telemetrytypes.TelemetryFieldKey{
Name: MessageSubColumn,
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
}
DefaultFullTextColumn = &telemetrytypes.TelemetryFieldKey{
Name: "body",
Signal: telemetrytypes.SignalLogs,
@@ -133,23 +118,3 @@ var (
},
}
)
func bodyAliasExpression() string {
if !querybuilder.BodyJSONQueryEnabled {
return LogsV2BodyColumn
}
return fmt.Sprintf("%s as body", LogsV2BodyV2Column)
}
func enrichMapsForJSONBodyEnabled() {
if querybuilder.BodyJSONQueryEnabled {
DefaultFullTextColumn = BodyLogicalFieldJSONMapping
IntrinsicFields["body"] = *BodyLogicalFieldJSONMapping
IntrinsicFields[MessageSubColumn] = *BodyLogicalFieldJSONMapping
}
}
func init() {
enrichMapsForJSONBodyEnabled()
}

View File

@@ -30,7 +30,7 @@ var (
"severity_text": {Name: "severity_text", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"severity_number": {Name: "severity_number", Type: schema.ColumnTypeUInt8},
"body": {Name: "body", Type: schema.ColumnTypeString},
LogsV2BodyV2Column: {Name: LogsV2BodyV2Column, Type: schema.JSONColumnType{
LogsV2BodyJSONColumn: {Name: LogsV2BodyJSONColumn, Type: schema.JSONColumnType{
MaxDynamicTypes: utils.ToPointer(uint(32)),
MaxDynamicPaths: utils.ToPointer(uint(0)),
}},
@@ -58,7 +58,6 @@ var (
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}},
MessageSubColumn: {Name: MessageSubColumn, Type: schema.ColumnTypeString},
}
)
@@ -90,9 +89,9 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
}
case telemetrytypes.FieldContextBody:
// Body context is for JSON body fields
// Use body_v2 if feature flag is enabled
// Use body_json if feature flag is enabled
if querybuilder.BodyJSONQueryEnabled {
return logsV2Columns[LogsV2BodyV2Column], nil
return logsV2Columns[LogsV2BodyJSONColumn], nil
}
// Fall back to legacy body column
return logsV2Columns["body"], nil
@@ -101,9 +100,9 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
if !ok {
// check if the key has body JSON search
if strings.HasPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
// Use body_v2 if feature flag is enabled and we have a body condition builder
// Use body_json if feature flag is enabled and we have a body condition builder
if querybuilder.BodyJSONQueryEnabled {
return logsV2Columns[LogsV2BodyV2Column], nil
return logsV2Columns[LogsV2BodyJSONColumn], nil
}
// Fall back to legacy body column
return logsV2Columns["body"], nil
@@ -148,8 +147,6 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
}
return m.buildFieldForJSON(key)
case telemetrytypes.FieldContextLog:
return column.Name, nil
default:
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
}

View File

@@ -30,7 +30,7 @@ func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType te
return &jsonConditionBuilder{key: key, valueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType]}
}
// BuildCondition builds the full WHERE condition for body_v2 JSON paths
// BuildCondition builds the full WHERE condition for body_json JSON paths
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
conditions := []string{}
for _, node := range c.key.JSONPlan {
@@ -40,7 +40,6 @@ func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperato
}
conditions = append(conditions, condition)
}
return sb.Or(conditions...), nil
}
@@ -289,9 +288,9 @@ func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, field
}
return sb.NotIn(fieldExpr, values...), nil
case qbtypes.FilterOperatorExists:
return sb.IsNotNull(fieldExpr), nil
return fmt.Sprintf("%s IS NOT NULL", fieldExpr), nil
case qbtypes.FilterOperatorNotExists:
return sb.IsNull(fieldExpr), nil
return fmt.Sprintf("%s IS NULL", fieldExpr), nil
// between and not between
case qbtypes.FilterOperatorBetween, qbtypes.FilterOperatorNotBetween:
values, ok := value.([]any)

File diff suppressed because one or more lines are too long

View File

@@ -65,7 +65,7 @@ func (b *logQueryStatementBuilder) Build(
start = querybuilder.ToNanoSecs(start)
end = querybuilder.ToNanoSecs(end)
keySelectors, warnings := getKeySelectors(query)
keySelectors := getKeySelectors(query)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
@@ -76,32 +76,20 @@ func (b *logQueryStatementBuilder) Build(
// Create SQL builder
q := sqlbuilder.NewSelectBuilder()
var stmt *qbtypes.Statement
switch requestType {
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeRawStream:
stmt, err = b.buildListQuery(ctx, q, query, start, end, keys, variables)
return b.buildListQuery(ctx, q, query, start, end, keys, variables)
case qbtypes.RequestTypeTimeSeries:
stmt, err = b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
case qbtypes.RequestTypeScalar:
stmt, err = b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
return b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
}
if err != nil {
return nil, err
}
if stmt != nil && len(warnings) > 0 {
stmt.Warnings = append(stmt.Warnings, warnings...)
}
return stmt, nil
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
}
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([]*telemetrytypes.FieldKeySelector, []string) {
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
var keySelectors []*telemetrytypes.FieldKeySelector
var warnings []string
for idx := range query.Aggregations {
aggExpr := query.Aggregations[idx]
@@ -148,19 +136,7 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
}
// When the new JSON body experience is enabled, warn the user if they use the bare
// "body" key in the filter — queries on plain "body" default to body.message:string.
// TODO(Piyush): Setup better for coming FTS support.
if querybuilder.BodyJSONQueryEnabled {
for _, sel := range keySelectors {
if sel.Name == LogsV2BodyColumn {
warnings = append(warnings, bodySearchDefaultWarning)
break
}
}
}
return keySelectors, warnings
return keySelectors
}
func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
@@ -227,6 +203,7 @@ func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[stri
}
func (b *logQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
// First check if it matches with any intrinsic fields
var intrinsicOrCalculatedField telemetrytypes.TelemetryFieldKey
if _, ok := IntrinsicFields[key.Name]; ok {
@@ -235,6 +212,7 @@ func (b *logQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldK
}
return querybuilder.AdjustKey(key, keys, nil)
}
// buildListQuery builds a query for list panel type
@@ -271,7 +249,11 @@ func (b *logQueryStatementBuilder) buildListQuery(
sb.SelectMore(LogsV2SeverityNumberColumn)
sb.SelectMore(LogsV2ScopeNameColumn)
sb.SelectMore(LogsV2ScopeVersionColumn)
sb.SelectMore(bodyAliasExpression())
sb.SelectMore(LogsV2BodyColumn)
if querybuilder.BodyJSONQueryEnabled {
sb.SelectMore(LogsV2BodyJSONColumn)
sb.SelectMore(LogsV2BodyPromotedColumn)
}
sb.SelectMore(LogsV2AttributesStringColumn)
sb.SelectMore(LogsV2AttributesNumberColumn)
sb.SelectMore(LogsV2AttributesBoolColumn)

View File

@@ -5,7 +5,6 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
@@ -887,154 +886,3 @@ func TestAdjustKey(t *testing.T) {
})
}
}
func TestStmtBuilderBodyField(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
enableBodyJSONQuery bool
expected qbtypes.Statement
expectedErr error
}{
{
name: "body_exists",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body Exists"},
Limit: 10,
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message <> ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{bodySearchDefaultWarning},
},
expectedErr: nil,
},
{
name: "body_exists_disabled",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body Exists"},
Limit: 10,
},
enableBodyJSONQuery: false,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body <> ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "body_empty",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body == ''"},
Limit: 10,
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{bodySearchDefaultWarning},
},
expectedErr: nil,
},
{
name: "body_empty_disabled",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body == ''"},
Limit: 10,
},
enableBodyJSONQuery: false,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "body_contains",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body CONTAINS 'error'"},
Limit: 10,
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND LOWER(body_v2.message) LIKE LOWER(?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "%error%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{bodySearchDefaultWarning},
},
expectedErr: nil,
},
{
name: "body_contains_disabled",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body CONTAINS 'error'"},
Limit: 10,
},
enableBodyJSONQuery: false,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND LOWER(body) LIKE LOWER(?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "%error%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
enable, disable := jsonQueryTestUtil(t)
defer disable()
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if c.enableBodyJSONQuery {
enable()
} else {
disable()
}
// build the key map after enabling/disabling body JSON query
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
)
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
if err != nil {
_, _, _, _, _, add := errors.Unwrapb(err)
t.Logf("error additionals: %v", add)
}
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}

View File

@@ -27,6 +27,13 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"body": {
{
Name: "body",
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"http.status_code": {
{
Name: "http.status_code",
@@ -938,11 +945,6 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
key.Signal = telemetrytypes.SignalLogs
}
}
// add intrinsic fields to the map
for fieldName, key := range IntrinsicFields {
keysMap[fieldName] = append(keysMap[fieldName], &key)
}
return keysMap
}

View File

@@ -319,7 +319,7 @@ func (t *telemetryMetaStore) ListJSONValues(ctx context.Context, path string, li
if promoted {
path = telemetrylogs.BodyPromotedColumnPrefix + path
} else {
path = telemetrylogs.BodyV2ColumnPrefix + path
path = telemetrylogs.BodyJSONColumnPrefix + path
}
from := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
@@ -522,7 +522,7 @@ func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...stri
// TODO(Piyush): Remove this function
func CleanPathPrefixes(path string) string {
path = strings.TrimPrefix(path, telemetrytypes.BodyJSONStringSearchPrefix)
path = strings.TrimPrefix(path, telemetrylogs.BodyV2ColumnPrefix)
path = strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
path = strings.TrimPrefix(path, telemetrylogs.BodyPromotedColumnPrefix)
return path
}

View File

@@ -102,7 +102,7 @@ func NewTelemetryMetaStore(
jsonColumnMetadata: map[telemetrytypes.Signal]map[telemetrytypes.FieldContext]telemetrytypes.JSONColumnMetadata{
telemetrytypes.SignalLogs: {
telemetrytypes.FieldContextBody: telemetrytypes.JSONColumnMetadata{
BaseColumn: telemetrylogs.LogsV2BodyV2Column,
BaseColumn: telemetrylogs.LogsV2BodyJSONColumn,
PromotedColumn: telemetrylogs.LogsV2BodyPromotedColumn,
},
},
@@ -351,7 +351,7 @@ func (t *telemetryMetaStore) logsTblStatementToFieldKeys(ctx context.Context) ([
}
// getLogsKeys returns the keys from the spans that match the field selection criteria
func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, bool, error) {
func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
instrumentationtypes.CodeNamespace: "metadata",
@@ -367,10 +367,9 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
if err != nil {
return nil, false, err
}
// setOfKeys to reuse the same key object for qualified names
setOfKeys := make(map[string]*telemetrytypes.TelemetryFieldKey)
mapOfKeys := make(map[string]*telemetrytypes.TelemetryFieldKey)
for _, key := range matKeys {
setOfKeys[key.Text()] = key
mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
}
// queries for both attribute and resource keys tables
@@ -471,7 +470,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
if len(queries) == 0 {
// No matching contexts, return empty result
return nil, true, nil
return []*telemetrytypes.TelemetryFieldKey{}, true, nil
}
// Combine queries with UNION ALL
@@ -499,7 +498,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
}
defer rows.Close()
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
keys := []*telemetrytypes.TelemetryFieldKey{}
rowCount := 0
searchTexts := []string{}
dataTypes := []telemetrytypes.FieldDataType{}
@@ -527,7 +526,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
if err != nil {
return nil, false, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
key, ok := setOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()]
key, ok := mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()]
// if there is no materialised column, create a key with the field context and data type
if !ok {
@@ -539,8 +538,8 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
}
}
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
setOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
keys = append(keys, key)
mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
}
if rows.Err() != nil {
@@ -566,13 +565,17 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
if found {
if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
if _, added := setOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
mapOfKeys[field.Name] = append(mapOfKeys[field.Name], &field)
// intrinsic field can be a logical field mapped to a different physical location
mapOfKeys[key] = append(mapOfKeys[key], &field)
if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
keys = append(keys, &field)
}
continue
}
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
Name: key,
FieldContext: telemetrytypes.FieldContextLog,
Signal: telemetrytypes.SignalLogs,
})
}
}
@@ -581,13 +584,10 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
if err != nil {
t.logger.ErrorContext(ctx, "failed to extract body JSON paths", "error", err)
}
for _, key := range bodyJSONPaths {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
keys = append(keys, bodyJSONPaths...)
complete = complete && finished
}
return mapOfKeys, complete, nil
return keys, complete, nil
}
func getPriorityForContext(ctx telemetrytypes.FieldContext) int {
@@ -882,20 +882,12 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
if fieldKeySelector != nil {
selectors = []*telemetrytypes.FieldKeySelector{fieldKeySelector}
}
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
switch fieldKeySelector.Signal {
case telemetrytypes.SignalTraces:
keys, complete, err = t.getTracesKeys(ctx, selectors)
case telemetrytypes.SignalLogs:
mapOfLogKeys, logsComplete, err := t.getLogsKeys(ctx, selectors)
if err != nil {
return nil, false, err
}
for keyName, keys := range mapOfLogKeys {
mapOfKeys[keyName] = append(mapOfKeys[keyName], keys...)
}
complete = complete && logsComplete
keys, complete, err = t.getLogsKeys(ctx, selectors)
case telemetrytypes.SignalMetrics:
if fieldKeySelector.Source == telemetrytypes.SourceMeter {
keys, complete, err = t.getMeterSourceMetricKeys(ctx, selectors)
@@ -911,13 +903,12 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
keys = append(keys, tracesKeys...)
// get logs keys
mapOfLogKeys, logsComplete, err := t.getLogsKeys(ctx, selectors)
logsKeys, logsComplete, err := t.getLogsKeys(ctx, selectors)
if err != nil {
return nil, false, err
}
for keyName, keys := range mapOfLogKeys {
mapOfKeys[keyName] = append(mapOfKeys[keyName], keys...)
}
keys = append(keys, logsKeys...)
// get metrics keys
metricsKeys, metricsComplete, err := t.getMetricsKeys(ctx, selectors)
if err != nil {
@@ -931,6 +922,7 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
return nil, false, err
}
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
for _, key := range keys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
@@ -967,7 +959,7 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
}
}
mapOfLogKeys, logsComplete, err := t.getLogsKeys(ctx, logsSelectors)
logsKeys, logsComplete, err := t.getLogsKeys(ctx, logsSelectors)
if err != nil {
return nil, false, err
}
@@ -988,8 +980,8 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
complete := logsComplete && tracesComplete && metricsComplete
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
for keyName, keys := range mapOfLogKeys {
mapOfKeys[keyName] = append(mapOfKeys[keyName], keys...)
for _, key := range logsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range tracesKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"strings"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
@@ -607,8 +608,73 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
sb.Where(rewrittenExpr)
}
}
sb.OrderBy(querybuilder.GroupByKeys(query.GroupBy)...)
sb.OrderBy("ts")
groupByKeys := querybuilder.GroupByKeys(query.GroupBy)
hasOrder := len(query.Order) > 0
hasLimit := query.Limit > 0
hasGroupBy := len(groupByKeys) > 0
if hasOrder && hasLimit {
// order by with limit: add WHERE subquery to restrict to top N distinct key combinations
orderByKeys := querybuilder.OrderByKeys(query.Order)
var orderByClauses []string
for _, o := range query.Order {
orderByClauses = append(orderByClauses, fmt.Sprintf("`%s` %s", o.Key.Name, o.Direction.StringValue()))
}
subSb := sqlbuilder.NewSelectBuilder()
subSb.Select(fmt.Sprintf("DISTINCT %s", orderByKeys[0]))
if len(orderByKeys) > 1 {
subSb.SelectMore(orderByKeys[1:]...)
}
subSb.From("__spatial_aggregation_cte")
subSb.OrderBy(orderByClauses...)
subQ, _ := subSb.BuildWithFlavor(sqlbuilder.ClickHouse)
subQ = fmt.Sprintf("%s LIMIT %d", subQ, query.Limit)
sb.Where(fmt.Sprintf("(%s) IN (%s)", strings.Join(orderByKeys, ", "), subQ))
sb.OrderBy(orderByClauses...)
} else if hasOrder {
// order by without limit: apply order by clauses directly
for _, o := range query.Order {
key := o.Key.Name
if strings.Contains(key, query.Aggregations[0].MetricName) {
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) %s", strings.Join(groupByKeys, ", "), o.Direction.StringValue()))
continue
}
sb.OrderBy(fmt.Sprintf("`%s` %s", o.Key.Name, o.Direction.StringValue()))
}
} else if hasLimit && hasGroupBy {
// limit without order by: default ordering by avg(value)
subSb := sqlbuilder.NewSelectBuilder()
subSb.Select(groupByKeys[0])
if len(groupByKeys) > 1 {
subSb.SelectMore(groupByKeys[1:]...)
}
subSb.From("__spatial_aggregation_cte")
subSb.GroupBy(groupByKeys...)
subSb.OrderBy("avg(value)")
subQ, _ := subSb.BuildWithFlavor(sqlbuilder.ClickHouse)
subQ = fmt.Sprintf("%s LIMIT %d", subQ, query.Limit)
sb.Where(fmt.Sprintf("(%s) IN (%s)", strings.Join(groupByKeys, ", "), subQ))
} else if hasGroupBy {
// grouping without order by or limit: sort by avg(value) DESC with labels as tiebreakers
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) DESC", strings.Join(groupByKeys, ", ")))
}
// add any group-by keys not already in the order-by as tiebreakers
orderKeySet := make(map[string]struct{})
for _, o := range query.Order {
orderKeySet[fmt.Sprintf("`%s`", o.Key.Name)] = struct{}{}
}
for _, g := range groupByKeys {
if _, exists := orderKeySet[g]; !exists {
sb.OrderBy(g)
}
}
sb.OrderBy("ts ASC")
if metricType == metrictypes.HistogramType && spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam == nil {
sb.OrderBy("toFloat64(le)")
}

View File

@@ -15,16 +15,17 @@ import (
)
func TestStatementBuilder(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expected qbtypes.Statement
expectedErr error
}{
type baseQuery struct {
name string
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
orderKey string
args []any
cte string
}
bases := []baseQuery{
{
name: "test_cumulative_rate_sum",
requestType: qbtypes.RequestTypeTimeSeries,
name: "cumulative_rate_sum",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -40,24 +41,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
},
{
name: "test_cumulative_rate_sum_with_mat_column",
requestType: qbtypes.RequestTypeTimeSeries,
name: "cumulative_rate_sum_with_mat_column",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -73,24 +66,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "materialized.key.name REGEXP 'cartservice' OR service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
},
{
name: "test_delta_rate_sum",
requestType: qbtypes.RequestTypeTimeSeries,
name: "delta_rate_sum",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -106,24 +91,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`)",
},
{
name: "test_histogram_percentile1",
requestType: qbtypes.RequestTypeTimeSeries,
name: "histogram_percentile1",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -139,24 +116,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
Args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`)",
},
{
name: "test_gauge_avg_sum",
requestType: qbtypes.RequestTypeTimeSeries,
name: "gauge_avg_sum",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -172,24 +141,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "host.name = 'big-data-node-1'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "host.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "host.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name`, ts",
Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
},
expectedErr: nil,
orderKey: "host.name",
args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`)",
},
{
name: "test_histogram_percentile2",
requestType: qbtypes.RequestTypeTimeSeries,
name: "histogram_percentile2",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -202,23 +163,69 @@ func TestStatementBuilder(t *testing.T) {
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
},
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
Args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`)",
},
}
type variant struct {
name string
limit int
hasOrder bool
}
variants := []variant{
{"with_limits", 10, false},
{"without_limits", 0, false},
{"with_order_by", 0, true},
{"with_order_by_and_limits", 10, true},
}
// expectedFinalSelects maps "base/variant" to the final SELECT portion after the CTE.
// The full expected query is: base.cte + expectedFinalSelects[name]
expectedFinalSelects := map[string]string{
// cumulative_rate_sum
"cumulative_rate_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) ORDER BY `service.name`, ts ASC",
"cumulative_rate_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"cumulative_rate_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
"cumulative_rate_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
// cumulative_rate_sum_with_mat_column
"cumulative_rate_sum_with_mat_column/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) ORDER BY `service.name`, ts ASC",
"cumulative_rate_sum_with_mat_column/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"cumulative_rate_sum_with_mat_column/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
"cumulative_rate_sum_with_mat_column/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
// delta_rate_sum
"delta_rate_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) ORDER BY `service.name`, ts ASC",
"delta_rate_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"delta_rate_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
"delta_rate_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
// histogram_percentile1
"histogram_percentile1/with_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name`, ts ASC",
"histogram_percentile1/without_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"histogram_percentile1/with_order_by": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
"histogram_percentile1/with_order_by_and_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
// gauge_avg_sum
"gauge_avg_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY avg(value) LIMIT 10) ORDER BY `host.name`, ts ASC",
"gauge_avg_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
"gauge_avg_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name` asc, ts ASC",
"gauge_avg_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT DISTINCT `host.name` FROM __spatial_aggregation_cte ORDER BY `host.name` asc LIMIT 10) ORDER BY `host.name` asc, ts ASC",
// histogram_percentile2
"histogram_percentile2/with_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name`, ts ASC",
"histogram_percentile2/without_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"histogram_percentile2/with_order_by": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
"histogram_percentile2/with_order_by_and_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
@@ -227,15 +234,13 @@ func TestStatementBuilder(t *testing.T) {
t.Fatalf("failed to load field keys: %v", err)
}
mockMetadataStore.KeysMap = keys
// NOTE: LoadFieldKeysFromJSON doesn't set Materialized field
// for keys, so we have to set it manually here for testing
if _, ok := mockMetadataStore.KeysMap["materialized.key.name"]; ok {
if len(mockMetadataStore.KeysMap["materialized.key.name"]) > 0 {
mockMetadataStore.KeysMap["materialized.key.name"][0].Materialized = true
}
}
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
fl, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
if err != nil {
t.Fatalf("failed to create flagger: %v", err)
}
@@ -245,23 +250,30 @@ func TestStatementBuilder(t *testing.T) {
mockMetadataStore,
fm,
cb,
flagger,
fl,
)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
for _, b := range bases {
for _, v := range variants {
name := b.name + "/" + v.name
t.Run(name, func(t *testing.T) {
q := b.query
q.Limit = v.limit
if v.hasOrder {
q.Order = []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: b.orderKey}},
Direction: qbtypes.OrderDirectionAsc,
},
}
}
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
result, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, qbtypes.RequestTypeTimeSeries, q, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
require.Equal(t, b.cte+expectedFinalSelects[name], result.Query)
require.Equal(t, b.args, result.Args)
})
}
}
}

View File

@@ -21,7 +21,6 @@ var (
// int64 and number are synonyms for float64
FieldDataTypeInt64 = FieldDataType{valuer.NewString("int64")}
FieldDataTypeNumber = FieldDataType{valuer.NewString("number")}
FieldDataTypeJSON = FieldDataType{valuer.NewString("json")}
FieldDataTypeUnspecified = FieldDataType{valuer.NewString("")}
FieldDataTypeArrayString = FieldDataType{valuer.NewString("[]string")}

View File

@@ -40,7 +40,7 @@ type JSONAccessNode struct {
// Node information
Name string
IsTerminal bool
isRoot bool // marked true for only body_v2 and body_promoted
isRoot bool // marked true for only body_json and body_json_promoted
// Precomputed type information (single source of truth)
AvailableTypes []JSONDataType

View File

@@ -1,19 +1,12 @@
package telemetrytypes
import (
"fmt"
"testing"
otelconstants "github.com/SigNoz/signoz-otel-collector/constants"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
const (
bodyV2Column = otelconstants.BodyV2Column
bodyPromotedColumn = otelconstants.BodyPromotedColumn
)
// ============================================================================
// Helper Functions for Test Data Creation
// ============================================================================
@@ -116,8 +109,8 @@ func TestNode_Alias(t *testing.T) {
}{
{
name: "Root node returns name as-is",
node: NewRootJSONAccessNode(bodyV2Column, 32, 0),
expected: bodyV2Column,
node: NewRootJSONAccessNode("body_json", 32, 0),
expected: "body_json",
},
{
name: "Node without parent returns backticked name",
@@ -131,9 +124,9 @@ func TestNode_Alias(t *testing.T) {
name: "Node with root parent uses dot separator",
node: &JSONAccessNode{
Name: "age",
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
expected: "`" + bodyV2Column + ".age`",
expected: "`" + "body_json" + ".age`",
},
{
name: "Node with non-root parent uses array separator",
@@ -141,10 +134,10 @@ func TestNode_Alias(t *testing.T) {
Name: "name",
Parent: &JSONAccessNode{
Name: "education",
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
},
expected: "`" + bodyV2Column + ".education[].name`",
expected: "`" + "body_json" + ".education[].name`",
},
{
name: "Nested array path with multiple levels",
@@ -154,11 +147,11 @@ func TestNode_Alias(t *testing.T) {
Name: "awards",
Parent: &JSONAccessNode{
Name: "education",
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
},
},
expected: "`" + bodyV2Column + ".education[].awards[].type`",
expected: "`" + "body_json" + ".education[].awards[].type`",
},
}
@@ -180,18 +173,18 @@ func TestNode_FieldPath(t *testing.T) {
name: "Simple field path from root",
node: &JSONAccessNode{
Name: "user",
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
// FieldPath() always wraps the field name in backticks
expected: bodyV2Column + ".`user`",
expected: "body_json" + ".`user`",
},
{
name: "Field path with backtick-required key",
node: &JSONAccessNode{
Name: "user-name", // requires backtick
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
expected: bodyV2Column + ".`user-name`",
expected: "body_json" + ".`user-name`",
},
{
name: "Nested field path",
@@ -199,11 +192,11 @@ func TestNode_FieldPath(t *testing.T) {
Name: "age",
Parent: &JSONAccessNode{
Name: "user",
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
},
// FieldPath() always wraps the field name in backticks
expected: "`" + bodyV2Column + ".user`.`age`",
expected: "`" + "body_json" + ".user`.`age`",
},
{
name: "Array element field path",
@@ -211,11 +204,11 @@ func TestNode_FieldPath(t *testing.T) {
Name: "name",
Parent: &JSONAccessNode{
Name: "education",
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
},
// FieldPath() always wraps the field name in backticks
expected: "`" + bodyV2Column + ".education`.`name`",
expected: "`" + "body_json" + ".education`.`name`",
},
}
@@ -243,36 +236,36 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
{
name: "Simple path not promoted",
key: makeKey("user.name", String, false),
expectedYAML: fmt.Sprintf(`
expectedYAML: `
- name: user.name
column: %s
column: body_json
availableTypes:
- String
maxDynamicTypes: 16
isTerminal: true
elemType: String
`, bodyV2Column),
`,
},
{
name: "Simple path promoted",
key: makeKey("user.name", String, true),
expectedYAML: fmt.Sprintf(`
expectedYAML: `
- name: user.name
column: %s
column: body_json
availableTypes:
- String
maxDynamicTypes: 16
isTerminal: true
elemType: String
- name: user.name
column: %s
column: body_json_promoted
availableTypes:
- String
maxDynamicTypes: 16
maxDynamicPaths: 256
isTerminal: true
elemType: String
`, bodyV2Column, bodyPromotedColumn),
`,
},
{
name: "Empty path returns error",
@@ -285,8 +278,8 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
if tt.expectErr {
require.Error(t, err)
@@ -311,9 +304,9 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
{
name: "Single array level - JSON branch only",
path: "education[].name",
expectedYAML: fmt.Sprintf(`
expectedYAML: `
- name: education
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -325,14 +318,14 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicTypes: 8
isTerminal: true
elemType: String
`, bodyV2Column),
`,
},
{
name: "Single array level - both JSON and Dynamic branches",
path: "education[].awards[].type",
expectedYAML: fmt.Sprintf(`
expectedYAML: `
- name: education
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -359,14 +352,14 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
`, bodyV2Column),
`,
},
{
name: "Deeply nested array path",
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
expectedYAML: fmt.Sprintf(`
expectedYAML: `
- name: interests
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -406,14 +399,14 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
- String
isTerminal: true
elemType: String
`, bodyV2Column),
`,
},
{
name: "ArrayAnyIndex replacement [*] to []",
path: "education[*].name",
expectedYAML: fmt.Sprintf(`
expectedYAML: `
- name: education
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -425,7 +418,7 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicTypes: 8
isTerminal: true
elemType: String
`, bodyV2Column),
`,
},
}
@@ -433,8 +426,8 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
key := makeKey(tt.path, String, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
require.NoError(t, err)
require.NotNil(t, key.JSONPlan)
@@ -452,15 +445,15 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
t.Run("Non-promoted plan", func(t *testing.T) {
key := makeKey(path, String, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
require.NoError(t, err)
require.Len(t, key.JSONPlan, 1)
expectedYAML := fmt.Sprintf(`
expectedYAML := `
- name: education
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -487,7 +480,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
`, bodyV2Column)
`
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, expectedYAML, got)
})
@@ -495,15 +488,15 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
t.Run("Promoted plan", func(t *testing.T) {
key := makeKey(path, String, true)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
require.NoError(t, err)
require.Len(t, key.JSONPlan, 2)
expectedYAML := fmt.Sprintf(`
expectedYAML := `
- name: education
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -531,7 +524,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
isTerminal: true
elemType: String
- name: education
column: %s
column: body_json_promoted
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -561,7 +554,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
`, bodyV2Column, bodyPromotedColumn)
`
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, expectedYAML, got)
})
@@ -582,11 +575,11 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
expectErr: true,
},
{
name: "Very deep nesting - validates progression doesn't go negative",
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
expectedYAML: fmt.Sprintf(`
name: "Very deep nesting - validates progression doesn't go negative",
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
expectedYAML: `
- name: interests
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -626,14 +619,14 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
- String
isTerminal: true
elemType: String
`, bodyV2Column),
`,
},
{
name: "Path with mixed scalar and array types",
path: "education[].type",
expectedYAML: fmt.Sprintf(`
name: "Path with mixed scalar and array types",
path: "education[].type",
expectedYAML: `
- name: education
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -646,20 +639,20 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
maxDynamicTypes: 8
isTerminal: true
elemType: String
`, bodyV2Column),
`,
},
{
name: "Exists with only array types available",
path: "education",
expectedYAML: fmt.Sprintf(`
name: "Exists with only array types available",
path: "education",
expectedYAML: `
- name: education
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
isTerminal: true
elemType: Array(JSON)
`, bodyV2Column),
`,
},
}
@@ -675,8 +668,8 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
}
key := makeKey(tt.path, keyType, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
if tt.expectErr {
require.Error(t, err)
@@ -694,15 +687,15 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
path := "education[].awards[].participated[].team[].branch"
key := makeKey(path, String, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
require.NoError(t, err)
require.Len(t, key.JSONPlan, 1)
expectedYAML := fmt.Sprintf(`
expectedYAML := `
- name: education
column: %s
column: body_json
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -787,7 +780,7 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
maxDynamicPaths: 64
isTerminal: true
elemType: String
`, bodyV2Column)
`
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, expectedYAML, got)

View File

@@ -2,7 +2,6 @@ package telemetrytypestest
import (
"context"
"slices"
"strings"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
@@ -179,9 +178,8 @@ func matchesKey(selector *telemetrytypes.FieldKeySelector, key *telemetrytypes.T
return true
}
matchNameExceptions := []string{"body"}
// Check name (already checked in matchesName, but double-check here)
if selector.Name != "" && !matchesName(selector, key.Name) && slices.Contains(matchNameExceptions, key.Name) {
if selector.Name != "" && !matchesName(selector, key.Name) {
return false
}