mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-07 02:12:11 +00:00
Compare commits
61 Commits
test/uplot
...
issue_3017
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec5738032d | ||
|
|
4352c4de91 | ||
|
|
6acbc7156d | ||
|
|
09f9a2d4f2 | ||
|
|
6a5354df39 | ||
|
|
ca9ff25314 | ||
|
|
07e66e8c24 | ||
|
|
6283c6c26a | ||
|
|
3515e59a39 | ||
|
|
7756067914 | ||
|
|
d3ef59cba7 | ||
|
|
81e33d59bb | ||
|
|
a05957dc69 | ||
|
|
24cf357b04 | ||
|
|
91e4da28e6 | ||
|
|
4cc727b7f8 | ||
|
|
9b24097a61 | ||
|
|
3a5d6b4493 | ||
|
|
d341f1f810 | ||
|
|
df1b47230a | ||
|
|
6261c9586f | ||
|
|
cda48874d2 | ||
|
|
277b6de266 | ||
|
|
6f87ebe092 | ||
|
|
62c70715e0 | ||
|
|
585a2b5282 | ||
|
|
6ad4c8ad8e | ||
|
|
68df57965d | ||
|
|
d155cc6a10 | ||
|
|
90a6902093 | ||
|
|
2bf92c9c2f | ||
|
|
aa2c1676b6 | ||
|
|
239c0f4e2e | ||
|
|
97ecfdea23 | ||
|
|
6a02db8685 | ||
|
|
9f85dfb307 | ||
|
|
ebc236857d | ||
|
|
0a1e252bb5 | ||
|
|
dd696bab13 | ||
|
|
7f87103b30 | ||
|
|
726bd0ea7a | ||
|
|
ab443c2d65 | ||
|
|
8be9a79d56 | ||
|
|
471ad88971 | ||
|
|
a5c46beeec | ||
|
|
41f720950d | ||
|
|
d9bce4a3c6 | ||
|
|
a5ac40c33c | ||
|
|
86b1366d4a | ||
|
|
eddb43a901 | ||
|
|
505cfe2314 | ||
|
|
6e54ee822a | ||
|
|
d88cb8aba4 | ||
|
|
b823b2a1e1 | ||
|
|
7cfb7118a3 | ||
|
|
59dfe7c0ed | ||
|
|
96b68b91c9 | ||
|
|
be6ce8d4f1 | ||
|
|
1fc58695c6 | ||
|
|
43450a187e | ||
|
|
f4666d9c97 |
@@ -44,6 +44,7 @@ func NewAPI(
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
telemetrymetadata.DBName,
|
||||
telemetrymetadata.AttributesMetadataLocalTableName,
|
||||
telemetrymetadata.ColumnEvolutionMetadataTableName,
|
||||
)
|
||||
|
||||
return &API{
|
||||
|
||||
@@ -610,18 +610,18 @@ func (m *module) buildFilterClause(ctx context.Context, filter *qbtypes.Filter,
|
||||
}
|
||||
|
||||
opts := querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: m.logger,
|
||||
FieldMapper: m.fieldMapper,
|
||||
ConditionBuilder: m.condBuilder,
|
||||
FullTextColumn: &telemetrytypes.TelemetryFieldKey{
|
||||
Name: "labels"},
|
||||
FieldKeys: keys,
|
||||
StartNs: querybuilder.ToNanoSecs(uint64(startMillis)),
|
||||
EndNs: querybuilder.ToNanoSecs(uint64(endMillis)),
|
||||
}
|
||||
|
||||
startNs := querybuilder.ToNanoSecs(uint64(startMillis))
|
||||
endNs := querybuilder.ToNanoSecs(uint64(endMillis))
|
||||
|
||||
whereClause, err := querybuilder.PrepareWhereClause(expression, opts, startNs, endNs)
|
||||
whereClause, err := querybuilder.PrepareWhereClause(expression, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -66,6 +66,7 @@ func newProvider(
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
telemetrymetadata.DBName,
|
||||
telemetrymetadata.AttributesMetadataLocalTableName,
|
||||
telemetrymetadata.ColumnEvolutionMetadataTableName,
|
||||
)
|
||||
|
||||
// Create trace statement builder
|
||||
|
||||
@@ -48,6 +48,8 @@ func NewAggExprRewriter(
|
||||
// and the args if the parametric aggregation function is used.
|
||||
func (r *aggExprRewriter) Rewrite(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
expr string,
|
||||
rateInterval uint64,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
@@ -74,7 +76,12 @@ func (r *aggExprRewriter) Rewrite(
|
||||
return "", nil, errors.NewInternalf(errors.CodeInternal, "no SELECT items for %q", expr)
|
||||
}
|
||||
|
||||
visitor := newExprVisitor(r.logger, keys,
|
||||
visitor := newExprVisitor(
|
||||
ctx,
|
||||
startNs,
|
||||
endNs,
|
||||
r.logger,
|
||||
keys,
|
||||
r.fullTextColumn,
|
||||
r.fieldMapper,
|
||||
r.conditionBuilder,
|
||||
@@ -94,6 +101,8 @@ func (r *aggExprRewriter) Rewrite(
|
||||
// RewriteMulti rewrites a slice of expressions.
|
||||
func (r *aggExprRewriter) RewriteMulti(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
exprs []string,
|
||||
rateInterval uint64,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
@@ -102,7 +111,7 @@ func (r *aggExprRewriter) RewriteMulti(
|
||||
var errs []error
|
||||
var chArgsList [][]any
|
||||
for i, e := range exprs {
|
||||
w, chArgs, err := r.Rewrite(ctx, e, rateInterval, keys)
|
||||
w, chArgs, err := r.Rewrite(ctx, startNs, endNs, e, rateInterval, keys)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
out[i] = e
|
||||
@@ -119,6 +128,9 @@ func (r *aggExprRewriter) RewriteMulti(
|
||||
|
||||
// exprVisitor walks FunctionExpr nodes and applies the mappers.
|
||||
type exprVisitor struct {
|
||||
ctx context.Context
|
||||
startNs uint64
|
||||
endNs uint64
|
||||
chparser.DefaultASTVisitor
|
||||
logger *slog.Logger
|
||||
fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey
|
||||
@@ -132,6 +144,9 @@ type exprVisitor struct {
|
||||
}
|
||||
|
||||
func newExprVisitor(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
logger *slog.Logger,
|
||||
fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
fullTextColumn *telemetrytypes.TelemetryFieldKey,
|
||||
@@ -140,6 +155,9 @@ func newExprVisitor(
|
||||
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
|
||||
) *exprVisitor {
|
||||
return &exprVisitor{
|
||||
ctx: ctx,
|
||||
startNs: startNs,
|
||||
endNs: endNs,
|
||||
logger: logger,
|
||||
fieldKeys: fieldKeys,
|
||||
fullTextColumn: fullTextColumn,
|
||||
@@ -186,13 +204,16 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
|
||||
whereClause, err := PrepareWhereClause(
|
||||
origPred,
|
||||
FilterExprVisitorOpts{
|
||||
Context: v.ctx,
|
||||
Logger: v.logger,
|
||||
FieldKeys: v.fieldKeys,
|
||||
FieldMapper: v.fieldMapper,
|
||||
ConditionBuilder: v.conditionBuilder,
|
||||
FullTextColumn: v.fullTextColumn,
|
||||
JsonKeyToKey: v.jsonKeyToKey,
|
||||
}, 0, 0,
|
||||
StartNs: v.startNs,
|
||||
EndNs: v.endNs,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -212,7 +233,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
|
||||
for i := 0; i < len(args)-1; i++ {
|
||||
origVal := args[i].String()
|
||||
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(origVal)
|
||||
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey)
|
||||
expr, exprArgs, err := CollisionHandledFinalExpr(v.ctx, v.startNs, v.endNs, &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey)
|
||||
if err != nil {
|
||||
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to get table field name for %q", origVal)
|
||||
}
|
||||
@@ -230,7 +251,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
|
||||
for i, arg := range args {
|
||||
orig := arg.String()
|
||||
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(orig)
|
||||
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey)
|
||||
expr, exprArgs, err := CollisionHandledFinalExpr(v.ctx, v.startNs, v.endNs, &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -153,6 +153,7 @@ func AdjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemet
|
||||
key.Indexes = intrinsicOrCalculatedField.Indexes
|
||||
key.Materialized = intrinsicOrCalculatedField.Materialized
|
||||
key.JSONPlan = intrinsicOrCalculatedField.JSONPlan
|
||||
key.Evolutions = intrinsicOrCalculatedField.Evolutions
|
||||
return actions
|
||||
|
||||
}
|
||||
@@ -205,7 +206,8 @@ func AdjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemet
|
||||
key.Indexes = matchingKey.Indexes
|
||||
key.Materialized = matchingKey.Materialized
|
||||
key.JSONPlan = matchingKey.JSONPlan
|
||||
|
||||
key.Evolutions = matchingKey.Evolutions
|
||||
|
||||
return actions
|
||||
} else {
|
||||
// multiple matching keys, set materialized only if all the keys are materialized
|
||||
|
||||
@@ -19,6 +19,8 @@ import (
|
||||
|
||||
func CollisionHandledFinalExpr(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
field *telemetrytypes.TelemetryFieldKey,
|
||||
fm qbtypes.FieldMapper,
|
||||
cb qbtypes.ConditionBuilder,
|
||||
@@ -44,7 +46,7 @@ func CollisionHandledFinalExpr(
|
||||
|
||||
addCondition := func(key *telemetrytypes.TelemetryFieldKey) error {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
condition, err := cb.ConditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb, 0, 0)
|
||||
condition, err := cb.ConditionFor(ctx, startNs, endNs, key, qbtypes.FilterOperatorExists, nil, sb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -57,7 +59,7 @@ func CollisionHandledFinalExpr(
|
||||
return nil
|
||||
}
|
||||
|
||||
colName, fieldForErr := fm.FieldFor(ctx, field)
|
||||
colName, fieldForErr := fm.FieldFor(ctx, startNs, endNs, field)
|
||||
if errors.Is(fieldForErr, qbtypes.ErrColumnNotFound) {
|
||||
// the key didn't have the right context to be added to the query
|
||||
// we try to use the context we know of
|
||||
@@ -92,7 +94,7 @@ func CollisionHandledFinalExpr(
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
colName, _ = fm.FieldFor(ctx, key)
|
||||
colName, _ = fm.FieldFor(ctx, startNs, endNs, key)
|
||||
colName, _ = DataTypeCollisionHandledFieldName(key, dummyValue, colName, qbtypes.FilterOperatorUnknown)
|
||||
stmts = append(stmts, colName)
|
||||
}
|
||||
|
||||
@@ -44,12 +44,12 @@ func keyIndexFilter(key *telemetrytypes.TelemetryFieldKey) any {
|
||||
|
||||
func (b *defaultConditionBuilder) ConditionFor(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
op qbtypes.FilterOperator,
|
||||
value any,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
_ uint64,
|
||||
_ uint64,
|
||||
) (string, error) {
|
||||
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
@@ -60,15 +60,17 @@ func (b *defaultConditionBuilder) ConditionFor(
|
||||
// as we store resource values as string
|
||||
formattedValue := querybuilder.FormatValueForContains(value)
|
||||
|
||||
column, err := b.fm.ColumnFor(ctx, key)
|
||||
columns, err := b.fm.ColumnFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
// resource evolution on main table doesn't affect this as we not changing the resource column in the resource fingerprint table.
|
||||
column := columns[0]
|
||||
|
||||
keyIdxFilter := sb.Like(column.Name, keyIndexFilter(key))
|
||||
valueForIndexFilter := valueForIndexFilter(op, key, value)
|
||||
|
||||
fieldName, err := b.fm.FieldFor(ctx, key)
|
||||
fieldName, err := b.fm.FieldFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -206,7 +206,7 @@ func TestConditionBuilder(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cond, err := conditionBuilder.ConditionFor(context.Background(), tc.key, tc.op, tc.value, sb, 0, 0)
|
||||
cond, err := conditionBuilder.ConditionFor(context.Background(), 0, 0, tc.key, tc.op, tc.value, sb)
|
||||
sb.Where(cond)
|
||||
|
||||
if tc.expectedErr != nil {
|
||||
|
||||
@@ -27,44 +27,48 @@ func NewFieldMapper() *defaultFieldMapper {
|
||||
|
||||
func (m *defaultFieldMapper) getColumn(
|
||||
_ context.Context,
|
||||
_, _ uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
) (*schema.Column, error) {
|
||||
) ([]*schema.Column, error) {
|
||||
if key.FieldContext == telemetrytypes.FieldContextResource {
|
||||
return resourceColumns["labels"], nil
|
||||
return []*schema.Column{resourceColumns["labels"]}, nil
|
||||
}
|
||||
if col, ok := resourceColumns[key.Name]; ok {
|
||||
return col, nil
|
||||
return []*schema.Column{col}, nil
|
||||
}
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
func (m *defaultFieldMapper) ColumnFor(
|
||||
ctx context.Context,
|
||||
tsStart, tsEnd uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
) (*schema.Column, error) {
|
||||
return m.getColumn(ctx, key)
|
||||
) ([]*schema.Column, error) {
|
||||
return m.getColumn(ctx, tsStart, tsEnd, key)
|
||||
}
|
||||
|
||||
func (m *defaultFieldMapper) FieldFor(
|
||||
ctx context.Context,
|
||||
tsStart, tsEnd uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
column, err := m.getColumn(ctx, key)
|
||||
columns, err := m.getColumn(ctx, tsStart, tsEnd, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if key.FieldContext == telemetrytypes.FieldContextResource {
|
||||
return fmt.Sprintf("simpleJSONExtractString(%s, '%s')", column.Name, key.Name), nil
|
||||
return fmt.Sprintf("simpleJSONExtractString(%s, '%s')", columns[0].Name, key.Name), nil
|
||||
}
|
||||
return column.Name, nil
|
||||
return columns[0].Name, nil
|
||||
}
|
||||
|
||||
func (m *defaultFieldMapper) ColumnExpressionFor(
|
||||
ctx context.Context,
|
||||
tsStart, tsEnd uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
_ map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
colName, err := m.FieldFor(ctx, key)
|
||||
colName, err := m.FieldFor(ctx, tsStart, tsEnd, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -148,7 +148,7 @@ func (b *resourceFilterStatementBuilder[T]) Build(
|
||||
|
||||
// addConditions adds both filter and time conditions to the query
|
||||
func (b *resourceFilterStatementBuilder[T]) addConditions(
|
||||
_ context.Context,
|
||||
ctx context.Context,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
start, end uint64,
|
||||
query qbtypes.QueryBuilderQuery[T],
|
||||
@@ -160,6 +160,7 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
|
||||
|
||||
// warnings would be encountered as part of the main condition already
|
||||
filterWhereClause, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: b.logger,
|
||||
FieldMapper: b.fieldMapper,
|
||||
ConditionBuilder: b.conditionBuilder,
|
||||
@@ -171,7 +172,9 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
|
||||
// there is no need for "key" not found error for resource filtering
|
||||
IgnoreNotFoundKeys: true,
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
StartNs: start,
|
||||
EndNs: end,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -23,6 +23,7 @@ const stringMatchingOperatorDocURL = "https://signoz.io/docs/userguide/operators
|
||||
// filterExpressionVisitor implements the FilterQueryVisitor interface
|
||||
// to convert the parsed filter expressions into ClickHouse WHERE clause
|
||||
type filterExpressionVisitor struct {
|
||||
context context.Context
|
||||
logger *slog.Logger
|
||||
fieldMapper qbtypes.FieldMapper
|
||||
conditionBuilder qbtypes.ConditionBuilder
|
||||
@@ -46,6 +47,7 @@ type filterExpressionVisitor struct {
|
||||
}
|
||||
|
||||
type FilterExprVisitorOpts struct {
|
||||
Context context.Context
|
||||
Logger *slog.Logger
|
||||
FieldMapper qbtypes.FieldMapper
|
||||
ConditionBuilder qbtypes.ConditionBuilder
|
||||
@@ -65,6 +67,7 @@ type FilterExprVisitorOpts struct {
|
||||
// newFilterExpressionVisitor creates a new filterExpressionVisitor
|
||||
func newFilterExpressionVisitor(opts FilterExprVisitorOpts) *filterExpressionVisitor {
|
||||
return &filterExpressionVisitor{
|
||||
context: opts.Context,
|
||||
logger: opts.Logger,
|
||||
fieldMapper: opts.FieldMapper,
|
||||
conditionBuilder: opts.ConditionBuilder,
|
||||
@@ -90,7 +93,7 @@ type PreparedWhereClause struct {
|
||||
}
|
||||
|
||||
// PrepareWhereClause generates a ClickHouse compatible WHERE clause from the filter query
|
||||
func PrepareWhereClause(query string, opts FilterExprVisitorOpts, startNs uint64, endNs uint64) (*PreparedWhereClause, error) {
|
||||
func PrepareWhereClause(query string, opts FilterExprVisitorOpts) (*PreparedWhereClause, error) {
|
||||
|
||||
// Setup the ANTLR parsing pipeline
|
||||
input := antlr.NewInputStream(query)
|
||||
@@ -124,8 +127,6 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts, startNs uint64
|
||||
}
|
||||
tokens.Reset()
|
||||
|
||||
opts.StartNs = startNs
|
||||
opts.EndNs = endNs
|
||||
visitor := newFilterExpressionVisitor(opts)
|
||||
|
||||
// Handle syntax errors
|
||||
@@ -317,7 +318,7 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
|
||||
// create a full text search condition on the body field
|
||||
|
||||
keyText := keyCtx.GetText()
|
||||
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(keyText), v.builder, v.startNs, v.endNs)
|
||||
cond, err := v.conditionBuilder.ConditionFor(v.context, v.startNs, v.endNs, v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(keyText), v.builder)
|
||||
if err != nil {
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
|
||||
return ""
|
||||
@@ -337,7 +338,7 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
|
||||
v.errors = append(v.errors, fmt.Sprintf("unsupported value type: %s", valCtx.GetText()))
|
||||
return ""
|
||||
}
|
||||
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(text), v.builder, v.startNs, v.endNs)
|
||||
cond, err := v.conditionBuilder.ConditionFor(v.context, v.startNs, v.endNs, v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(text), v.builder)
|
||||
if err != nil {
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
|
||||
return ""
|
||||
@@ -381,7 +382,7 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
|
||||
}
|
||||
var conds []string
|
||||
for _, key := range keys {
|
||||
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, nil, v.builder, v.startNs, v.endNs)
|
||||
condition, err := v.conditionBuilder.ConditionFor(v.context, v.startNs, v.endNs, key, op, nil, v.builder)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
@@ -453,7 +454,7 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
|
||||
}
|
||||
var conds []string
|
||||
for _, key := range keys {
|
||||
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, values, v.builder, v.startNs, v.endNs)
|
||||
condition, err := v.conditionBuilder.ConditionFor(v.context, v.startNs, v.endNs, key, op, values, v.builder)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
@@ -485,7 +486,7 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
|
||||
|
||||
var conds []string
|
||||
for _, key := range keys {
|
||||
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, []any{value1, value2}, v.builder, v.startNs, v.endNs)
|
||||
condition, err := v.conditionBuilder.ConditionFor(v.context, v.startNs, v.endNs, key, op, []any{value1, value2}, v.builder)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
@@ -570,7 +571,7 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
|
||||
|
||||
var conds []string
|
||||
for _, key := range keys {
|
||||
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, value, v.builder, v.startNs, v.endNs)
|
||||
condition, err := v.conditionBuilder.ConditionFor(v.context, v.startNs, v.endNs, key, op, value, v.builder)
|
||||
if err != nil {
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to build condition: %s", err.Error()))
|
||||
return ""
|
||||
@@ -649,7 +650,7 @@ func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) an
|
||||
v.errors = append(v.errors, "full text search is not supported")
|
||||
return ""
|
||||
}
|
||||
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(text), v.builder, v.startNs, v.endNs)
|
||||
cond, err := v.conditionBuilder.ConditionFor(v.context, v.startNs, v.endNs, v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(text), v.builder)
|
||||
if err != nil {
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
|
||||
return ""
|
||||
@@ -734,13 +735,13 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
var err error
|
||||
if BodyJSONQueryEnabled {
|
||||
fieldName, err = v.fieldMapper.FieldFor(context.Background(), key)
|
||||
fieldName, err = v.fieldMapper.FieldFor(v.context, v.startNs, v.endNs, key)
|
||||
if err != nil {
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to get field name for key %s: %s", key.Name, err.Error()))
|
||||
return ""
|
||||
}
|
||||
} else {
|
||||
fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value)
|
||||
fieldName, _ = v.jsonKeyToKey(v.context, key, qbtypes.FilterOperatorUnknown, value)
|
||||
}
|
||||
} else {
|
||||
// TODO(add docs for json body search)
|
||||
@@ -855,7 +856,7 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
|
||||
// 1. either user meant key ( this is already handled above in fieldKeysForName )
|
||||
// 2. or user meant `attribute.key` we look up in the map for all possible field keys with name 'attribute.key'
|
||||
|
||||
// Note:
|
||||
// Note:
|
||||
// If user only wants to search `attribute.key`, then they have to use `attribute.attribute.key`
|
||||
// If user only wants to search `key`, then they have to use `key`
|
||||
// If user wants to search both, they can use `attribute.key` and we will resolve the ambiguity
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package querybuilder
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"testing"
|
||||
@@ -54,11 +55,12 @@ func TestPrepareWhereClause_EmptyVariableList(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
opts := FilterExprVisitorOpts{
|
||||
Context: context.Background(),
|
||||
FieldKeys: keys,
|
||||
Variables: tt.variables,
|
||||
}
|
||||
|
||||
_, err := PrepareWhereClause(tt.expr, opts, 0, 0)
|
||||
_, err := PrepareWhereClause(tt.expr, opts)
|
||||
|
||||
if tt.expectError {
|
||||
if err == nil {
|
||||
@@ -467,7 +469,7 @@ func TestVisitKey(t *testing.T) {
|
||||
expectedWarnings: nil,
|
||||
expectedMainWrnURL: "",
|
||||
},
|
||||
{
|
||||
{
|
||||
name: "only attribute.custom_field is selected",
|
||||
keyText: "attribute.attribute.custom_field",
|
||||
fieldKeys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
|
||||
@@ -376,6 +376,7 @@ func New(
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
telemetrymetadata.DBName,
|
||||
telemetrymetadata.AttributesMetadataLocalTableName,
|
||||
telemetrymetadata.ColumnEvolutionMetadataTableName,
|
||||
)
|
||||
|
||||
global, err := factory.NewProviderFromNamedMap(
|
||||
|
||||
@@ -25,30 +25,34 @@ func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
|
||||
|
||||
func (c *conditionBuilder) conditionFor(
|
||||
ctx context.Context,
|
||||
startNs, endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
operator qbtypes.FilterOperator,
|
||||
value any,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
) (string, error) {
|
||||
column, err := c.fm.ColumnFor(ctx, key)
|
||||
columns, err := c.fm.ColumnFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if column.IsJSONColumn() && querybuilder.BodyJSONQueryEnabled {
|
||||
valueType, value := InferDataType(value, operator, key)
|
||||
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
// TODO(Piyush): Update this to support multiple JSON columns based on evolutions
|
||||
for _, column := range columns {
|
||||
if column.IsJSONColumn() && querybuilder.BodyJSONQueryEnabled {
|
||||
valueType, value := InferDataType(value, operator, key)
|
||||
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return cond, nil
|
||||
}
|
||||
return cond, nil
|
||||
}
|
||||
|
||||
if operator.IsStringSearchOperator() {
|
||||
value = querybuilder.FormatValueForContains(value)
|
||||
}
|
||||
|
||||
tblFieldName, err := c.fm.FieldFor(ctx, key)
|
||||
tblFieldName, err := c.fm.FieldFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -174,6 +178,31 @@ func (c *conditionBuilder) conditionFor(
|
||||
}
|
||||
|
||||
var value any
|
||||
column := columns[0]
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, _, err := selectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs, key.Name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(newColumns) == 0 {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "no valid evolution found for field %s in the given time range", key.Name)
|
||||
}
|
||||
|
||||
// This mean tblFieldName is with multiIf, we just need to do a null check.
|
||||
if len(newColumns) > 1 {
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(tblFieldName), nil
|
||||
} else {
|
||||
return sb.IsNull(tblFieldName), nil
|
||||
}
|
||||
}
|
||||
|
||||
// otherwise we have to find the correct exist operator based on the column type
|
||||
column = newColumns[0]
|
||||
}
|
||||
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
@@ -228,6 +257,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
|
||||
|
||||
}
|
||||
}
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported operator: %v", operator)
|
||||
@@ -235,14 +265,15 @@ func (c *conditionBuilder) conditionFor(
|
||||
|
||||
func (c *conditionBuilder) ConditionFor(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
operator qbtypes.FilterOperator,
|
||||
value any,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
_ uint64,
|
||||
_ uint64,
|
||||
) (string, error) {
|
||||
condition, err := c.conditionFor(ctx, key, operator, value, sb)
|
||||
|
||||
condition, err := c.conditionFor(ctx, startNs, endNs, key, operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -250,12 +281,12 @@ func (c *conditionBuilder) ConditionFor(
|
||||
if !(key.FieldContext == telemetrytypes.FieldContextBody && querybuilder.BodyJSONQueryEnabled) && operator.AddDefaultExistsFilter() {
|
||||
// skip adding exists filter for intrinsic fields
|
||||
// with an exception for body json search
|
||||
field, _ := c.fm.FieldFor(ctx, key)
|
||||
field, _ := c.fm.FieldFor(ctx, startNs, endNs, key)
|
||||
if slices.Contains(maps.Keys(IntrinsicFields), field) && key.FieldContext != telemetrytypes.FieldContextBody {
|
||||
return condition, nil
|
||||
}
|
||||
|
||||
existsCondition, err := c.conditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb)
|
||||
existsCondition, err := c.conditionFor(ctx, startNs, endNs, key, qbtypes.FilterOperatorExists, nil, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemetrylogs
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
@@ -11,14 +12,148 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestExistsConditionForWithEvolutions(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
startTs uint64
|
||||
endTs uint64
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
operator qbtypes.FilterOperator
|
||||
value any
|
||||
expectedSQL string
|
||||
expectedArgs []any
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "New column",
|
||||
startTs: uint64(time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC).UnixNano()),
|
||||
endTs: uint64(time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC).UnixNano()),
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
value: nil,
|
||||
expectedSQL: "WHERE resource.`service.name`::String IS NOT NULL",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Old column",
|
||||
startTs: uint64(time.Date(2023, 1, 15, 10, 0, 0, 0, time.UTC).UnixNano()),
|
||||
endTs: uint64(time.Date(2023, 1, 15, 10, 0, 0, 0, time.UTC).UnixNano()),
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
value: nil,
|
||||
expectedSQL: "WHERE mapContains(resources_string, 'service.name') = ?",
|
||||
expectedArgs: []any{true},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Both Old column and new - empty filter",
|
||||
startTs: uint64(time.Date(2023, 1, 15, 10, 0, 0, 0, time.UTC).UnixNano()),
|
||||
endTs: uint64(time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC).UnixNano()),
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
value: nil,
|
||||
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL",
|
||||
expectedError: nil,
|
||||
},
|
||||
}
|
||||
fm := NewFieldMapper()
|
||||
conditionBuilder := NewConditionBuilder(fm)
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range testCases {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, tc.startTs, tc.endTs, &tc.key, tc.operator, tc.value, sb)
|
||||
sb.Where(cond)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
assert.Contains(t, sql, tc.expectedSQL)
|
||||
assert.Equal(t, tc.expectedArgs, args)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestConditionFor(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
mockEvolution := mockEvolutionData(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC))
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
operator qbtypes.FilterOperator
|
||||
value any
|
||||
evolutions []*telemetrytypes.EvolutionEntry
|
||||
expectedSQL string
|
||||
expectedArgs []any
|
||||
expectedError error
|
||||
@@ -240,9 +375,11 @@ func TestConditionFor(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
evolutions: mockEvolution,
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
value: nil,
|
||||
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL",
|
||||
expectedSQL: "mapContains(resources_string, 'service.name') = ?",
|
||||
expectedArgs: []any{true},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -252,9 +389,11 @@ func TestConditionFor(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
evolutions: mockEvolution,
|
||||
operator: qbtypes.FilterOperatorNotExists,
|
||||
value: nil,
|
||||
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NULL",
|
||||
expectedSQL: "mapContains(resources_string, 'service.name') <> ?",
|
||||
expectedArgs: []any{true},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -315,10 +454,11 @@ func TestConditionFor(t *testing.T) {
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
},
|
||||
evolutions: mockEvolution,
|
||||
operator: qbtypes.FilterOperatorRegexp,
|
||||
value: "frontend-.*",
|
||||
expectedSQL: "(match(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL), ?) AND multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL) IS NOT NULL)",
|
||||
expectedArgs: []any{"frontend-.*"},
|
||||
expectedSQL: "WHERE (match(`resource_string_service$$name`, ?) AND `resource_string_service$$name_exists` = ?)",
|
||||
expectedArgs: []any{"frontend-.*", true},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -329,9 +469,10 @@ func TestConditionFor(t *testing.T) {
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
},
|
||||
evolutions: mockEvolution,
|
||||
operator: qbtypes.FilterOperatorNotRegexp,
|
||||
value: "test-.*",
|
||||
expectedSQL: "WHERE NOT match(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL), ?)",
|
||||
expectedSQL: "WHERE NOT match(`resource_string_service$$name`, ?)",
|
||||
expectedArgs: []any{"test-.*"},
|
||||
expectedError: nil,
|
||||
},
|
||||
@@ -371,14 +512,13 @@ func TestConditionFor(t *testing.T) {
|
||||
expectedError: qbtypes.ErrColumnNotFound,
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
conditionBuilder := NewConditionBuilder(fm)
|
||||
|
||||
for _, tc := range testCases {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, &tc.key, tc.operator, tc.value, sb, 0, 0)
|
||||
tc.key.Evolutions = tc.evolutions
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, 0, 0, &tc.key, tc.operator, tc.value, sb)
|
||||
sb.Where(cond)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
@@ -433,7 +573,7 @@ func TestConditionForMultipleKeys(t *testing.T) {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
var err error
|
||||
for _, key := range tc.keys {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, &key, tc.operator, tc.value, sb, 0, 0)
|
||||
cond, err := conditionBuilder.conditionFor(ctx, 0, 0, &key, tc.operator, tc.value, sb)
|
||||
sb.Where(cond)
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting condition for key %s: %v", key.Name, err)
|
||||
@@ -690,7 +830,7 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, &tc.key, tc.operator, tc.value, sb, 0, 0)
|
||||
cond, err := conditionBuilder.conditionFor(ctx, 0, 0, &tc.key, tc.operator, tc.value, sb)
|
||||
sb.Where(cond)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
|
||||
@@ -3,7 +3,10 @@ package telemetrylogs
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/utils"
|
||||
@@ -68,35 +71,36 @@ func NewFieldMapper() qbtypes.FieldMapper {
|
||||
return &fieldMapper{}
|
||||
}
|
||||
|
||||
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
||||
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
return logsV2Columns["resource"], nil
|
||||
columns := []*schema.Column{logsV2Columns["resources_string"], logsV2Columns["resource"]}
|
||||
return columns, nil
|
||||
case telemetrytypes.FieldContextScope:
|
||||
switch key.Name {
|
||||
case "name", "scope.name", "scope_name":
|
||||
return logsV2Columns["scope_name"], nil
|
||||
return []*schema.Column{logsV2Columns["scope_name"]}, nil
|
||||
case "version", "scope.version", "scope_version":
|
||||
return logsV2Columns["scope_version"], nil
|
||||
return []*schema.Column{logsV2Columns["scope_version"]}, nil
|
||||
}
|
||||
return logsV2Columns["scope_string"], nil
|
||||
return []*schema.Column{logsV2Columns["scope_string"]}, nil
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
switch key.FieldDataType {
|
||||
case telemetrytypes.FieldDataTypeString:
|
||||
return logsV2Columns["attributes_string"], nil
|
||||
return []*schema.Column{logsV2Columns["attributes_string"]}, nil
|
||||
case telemetrytypes.FieldDataTypeInt64, telemetrytypes.FieldDataTypeFloat64, telemetrytypes.FieldDataTypeNumber:
|
||||
return logsV2Columns["attributes_number"], nil
|
||||
return []*schema.Column{logsV2Columns["attributes_number"]}, nil
|
||||
case telemetrytypes.FieldDataTypeBool:
|
||||
return logsV2Columns["attributes_bool"], nil
|
||||
return []*schema.Column{logsV2Columns["attributes_bool"]}, nil
|
||||
}
|
||||
case telemetrytypes.FieldContextBody:
|
||||
// Body context is for JSON body fields
|
||||
// Use body_json if feature flag is enabled
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
return logsV2Columns[LogsV2BodyJSONColumn], nil
|
||||
return []*schema.Column{logsV2Columns[LogsV2BodyJSONColumn]}, nil
|
||||
}
|
||||
// Fall back to legacy body column
|
||||
return logsV2Columns["body"], nil
|
||||
return []*schema.Column{logsV2Columns["body"]}, nil
|
||||
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
|
||||
col, ok := logsV2Columns[key.Name]
|
||||
if !ok {
|
||||
@@ -104,96 +108,237 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
|
||||
if strings.HasPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
|
||||
// Use body_json if feature flag is enabled and we have a body condition builder
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
return logsV2Columns[LogsV2BodyJSONColumn], nil
|
||||
// TODO(Piyush): Update this to support multiple JSON columns based on evolutions
|
||||
// i.e return both the body json and body json promoted and let the evolutions decide which one to use
|
||||
// based on the query range time.
|
||||
return []*schema.Column{logsV2Columns[LogsV2BodyJSONColumn]}, nil
|
||||
}
|
||||
// Fall back to legacy body column
|
||||
return logsV2Columns["body"], nil
|
||||
return []*schema.Column{logsV2Columns["body"]}, nil
|
||||
}
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
return col, nil
|
||||
return []*schema.Column{col}, nil
|
||||
}
|
||||
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
column, err := m.getColumn(ctx, key)
|
||||
// selectEvolutionsForColumns selects the appropriate evolution entries for each column based on the time range.
|
||||
// Logic:
|
||||
// - Finds the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// - Rejects all evolutions before this latest base evolution
|
||||
// - For duplicate evolutions it considers the oldest one (first in ReleaseTime)
|
||||
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
|
||||
// - Results are sorted by ReleaseTime descending (newest first)
|
||||
func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64, fieldName string) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
|
||||
|
||||
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
|
||||
copy(sortedEvolutions, evolutions)
|
||||
|
||||
// sort the evolutions by ReleaseTime ascending
|
||||
sort.Slice(sortedEvolutions, func(i, j int) bool {
|
||||
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
|
||||
})
|
||||
|
||||
tsStartTime := time.Unix(0, int64(tsStart))
|
||||
tsEndTime := time.Unix(0, int64(tsEnd))
|
||||
|
||||
// Build evolution map: column name -> evolution
|
||||
evolutionMap := make(map[string]*telemetrytypes.EvolutionEntry)
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if _, exists := evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))]; exists {
|
||||
// since if there is duplicate we would just use the oldest one.
|
||||
continue
|
||||
}
|
||||
evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))] = evolution
|
||||
}
|
||||
|
||||
// Find the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// Evolutions are sorted, so we can break early
|
||||
var latestBaseEvolutionAcrossAll *telemetrytypes.EvolutionEntry
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if evolution.ReleaseTime.After(tsStartTime) {
|
||||
break
|
||||
}
|
||||
latestBaseEvolutionAcrossAll = evolution
|
||||
}
|
||||
|
||||
// We shouldn't reach this, it basically means there is something wrong with the evolutions data
|
||||
if latestBaseEvolutionAcrossAll == nil {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "no base evolution found for columns %v", columns)
|
||||
}
|
||||
|
||||
columnLookUpMap := make(map[string]*schema.Column)
|
||||
for _, column := range columns {
|
||||
columnLookUpMap[column.Name] = column
|
||||
}
|
||||
|
||||
// Collect column-evolution pairs
|
||||
type colEvoPair struct {
|
||||
column *schema.Column
|
||||
evolution *telemetrytypes.EvolutionEntry
|
||||
}
|
||||
pairs := []colEvoPair{}
|
||||
|
||||
for _, evolution := range evolutionMap {
|
||||
// Reject evolutions before the latest base evolution
|
||||
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
|
||||
continue
|
||||
}
|
||||
// skip evolutions after tsEndTime
|
||||
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exists := columnLookUpMap[evolution.ColumnName]; !exists {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "evolution column %s not found in columns %v", evolution.ColumnName, columns)
|
||||
}
|
||||
|
||||
pairs = append(pairs, colEvoPair{columnLookUpMap[evolution.ColumnName], evolution})
|
||||
}
|
||||
|
||||
// If no pairs found, fall back to latestBaseEvolutionAcrossAll for matching columns
|
||||
if len(pairs) == 0 {
|
||||
for _, column := range columns {
|
||||
// Use latestBaseEvolutionAcrossAll if this column name matches its column name
|
||||
if column.Name == latestBaseEvolutionAcrossAll.ColumnName {
|
||||
pairs = append(pairs, colEvoPair{column, latestBaseEvolutionAcrossAll})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by ReleaseTime descending (newest first)
|
||||
for i := 0; i < len(pairs)-1; i++ {
|
||||
for j := i + 1; j < len(pairs); j++ {
|
||||
if pairs[i].evolution.ReleaseTime.Before(pairs[j].evolution.ReleaseTime) {
|
||||
pairs[i], pairs[j] = pairs[j], pairs[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Extract results
|
||||
newColumns := make([]*schema.Column, len(pairs))
|
||||
evolutionsEntries := make([]*telemetrytypes.EvolutionEntry, len(pairs))
|
||||
for i, pair := range pairs {
|
||||
newColumns[i] = pair.column
|
||||
evolutionsEntries[i] = pair.evolution
|
||||
}
|
||||
|
||||
return newColumns, evolutionsEntries, nil
|
||||
}
|
||||
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
columns, err := m.getColumn(ctx, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
oldColumn := logsV2Columns["resources_string"]
|
||||
oldKeyName := fmt.Sprintf("%s['%s']", oldColumn.Name, key.Name)
|
||||
|
||||
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
|
||||
// once clickHouse dependency is updated, we need to check if we can remove it.
|
||||
if key.Materialized {
|
||||
oldKeyName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
|
||||
oldKeyNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, %s==true, %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldKeyNameExists, oldKeyName), nil
|
||||
}
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
|
||||
case telemetrytypes.FieldContextBody:
|
||||
if key.JSONDataType == nil {
|
||||
return "", qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
if key.KeyNameContainsArray() && !key.JSONDataType.IsArray {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "FieldFor not supported for nested fields; only supported for flat paths (e.g. body.status.detail) and paths of Array type: %s(%s)", key.Name, key.FieldDataType)
|
||||
}
|
||||
|
||||
return m.buildFieldForJSON(key)
|
||||
default:
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
var newColumns []*schema.Column
|
||||
var evolutionsEntries []*telemetrytypes.EvolutionEntry
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, evolutionsEntries, err = selectEvolutionsForColumns(columns, key.Evolutions, tsStart, tsEnd, key.Name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
} else {
|
||||
newColumns = columns
|
||||
}
|
||||
|
||||
exprs := []string{}
|
||||
existExpr := []string{}
|
||||
for i, column := range newColumns {
|
||||
// Use evolution column name if available, otherwise use the column name
|
||||
columnName := column.Name
|
||||
if evolutionsEntries != nil && evolutionsEntries[i] != nil {
|
||||
columnName = evolutionsEntries[i].ColumnName
|
||||
}
|
||||
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", columnName, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", columnName, key.Name))
|
||||
case telemetrytypes.FieldContextBody:
|
||||
if key.JSONDataType == nil {
|
||||
return "", qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
if key.KeyNameContainsArray() && !key.JSONDataType.IsArray {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "FieldFor not supported for nested fields; only supported for flat paths (e.g. body.status.detail) and paths of Array type: %s(%s)", key.Name, key.FieldDataType)
|
||||
}
|
||||
expr, err := m.buildFieldForJSON(key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
exprs = append(exprs, expr)
|
||||
default:
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
exprs = append(exprs, column.Name)
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
}
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
||||
return column.Name, nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
}
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
||||
return column.Name, nil
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
}
|
||||
exprs = append(exprs, fmt.Sprintf("%s['%s']", columnName, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("mapContains(%s, '%s')", columnName, key.Name))
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
}
|
||||
|
||||
if len(exprs) == 1 {
|
||||
return exprs[0], nil
|
||||
} else if len(exprs) > 1 {
|
||||
// Ensure existExpr has the same length as exprs
|
||||
if len(existExpr) != len(exprs) {
|
||||
return "", errors.New(errors.TypeInternal, errors.CodeInternal, "length of exist exprs doesn't match to that of exprs")
|
||||
}
|
||||
finalExprs := []string{}
|
||||
for i, expr := range exprs {
|
||||
finalExprs = append(finalExprs, fmt.Sprintf("%s, %s", existExpr[i], expr))
|
||||
}
|
||||
return "multiIf(" + strings.Join(finalExprs, ", ") + ", NULL)", nil
|
||||
}
|
||||
|
||||
// should not reach here
|
||||
return column.Name, nil
|
||||
return columns[0].Name, nil
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
return m.getColumn(ctx, key)
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnExpressionFor(
|
||||
ctx context.Context,
|
||||
tsStart, tsEnd uint64,
|
||||
field *telemetrytypes.TelemetryFieldKey,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
|
||||
colName, err := m.FieldFor(ctx, field)
|
||||
colName, err := m.FieldFor(ctx, tsStart, tsEnd, field)
|
||||
if errors.Is(err, qbtypes.ErrColumnNotFound) {
|
||||
// the key didn't have the right context to be added to the query
|
||||
// we try to use the context we know of
|
||||
@@ -203,7 +348,7 @@ func (m *fieldMapper) ColumnExpressionFor(
|
||||
if _, ok := logsV2Columns[field.Name]; ok {
|
||||
// if it is, attach the column name directly
|
||||
field.FieldContext = telemetrytypes.FieldContextLog
|
||||
colName, _ = m.FieldFor(ctx, field)
|
||||
colName, _ = m.FieldFor(ctx, tsStart, tsEnd, field)
|
||||
} else {
|
||||
// - the context is not provided
|
||||
// - there are not keys for the field
|
||||
@@ -221,12 +366,12 @@ func (m *fieldMapper) ColumnExpressionFor(
|
||||
}
|
||||
} else if len(keysForField) == 1 {
|
||||
// we have a single key for the field, use it
|
||||
colName, _ = m.FieldFor(ctx, keysForField[0])
|
||||
colName, _ = m.FieldFor(ctx, tsStart, tsEnd, keysForField[0])
|
||||
} else {
|
||||
// select any non-empty value from the keys
|
||||
args := []string{}
|
||||
for _, key := range keysForField {
|
||||
colName, _ = m.FieldFor(ctx, key)
|
||||
colName, _ = m.FieldFor(ctx, tsStart, tsEnd, key)
|
||||
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
|
||||
}
|
||||
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemetrylogs
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
@@ -17,7 +18,7 @@ func TestGetColumn(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
expectedCol *schema.Column
|
||||
expectedCol []*schema.Column
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
@@ -26,7 +27,7 @@ func TestGetColumn(t *testing.T) {
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
},
|
||||
expectedCol: logsV2Columns["resource"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["resources_string"], logsV2Columns["resource"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -35,7 +36,7 @@ func TestGetColumn(t *testing.T) {
|
||||
Name: "name",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
expectedCol: logsV2Columns["scope_name"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["scope_name"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -44,7 +45,7 @@ func TestGetColumn(t *testing.T) {
|
||||
Name: "scope.name",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
expectedCol: logsV2Columns["scope_name"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["scope_name"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -53,7 +54,7 @@ func TestGetColumn(t *testing.T) {
|
||||
Name: "scope_name",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
expectedCol: logsV2Columns["scope_name"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["scope_name"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -62,7 +63,7 @@ func TestGetColumn(t *testing.T) {
|
||||
Name: "version",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
expectedCol: logsV2Columns["scope_version"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["scope_version"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -71,7 +72,7 @@ func TestGetColumn(t *testing.T) {
|
||||
Name: "custom.scope.field",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
expectedCol: logsV2Columns["scope_string"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["scope_string"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -81,7 +82,7 @@ func TestGetColumn(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
expectedCol: logsV2Columns["attributes_string"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["attributes_string"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -91,7 +92,7 @@ func TestGetColumn(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
expectedCol: logsV2Columns["attributes_number"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["attributes_number"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -101,7 +102,7 @@ func TestGetColumn(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeInt64,
|
||||
},
|
||||
expectedCol: logsV2Columns["attributes_number"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["attributes_number"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -111,7 +112,7 @@ func TestGetColumn(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeFloat64,
|
||||
},
|
||||
expectedCol: logsV2Columns["attributes_number"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["attributes_number"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -121,7 +122,7 @@ func TestGetColumn(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
},
|
||||
expectedCol: logsV2Columns["attributes_bool"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["attributes_bool"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -130,7 +131,7 @@ func TestGetColumn(t *testing.T) {
|
||||
Name: "timestamp",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
},
|
||||
expectedCol: logsV2Columns["timestamp"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["timestamp"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -139,7 +140,7 @@ func TestGetColumn(t *testing.T) {
|
||||
Name: "body",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
},
|
||||
expectedCol: logsV2Columns["body"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["body"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -159,7 +160,7 @@ func TestGetColumn(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
},
|
||||
expectedCol: logsV2Columns["attributes_bool"],
|
||||
expectedCol: []*schema.Column{logsV2Columns["attributes_bool"]},
|
||||
expectedError: nil,
|
||||
},
|
||||
}
|
||||
@@ -168,7 +169,7 @@ func TestGetColumn(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
col, err := fm.ColumnFor(ctx, &tc.key)
|
||||
col, err := fm.ColumnFor(ctx, 0, 0, &tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
@@ -183,11 +184,14 @@ func TestGetColumn(t *testing.T) {
|
||||
func TestGetFieldKeyName(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
resourceEvolution := mockEvolutionData(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC))
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
expectedResult string
|
||||
expectedError error
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
expectedResult string
|
||||
expectedError error
|
||||
addExistsFilter bool
|
||||
}{
|
||||
{
|
||||
name: "Simple column type - timestamp",
|
||||
@@ -195,8 +199,9 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
Name: "timestamp",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
},
|
||||
expectedResult: "timestamp",
|
||||
expectedError: nil,
|
||||
expectedResult: "timestamp",
|
||||
expectedError: nil,
|
||||
addExistsFilter: false,
|
||||
},
|
||||
{
|
||||
name: "Map column type - string attribute",
|
||||
@@ -205,8 +210,9 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
expectedResult: "attributes_string['user.id']",
|
||||
expectedError: nil,
|
||||
expectedResult: "attributes_string['user.id']",
|
||||
expectedError: nil,
|
||||
addExistsFilter: false,
|
||||
},
|
||||
{
|
||||
name: "Map column type - number attribute",
|
||||
@@ -215,8 +221,9 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
expectedResult: "attributes_number['request.size']",
|
||||
expectedError: nil,
|
||||
expectedResult: "attributes_number['request.size']",
|
||||
expectedError: nil,
|
||||
addExistsFilter: false,
|
||||
},
|
||||
{
|
||||
name: "Map column type - bool attribute",
|
||||
@@ -225,28 +232,33 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
},
|
||||
expectedResult: "attributes_bool['request.success']",
|
||||
expectedError: nil,
|
||||
expectedResult: "attributes_bool['request.success']",
|
||||
expectedError: nil,
|
||||
addExistsFilter: false,
|
||||
},
|
||||
{
|
||||
name: "Map column type - resource attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
Evolutions: resourceEvolution,
|
||||
},
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
expectedError: nil,
|
||||
expectedResult: "resources_string['service.name']",
|
||||
expectedError: nil,
|
||||
addExistsFilter: false,
|
||||
},
|
||||
{
|
||||
name: "Map column type - resource attribute - Materialized",
|
||||
name: "Map column type - resource attribute - Materialized - json",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
Evolutions: resourceEvolution,
|
||||
},
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL)",
|
||||
expectedError: nil,
|
||||
expectedResult: "`resource_string_service$$name`",
|
||||
expectedError: nil,
|
||||
addExistsFilter: false,
|
||||
},
|
||||
{
|
||||
name: "Non-existent column",
|
||||
@@ -262,7 +274,7 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
result, err := fm.FieldFor(ctx, &tc.key)
|
||||
result, err := fm.FieldFor(ctx, 0, 0, &tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
@@ -273,3 +285,581 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldForWithEvolutions(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
key := &telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
evolutions []*telemetrytypes.EvolutionEntry
|
||||
key *telemetrytypes.TelemetryFieldKey
|
||||
tsStartTime time.Time
|
||||
tsEndTime time.Time
|
||||
expectedResult string
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "Single evolution before tsStartTime",
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
key: key,
|
||||
tsStartTime: time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC),
|
||||
tsEndTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC),
|
||||
expectedResult: "resources_string['service.name']",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Single evolution exactly at tsStartTime",
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
key: key,
|
||||
tsStartTime: time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC),
|
||||
tsEndTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC),
|
||||
expectedResult: "resources_string['service.name']",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Single evolution after tsStartTime",
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
key: key,
|
||||
tsStartTime: time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC),
|
||||
tsEndTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC),
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
// TODO(piyush): to be added once integration with JSON is done.
|
||||
// {
|
||||
// name: "Single evolution after tsStartTime - JSON body",
|
||||
// evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
// {
|
||||
// Signal: telemetrytypes.SignalLogs,
|
||||
// ColumnName: LogsV2BodyJSONColumn,
|
||||
// ColumnType: "JSON(max_dynamic_paths=0)",
|
||||
// FieldContext: telemetrytypes.FieldContextBody,
|
||||
// FieldName: "__all__",
|
||||
// ReleaseTime: time.Unix(0, 0),
|
||||
// },
|
||||
// {
|
||||
// Signal: telemetrytypes.SignalLogs,
|
||||
// ColumnName: LogsV2BodyPromotedColumn,
|
||||
// ColumnType: "JSON()",
|
||||
// FieldContext: telemetrytypes.FieldContextBody,
|
||||
// FieldName: "user.name",
|
||||
// ReleaseTime: time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
|
||||
// },
|
||||
// },
|
||||
// key: &telemetrytypes.TelemetryFieldKey{
|
||||
// Name: "user.name",
|
||||
// FieldContext: telemetrytypes.FieldContextBody,
|
||||
// JSONDataType: &telemetrytypes.String,
|
||||
// Materialized: true,
|
||||
// },
|
||||
// tsStartTime: time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC),
|
||||
// tsEndTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC),
|
||||
// expectedResult: "coalesce(dynamicElement(body_json.`user.name`, 'String'), dynamicElement(body_promoted.`user.name`, 'String'))",
|
||||
// expectedError: nil,
|
||||
// },
|
||||
{
|
||||
name: "Multiple evolutions before tsStartTime - only latest should be included",
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 1, 2, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
key: key,
|
||||
tsStartTime: time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC),
|
||||
tsEndTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC),
|
||||
expectedResult: "resource.`service.name`::String",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Multiple evolutions after tsStartTime - all should be included",
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 1, 2, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
key: key,
|
||||
tsStartTime: time.Unix(0, 0),
|
||||
tsEndTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC),
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Duplicate evolutions after tsStartTime - all should be included",
|
||||
// Note: on production when this happens, we should go ahead and clean it up if required
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 2, 3, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
key: key,
|
||||
tsStartTime: time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
|
||||
tsEndTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC),
|
||||
expectedResult: "resource.`service.name`::String",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Evolution exactly at tsEndTime - should not be included",
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
key: key,
|
||||
tsStartTime: time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC),
|
||||
tsEndTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC),
|
||||
expectedResult: "resources_string['service.name']",
|
||||
expectedError: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
|
||||
tsStart := uint64(tc.tsStartTime.UnixNano())
|
||||
tsEnd := uint64(tc.tsEndTime.UnixNano())
|
||||
tc.key.Evolutions = tc.evolutions
|
||||
|
||||
result, err := fm.FieldFor(ctx, tsStart, tsEnd, tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedResult, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSelectEvolutionsForColumns(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
columns []*schema.Column
|
||||
evolutions []*telemetrytypes.EvolutionEntry
|
||||
tsStart uint64
|
||||
tsEnd uint64
|
||||
fieldName string
|
||||
expectedColumns []string // column names
|
||||
expectedEvols []string // evolution column names
|
||||
expectedError bool
|
||||
errorStr string
|
||||
}{
|
||||
{
|
||||
name: "New evolutions after tsStartTime - should include all",
|
||||
columns: []*schema.Column{
|
||||
logsV2Columns["resources_string"],
|
||||
logsV2Columns["resource"],
|
||||
},
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 0,
|
||||
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 1,
|
||||
ReleaseTime: time.Date(2024, 2, 3, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
fieldName: "service.name",
|
||||
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedColumns: []string{"resource", "resources_string"}, // sorted by ReleaseTime desc
|
||||
expectedEvols: []string{"resource", "resources_string"},
|
||||
},
|
||||
{
|
||||
name: "Columns without matching evolutions - should exclude them",
|
||||
columns: []*schema.Column{
|
||||
logsV2Columns["resources_string"],
|
||||
logsV2Columns["resource"], // no evolution for this
|
||||
logsV2Columns["attributes_string"], // no evolution for this
|
||||
},
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 0,
|
||||
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
fieldName: "service.name",
|
||||
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedColumns: []string{"resources_string"},
|
||||
expectedEvols: []string{"resources_string"},
|
||||
},
|
||||
{
|
||||
name: "New evolutions after tsEndTime - should exclude all",
|
||||
columns: []*schema.Column{
|
||||
logsV2Columns["resources_string"],
|
||||
logsV2Columns["resource"],
|
||||
},
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 0,
|
||||
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 1,
|
||||
ReleaseTime: time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
fieldName: "service.name",
|
||||
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedColumns: []string{"resources_string"},
|
||||
expectedEvols: []string{"resources_string"},
|
||||
},
|
||||
{
|
||||
name: "Empty columns array",
|
||||
columns: []*schema.Column{},
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 0,
|
||||
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
fieldName: "service.name",
|
||||
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedColumns: []string{},
|
||||
expectedEvols: []string{},
|
||||
expectedError: true,
|
||||
errorStr: "column resources_string not found",
|
||||
},
|
||||
{
|
||||
name: "Duplicate evolutions - should use first encountered (oldest if sorted)",
|
||||
columns: []*schema.Column{
|
||||
logsV2Columns["resource"],
|
||||
},
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 0,
|
||||
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 1,
|
||||
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 1,
|
||||
ReleaseTime: time.Date(2024, 1, 20, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
fieldName: "service.name",
|
||||
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedColumns: []string{"resource"},
|
||||
expectedEvols: []string{"resource"}, // should use first one (older)
|
||||
},
|
||||
{
|
||||
name: "Genuine Duplicate evolutions with new version- should consider both",
|
||||
columns: []*schema.Column{
|
||||
logsV2Columns["resources_string"],
|
||||
logsV2Columns["resource"],
|
||||
},
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 0,
|
||||
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 1,
|
||||
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
Version: 2,
|
||||
ReleaseTime: time.Date(2024, 1, 20, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
fieldName: "service.name",
|
||||
tsStart: uint64(time.Date(2024, 1, 16, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedColumns: []string{"resources_string", "resource"},
|
||||
expectedEvols: []string{"resources_string", "resource"}, // should use first one (older)
|
||||
},
|
||||
{
|
||||
name: "Evolution exactly at tsEndTime",
|
||||
columns: []*schema.Column{
|
||||
logsV2Columns["resources_string"],
|
||||
logsV2Columns["resource"],
|
||||
},
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC), // exactly at tsEnd
|
||||
},
|
||||
},
|
||||
fieldName: "service.name",
|
||||
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedColumns: []string{"resources_string"}, // resource excluded because After(tsEnd) is true
|
||||
expectedEvols: []string{"resources_string"},
|
||||
},
|
||||
{
|
||||
name: "Single evolution after tsStartTime - JSON body",
|
||||
columns: []*schema.Column{
|
||||
logsV2Columns[LogsV2BodyJSONColumn],
|
||||
logsV2Columns[LogsV2BodyPromotedColumn],
|
||||
},
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: LogsV2BodyJSONColumn,
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: LogsV2BodyPromotedColumn,
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldName: "user.name",
|
||||
ReleaseTime: time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
fieldName: "user.name",
|
||||
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedColumns: []string{LogsV2BodyPromotedColumn, LogsV2BodyJSONColumn}, // sorted by ReleaseTime desc (newest first)
|
||||
expectedEvols: []string{LogsV2BodyPromotedColumn, LogsV2BodyJSONColumn},
|
||||
},
|
||||
{
|
||||
name: "No evolution after tsStartTime - JSON body",
|
||||
columns: []*schema.Column{
|
||||
logsV2Columns[LogsV2BodyJSONColumn],
|
||||
logsV2Columns[LogsV2BodyPromotedColumn],
|
||||
},
|
||||
evolutions: []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: LogsV2BodyJSONColumn,
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: LogsV2BodyPromotedColumn,
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldName: "user.name",
|
||||
ReleaseTime: time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
fieldName: "user.name",
|
||||
tsStart: uint64(time.Date(2024, 2, 3, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedColumns: []string{LogsV2BodyPromotedColumn},
|
||||
expectedEvols: []string{LogsV2BodyPromotedColumn},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
resultColumns, resultEvols, err := selectEvolutionsForColumns(tc.columns, tc.evolutions, tc.tsStart, tc.tsEnd, tc.fieldName)
|
||||
|
||||
if tc.expectedError {
|
||||
assert.Contains(t, err.Error(), tc.errorStr)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, len(tc.expectedColumns), len(resultColumns), "column count mismatch")
|
||||
assert.Equal(t, len(tc.expectedEvols), len(resultEvols), "evolution count mismatch")
|
||||
|
||||
resultColumnNames := make([]string, len(resultColumns))
|
||||
for i, col := range resultColumns {
|
||||
resultColumnNames[i] = col.Name
|
||||
}
|
||||
resultEvolNames := make([]string, len(resultEvols))
|
||||
for i, evol := range resultEvols {
|
||||
resultEvolNames[i] = evol.ColumnName
|
||||
}
|
||||
|
||||
for i := range tc.expectedColumns {
|
||||
assert.Equal(t, resultColumnNames[i], tc.expectedColumns[i], "expected column missing: "+tc.expectedColumns[i])
|
||||
}
|
||||
for i := range tc.expectedEvols {
|
||||
assert.Equal(t, resultEvolNames[i], tc.expectedEvols[i], "expected evolution missing: "+tc.expectedEvols[i])
|
||||
}
|
||||
// Verify sorting: should be descending by ReleaseTime
|
||||
for i := 0; i < len(resultEvols)-1; i++ {
|
||||
assert.True(t, !resultEvols[i].ReleaseTime.Before(resultEvols[i+1].ReleaseTime),
|
||||
"evolutions should be sorted descending by ReleaseTime")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package telemetrylogs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
@@ -10,12 +11,14 @@ import (
|
||||
|
||||
// TestLikeAndILikeWithoutWildcards_Warns Tests that LIKE/ILIKE without wildcards add warnings and include docs URL
|
||||
func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
keys := buildCompleteFieldKeyMap()
|
||||
|
||||
opts := querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: instrumentationtest.New().Logger(),
|
||||
FieldMapper: fm,
|
||||
ConditionBuilder: cb,
|
||||
@@ -33,7 +36,7 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
|
||||
|
||||
for _, expr := range tests {
|
||||
t.Run(expr, func(t *testing.T) {
|
||||
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
|
||||
clause, err := querybuilder.PrepareWhereClause(expr, opts)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, clause)
|
||||
|
||||
@@ -52,6 +55,7 @@ func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
|
||||
keys := buildCompleteFieldKeyMap()
|
||||
|
||||
opts := querybuilder.FilterExprVisitorOpts{
|
||||
Context: context.Background(),
|
||||
Logger: instrumentationtest.New().Logger(),
|
||||
FieldMapper: fm,
|
||||
ConditionBuilder: cb,
|
||||
@@ -69,7 +73,7 @@ func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
|
||||
|
||||
for _, expr := range tests {
|
||||
t.Run(expr, func(t *testing.T) {
|
||||
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
|
||||
clause, err := querybuilder.PrepareWhereClause(expr, opts)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, clause)
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package telemetrylogs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
@@ -19,6 +20,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
|
||||
keys := buildCompleteFieldKeyMap()
|
||||
|
||||
opts := querybuilder.FilterExprVisitorOpts{
|
||||
Context: context.Background(),
|
||||
Logger: instrumentationtest.New().Logger(),
|
||||
FieldMapper: fm,
|
||||
ConditionBuilder: cb,
|
||||
@@ -161,7 +163,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.category, limitString(tc.query, 50)), func(t *testing.T) {
|
||||
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts)
|
||||
|
||||
if tc.shouldPass {
|
||||
if err != nil {
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
package telemetrylogs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
@@ -15,19 +17,33 @@ import (
|
||||
|
||||
// TestFilterExprLogs tests a comprehensive set of query patterns for logs search
|
||||
func TestFilterExprLogs(t *testing.T) {
|
||||
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
ctx := context.Background()
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
// Define a comprehensive set of field keys to support all test cases
|
||||
keys := buildCompleteFieldKeyMap()
|
||||
|
||||
// for each key of resource attribute add evolution metadata
|
||||
for i, telemetryKeys := range keys {
|
||||
for j, telemetryKey := range telemetryKeys {
|
||||
if telemetryKey.FieldContext == telemetrytypes.FieldContextResource {
|
||||
keys[i][j].Evolutions = mockEvolutionData(releaseTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
opts := querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: instrumentationtest.New().Logger(),
|
||||
FieldMapper: fm,
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: DefaultFullTextColumn,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
StartNs: uint64(releaseTime.Add(-5 * time.Minute).UnixNano()),
|
||||
EndNs: uint64(releaseTime.Add(5 * time.Minute).UnixNano()),
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
@@ -466,7 +482,7 @@ func TestFilterExprLogs(t *testing.T) {
|
||||
expectedErrorContains: "",
|
||||
},
|
||||
|
||||
// fulltext with parenthesized expression
|
||||
//fulltext with parenthesized expression
|
||||
{
|
||||
category: "FREETEXT with parentheses",
|
||||
query: "error (status.code=500 OR status.code=503)",
|
||||
@@ -2386,7 +2402,7 @@ func TestFilterExprLogs(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.category, limitString(tc.query, 50)), func(t *testing.T) {
|
||||
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts)
|
||||
|
||||
if tc.shouldPass {
|
||||
if err != nil {
|
||||
@@ -2442,6 +2458,7 @@ func TestFilterExprLogsConflictNegation(t *testing.T) {
|
||||
}
|
||||
|
||||
opts := querybuilder.FilterExprVisitorOpts{
|
||||
Context: context.Background(),
|
||||
Logger: instrumentationtest.New().Logger(),
|
||||
FieldMapper: fm,
|
||||
ConditionBuilder: cb,
|
||||
@@ -2504,7 +2521,7 @@ func TestFilterExprLogsConflictNegation(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.category, limitString(tc.query, 50)), func(t *testing.T) {
|
||||
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts)
|
||||
|
||||
if tc.shouldPass {
|
||||
if err != nil {
|
||||
|
||||
@@ -268,7 +268,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
|
||||
}
|
||||
|
||||
// get column expression for the field - use array index directly to avoid pointer to loop variable
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, &query.SelectFields[index], keys)
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &query.SelectFields[index], keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -277,7 +277,6 @@ func (b *logQueryStatementBuilder) buildListQuery(
|
||||
}
|
||||
|
||||
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
|
||||
|
||||
// Add filter conditions
|
||||
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
||||
|
||||
@@ -287,7 +286,8 @@ func (b *logQueryStatementBuilder) buildListQuery(
|
||||
|
||||
// Add order by
|
||||
for _, orderBy := range query.Order {
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
|
||||
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &orderBy.Key.TelemetryFieldKey, keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -353,7 +353,7 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
|
||||
// Keep original column expressions so we can build the tuple
|
||||
fieldNames := make([]string, 0, len(query.GroupBy))
|
||||
for _, gb := range query.GroupBy {
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -368,7 +368,7 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
|
||||
allAggChArgs := make([]any, 0)
|
||||
for i, agg := range query.Aggregations {
|
||||
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
|
||||
ctx, agg.Expression,
|
||||
ctx, start, end, agg.Expression,
|
||||
uint64(query.StepInterval.Seconds()),
|
||||
keys,
|
||||
)
|
||||
@@ -500,7 +500,7 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
|
||||
var allGroupByArgs []any
|
||||
|
||||
for _, gb := range query.GroupBy {
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -518,7 +518,7 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
|
||||
for idx := range query.Aggregations {
|
||||
aggExpr := query.Aggregations[idx]
|
||||
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
|
||||
ctx, aggExpr.Expression,
|
||||
ctx, start, end, aggExpr.Expression,
|
||||
rateInterval,
|
||||
keys,
|
||||
)
|
||||
@@ -590,7 +590,7 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
|
||||
|
||||
// buildFilterCondition builds SQL condition from filter expression
|
||||
func (b *logQueryStatementBuilder) addFilterCondition(
|
||||
_ context.Context,
|
||||
ctx context.Context,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
start, end uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
||||
@@ -604,6 +604,7 @@ func (b *logQueryStatementBuilder) addFilterCondition(
|
||||
if query.Filter != nil && query.Filter.Expression != "" {
|
||||
// add filter expression
|
||||
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: b.logger,
|
||||
FieldMapper: b.fm,
|
||||
ConditionBuilder: b.cb,
|
||||
@@ -612,7 +613,9 @@ func (b *logQueryStatementBuilder) addFilterCondition(
|
||||
FullTextColumn: b.fullTextColumn,
|
||||
JsonKeyToKey: b.jsonKeyToKey,
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
StartNs: start,
|
||||
EndNs: end,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -37,7 +37,14 @@ func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation
|
||||
}
|
||||
|
||||
func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
|
||||
// Create a test release time
|
||||
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
releaseTimeNano := uint64(releaseTime.UnixNano())
|
||||
|
||||
cases := []struct {
|
||||
startTs uint64
|
||||
endTs uint64
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
|
||||
@@ -45,14 +52,16 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "Time series with limit",
|
||||
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
|
||||
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
|
||||
name: "Time series with limit and count distinct on service.name",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Aggregations: []qbtypes.LogAggregation{
|
||||
{
|
||||
Expression: "count()",
|
||||
Expression: "count_distinct(service.name)",
|
||||
},
|
||||
},
|
||||
Filter: &qbtypes.Filter{
|
||||
@@ -68,20 +77,22 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
|
||||
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, countDistinct(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, countDistinct(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
|
||||
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1705397400), uint64(1705485600), "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600), 10, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "Time series with OR b/w resource attr and attribute filter",
|
||||
startTs: releaseTimeNano - uint64(24*time.Hour.Nanoseconds()),
|
||||
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
|
||||
name: "Time series with OR b/w resource attr and attribute filter and count distinct on service.name",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Aggregations: []qbtypes.LogAggregation{
|
||||
{
|
||||
Expression: "count()",
|
||||
Expression: "count_distinct(service.name)",
|
||||
},
|
||||
},
|
||||
Filter: &qbtypes.Filter{
|
||||
@@ -97,12 +108,14 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE ((simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) OR true) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) = ? AND multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL) OR (attributes_string['http.method'] = ? AND mapContains(attributes_string, 'http.method') = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) = ? AND multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL) OR (attributes_string['http.method'] = ? AND mapContains(attributes_string, 'http.method') = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "redis-manual", "GET", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, "redis-manual", "GET", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE ((simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) OR true) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, countDistinct(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) = ? AND multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL) OR (attributes_string['http.method'] = ? AND mapContains(attributes_string, 'http.method') = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, countDistinct(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) = ? AND multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL) OR (attributes_string['http.method'] = ? AND mapContains(attributes_string, 'http.method') = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1705224600), uint64(1705485600), "redis-manual", "GET", true, "1705226400000000000", uint64(1705224600), "1705485600000000000", uint64(1705485600), 10, "redis-manual", "GET", true, "1705226400000000000", uint64(1705224600), "1705485600000000000", uint64(1705485600)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
|
||||
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
|
||||
name: "Time series with limit + custom order by",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
@@ -136,12 +149,14 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY `service.name` desc, ts desc",
|
||||
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY `service.name` desc, ts desc",
|
||||
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1705397400), uint64(1705485600), "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600), 10, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
|
||||
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
|
||||
name: "Time series with group by on materialized column",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
@@ -168,10 +183,12 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(`attribute_string_materialized$$key$$name_exists` = ?, `attribute_string_materialized$$key$$name`, NULL)) AS `materialized.key.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `materialized.key.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(`attribute_string_materialized$$key$$name_exists` = ?, `attribute_string_materialized$$key$$name`, NULL)) AS `materialized.key.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`materialized.key.name`) GLOBAL IN (SELECT `materialized.key.name` FROM __limit_cte) GROUP BY ts, `materialized.key.name`",
|
||||
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
|
||||
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1705397400), uint64(1705485600), true, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600), 10, true, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
|
||||
},
|
||||
},
|
||||
{
|
||||
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
|
||||
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
|
||||
name: "Time series with materialised column using or with regex operator",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
@@ -189,14 +206,29 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (true OR true) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((match(`attribute_string_materialized$$key$$name`, ?) AND `attribute_string_materialized$$key$$name_exists` = ?) OR (`attribute_string_materialized$$key$$name` = ? AND `attribute_string_materialized$$key$$name_exists` = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "redis.*", true, "memcached", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
|
||||
Args: []any{uint64(1705397400), uint64(1705485600), "redis.*", true, "memcached", true, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
keysMap := buildCompleteFieldKeyMap()
|
||||
|
||||
// for each key of resource attribute add evolution metadata
|
||||
for i, telemetryKeys := range keysMap {
|
||||
for j, telemetryKey := range telemetryKeys {
|
||||
if telemetryKey.FieldContext == telemetrytypes.FieldContextResource {
|
||||
keysMap[i][j].Signal = telemetrytypes.SignalLogs
|
||||
keysMap[i][j].Evolutions = mockEvolutionData(releaseTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mockMetadataStore.KeysMap = keysMap
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
@@ -218,7 +250,7 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
q, err := statementBuilder.Build(ctx, c.startTs, c.endTs, c.requestType, c.query, nil)
|
||||
|
||||
if c.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
@@ -315,9 +347,10 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fm := NewFieldMapper()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
@@ -338,7 +371,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
q, err := statementBuilder.Build(ctx, 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
|
||||
if c.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
@@ -423,9 +456,10 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fm := NewFieldMapper()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
@@ -443,12 +477,10 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
|
||||
GetBodyJSONKey,
|
||||
)
|
||||
|
||||
//
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
q, err := statementBuilder.Build(ctx, 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
|
||||
if c.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
@@ -499,9 +531,10 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fm := NewFieldMapper()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
@@ -522,7 +555,7 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
q, err := statementBuilder.Build(ctx, 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
|
||||
if c.expectedErrContains != "" {
|
||||
require.Error(t, err)
|
||||
@@ -594,9 +627,10 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
|
||||
fm := NewFieldMapper()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
@@ -617,7 +651,7 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
q, err := statementBuilder.Build(ctx, 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
|
||||
if c.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
|
||||
@@ -2,6 +2,7 @@ package telemetrylogs
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
@@ -1007,3 +1008,24 @@ func buildCompleteFieldKeyMapCollision() map[string][]*telemetrytypes.TelemetryF
|
||||
}
|
||||
return keysMap
|
||||
}
|
||||
|
||||
func mockEvolutionData(releaseTime time.Time) []*telemetrytypes.EvolutionEntry {
|
||||
return []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resources_string",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: releaseTime,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,12 +21,11 @@ func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
|
||||
|
||||
func (c *conditionBuilder) ConditionFor(
|
||||
ctx context.Context,
|
||||
tsStart, tsEnd uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
operator qbtypes.FilterOperator,
|
||||
value any,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
_ uint64,
|
||||
_ uint64,
|
||||
) (string, error) {
|
||||
|
||||
switch operator {
|
||||
@@ -39,13 +38,13 @@ func (c *conditionBuilder) ConditionFor(
|
||||
value = querybuilder.FormatValueForContains(value)
|
||||
}
|
||||
|
||||
column, err := c.fm.ColumnFor(ctx, key)
|
||||
columns, err := c.fm.ColumnFor(ctx, tsStart, tsEnd, key)
|
||||
if err != nil {
|
||||
// if we don't have a column, we can't build a condition for related values
|
||||
return "", nil
|
||||
}
|
||||
|
||||
tblFieldName, err := c.fm.FieldFor(ctx, key)
|
||||
tblFieldName, err := c.fm.FieldFor(ctx, tsStart, tsEnd, key)
|
||||
if err != nil {
|
||||
// if we don't have a table field name, we can't build a condition for related values
|
||||
return "", nil
|
||||
@@ -120,12 +119,12 @@ func (c *conditionBuilder) ConditionFor(
|
||||
// in the query builder, `exists` and `not exists` are used for
|
||||
// key membership checks, so depending on the column type, the condition changes
|
||||
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
|
||||
switch column.Type {
|
||||
switch columns[0].Type {
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", columns[0].Name, key.Name)
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
cond = sb.E(leftOperand, true)
|
||||
} else {
|
||||
@@ -134,5 +133,5 @@ func (c *conditionBuilder) ConditionFor(
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Sprintf(expr, column.Name, sb.Var(key.Name), cond), nil
|
||||
return fmt.Sprintf(expr, columns[0].Name, sb.Var(key.Name), cond), nil
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestConditionFor(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, &tc.key, tc.operator, tc.value, sb, 0, 0)
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, 0, 0, &tc.key, tc.operator, tc.value, sb)
|
||||
sb.Where(cond)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
|
||||
@@ -33,47 +33,48 @@ func NewFieldMapper() qbtypes.FieldMapper {
|
||||
return &fieldMapper{}
|
||||
}
|
||||
|
||||
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
||||
func (m *fieldMapper) getColumn(_ context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
return attributeMetadataColumns["resource_attributes"], nil
|
||||
return []*schema.Column{attributeMetadataColumns["resource_attributes"]}, nil
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
return attributeMetadataColumns["attributes"], nil
|
||||
return []*schema.Column{attributeMetadataColumns["attributes"]}, nil
|
||||
}
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
||||
column, err := m.getColumn(ctx, key)
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
columns, err := m.getColumn(ctx, tsStart, tsEnd, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return column, nil
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
column, err := m.getColumn(ctx, key)
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, startNs, endNs uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
columns, err := m.getColumn(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
switch column.Type {
|
||||
switch columns[0].Type {
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}:
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
return fmt.Sprintf("%s['%s']", columns[0].Name, key.Name), nil
|
||||
}
|
||||
return column.Name, nil
|
||||
return columns[0].Name, nil
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnExpressionFor(
|
||||
ctx context.Context,
|
||||
startNs, endNs uint64,
|
||||
field *telemetrytypes.TelemetryFieldKey,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
|
||||
colName, err := m.FieldFor(ctx, field)
|
||||
colName, err := m.FieldFor(ctx, startNs, endNs, field)
|
||||
if errors.Is(err, qbtypes.ErrColumnNotFound) {
|
||||
// the key didn't have the right context to be added to the query
|
||||
// we try to use the context we know of
|
||||
@@ -83,7 +84,7 @@ func (m *fieldMapper) ColumnExpressionFor(
|
||||
if _, ok := attributeMetadataColumns[field.Name]; ok {
|
||||
// if it is, attach the column name directly
|
||||
field.FieldContext = telemetrytypes.FieldContextSpan
|
||||
colName, _ = m.FieldFor(ctx, field)
|
||||
colName, _ = m.FieldFor(ctx, startNs, endNs, field)
|
||||
} else {
|
||||
// - the context is not provided
|
||||
// - there are not keys for the field
|
||||
@@ -101,12 +102,12 @@ func (m *fieldMapper) ColumnExpressionFor(
|
||||
}
|
||||
} else if len(keysForField) == 1 {
|
||||
// we have a single key for the field, use it
|
||||
colName, _ = m.FieldFor(ctx, keysForField[0])
|
||||
colName, _ = m.FieldFor(ctx, startNs, endNs, keysForField[0])
|
||||
} else {
|
||||
// select any non-empty value from the keys
|
||||
args := []string{}
|
||||
for _, key := range keysForField {
|
||||
colName, _ = m.FieldFor(ctx, key)
|
||||
colName, _ = m.FieldFor(ctx, startNs, endNs, key)
|
||||
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
|
||||
}
|
||||
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
|
||||
|
||||
@@ -128,13 +128,13 @@ func TestGetColumn(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
col, err := fm.ColumnFor(context.Background(), &tc.key)
|
||||
col, err := fm.ColumnFor(context.Background(), 0, 0, &tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedCol, col)
|
||||
assert.Equal(t, tc.expectedCol, col[0])
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -145,6 +145,8 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
tsStart uint64
|
||||
tsEnd uint64
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
expectedResult string
|
||||
expectedError error
|
||||
@@ -203,7 +205,7 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
result, err := fm.FieldFor(ctx, &tc.key)
|
||||
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
|
||||
@@ -2,9 +2,11 @@ package telemetrymetadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
@@ -13,6 +15,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
"github.com/SigNoz/signoz/pkg/types/cachetypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
@@ -31,23 +34,24 @@ var (
|
||||
)
|
||||
|
||||
type telemetryMetaStore struct {
|
||||
logger *slog.Logger
|
||||
telemetrystore telemetrystore.TelemetryStore
|
||||
tracesDBName string
|
||||
tracesFieldsTblName string
|
||||
spanAttributesKeysTblName string
|
||||
indexV3TblName string
|
||||
metricsDBName string
|
||||
metricsFieldsTblName string
|
||||
meterDBName string
|
||||
meterFieldsTblName string
|
||||
logsDBName string
|
||||
logsFieldsTblName string
|
||||
logAttributeKeysTblName string
|
||||
logResourceKeysTblName string
|
||||
logsV2TblName string
|
||||
relatedMetadataDBName string
|
||||
relatedMetadataTblName string
|
||||
logger *slog.Logger
|
||||
telemetrystore telemetrystore.TelemetryStore
|
||||
tracesDBName string
|
||||
tracesFieldsTblName string
|
||||
spanAttributesKeysTblName string
|
||||
indexV3TblName string
|
||||
metricsDBName string
|
||||
metricsFieldsTblName string
|
||||
meterDBName string
|
||||
meterFieldsTblName string
|
||||
logsDBName string
|
||||
logsFieldsTblName string
|
||||
logAttributeKeysTblName string
|
||||
logResourceKeysTblName string
|
||||
logsV2TblName string
|
||||
relatedMetadataDBName string
|
||||
relatedMetadataTblName string
|
||||
columnEvolutionMetadataTblName string
|
||||
|
||||
fm qbtypes.FieldMapper
|
||||
conditionBuilder qbtypes.ConditionBuilder
|
||||
@@ -76,27 +80,29 @@ func NewTelemetryMetaStore(
|
||||
logResourceKeysTblName string,
|
||||
relatedMetadataDBName string,
|
||||
relatedMetadataTblName string,
|
||||
columnEvolutionMetadataTblName string,
|
||||
) telemetrytypes.MetadataStore {
|
||||
metadataSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetadata")
|
||||
|
||||
t := &telemetryMetaStore{
|
||||
logger: metadataSettings.Logger(),
|
||||
telemetrystore: telemetrystore,
|
||||
tracesDBName: tracesDBName,
|
||||
tracesFieldsTblName: tracesFieldsTblName,
|
||||
spanAttributesKeysTblName: spanAttributesKeysTblName,
|
||||
indexV3TblName: indexV3TblName,
|
||||
metricsDBName: metricsDBName,
|
||||
metricsFieldsTblName: metricsFieldsTblName,
|
||||
meterDBName: meterDBName,
|
||||
meterFieldsTblName: meterFieldsTblName,
|
||||
logsDBName: logsDBName,
|
||||
logsV2TblName: logsV2TblName,
|
||||
logsFieldsTblName: logsFieldsTblName,
|
||||
logAttributeKeysTblName: logAttributeKeysTblName,
|
||||
logResourceKeysTblName: logResourceKeysTblName,
|
||||
relatedMetadataDBName: relatedMetadataDBName,
|
||||
relatedMetadataTblName: relatedMetadataTblName,
|
||||
logger: metadataSettings.Logger(),
|
||||
telemetrystore: telemetrystore,
|
||||
tracesDBName: tracesDBName,
|
||||
tracesFieldsTblName: tracesFieldsTblName,
|
||||
spanAttributesKeysTblName: spanAttributesKeysTblName,
|
||||
indexV3TblName: indexV3TblName,
|
||||
metricsDBName: metricsDBName,
|
||||
metricsFieldsTblName: metricsFieldsTblName,
|
||||
meterDBName: meterDBName,
|
||||
meterFieldsTblName: meterFieldsTblName,
|
||||
logsDBName: logsDBName,
|
||||
logsV2TblName: logsV2TblName,
|
||||
logsFieldsTblName: logsFieldsTblName,
|
||||
logAttributeKeysTblName: logAttributeKeysTblName,
|
||||
logResourceKeysTblName: logResourceKeysTblName,
|
||||
relatedMetadataDBName: relatedMetadataDBName,
|
||||
relatedMetadataTblName: relatedMetadataTblName,
|
||||
columnEvolutionMetadataTblName: columnEvolutionMetadataTblName,
|
||||
jsonColumnMetadata: map[telemetrytypes.Signal]map[telemetrytypes.FieldContext]telemetrytypes.JSONColumnMetadata{
|
||||
telemetrytypes.SignalLogs: {
|
||||
telemetrytypes.FieldContextBody: telemetrytypes.JSONColumnMetadata{
|
||||
@@ -563,9 +569,48 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
|
||||
keys = append(keys, bodyJSONPaths...)
|
||||
complete = complete && finished
|
||||
}
|
||||
|
||||
// fetch and add evolutions
|
||||
evolutionMetadataKeySelectors := getEvolutionMetadataKeySelectors(keys)
|
||||
evolutions, err := t.GetColumnEvolutionMetadataMulti(ctx, evolutionMetadataKeySelectors)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
for i, key := range keys {
|
||||
// first check if there is evolutions that with field name as __all__
|
||||
// then check for specific field name
|
||||
selector := &telemetrytypes.EvolutionSelector{
|
||||
Signal: key.Signal,
|
||||
FieldContext: key.FieldContext,
|
||||
FieldName: "__all__",
|
||||
}
|
||||
|
||||
if keyEvolutions, ok := evolutions[telemetrytypes.GetEvolutionMetadataUniqueKey(selector)]; ok {
|
||||
keys[i].Evolutions = keyEvolutions
|
||||
}
|
||||
|
||||
selector.FieldName = key.Name
|
||||
if keyEvolutions, ok := evolutions[telemetrytypes.GetEvolutionMetadataUniqueKey(selector)]; ok {
|
||||
keys[i].Evolutions = keyEvolutions
|
||||
}
|
||||
}
|
||||
|
||||
return keys, complete, nil
|
||||
}
|
||||
|
||||
func getEvolutionMetadataKeySelectors(keySelectors []*telemetrytypes.TelemetryFieldKey) []*telemetrytypes.EvolutionSelector {
|
||||
var metadataKeySelectors []*telemetrytypes.EvolutionSelector
|
||||
for _, keySelector := range keySelectors {
|
||||
selector := &telemetrytypes.EvolutionSelector{
|
||||
Signal: keySelector.Signal,
|
||||
FieldContext: keySelector.FieldContext,
|
||||
FieldName: keySelector.Name,
|
||||
}
|
||||
metadataKeySelectors = append(metadataKeySelectors, selector)
|
||||
}
|
||||
return metadataKeySelectors
|
||||
}
|
||||
|
||||
func getPriorityForContext(ctx telemetrytypes.FieldContext) int {
|
||||
switch ctx {
|
||||
case telemetrytypes.FieldContextLog:
|
||||
@@ -986,18 +1031,18 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
|
||||
FieldDataType: fieldValueSelector.FieldDataType,
|
||||
}
|
||||
|
||||
selectColumn, err := t.fm.FieldFor(ctx, key)
|
||||
selectColumn, err := t.fm.FieldFor(ctx, 0, 0, key)
|
||||
|
||||
if err != nil {
|
||||
// we don't have a explicit column to select from the related metadata table
|
||||
// so we will select either from resource_attributes or attributes table
|
||||
// in that order
|
||||
resourceColumn, _ := t.fm.FieldFor(ctx, &telemetrytypes.TelemetryFieldKey{
|
||||
resourceColumn, _ := t.fm.FieldFor(ctx, 0, 0, &telemetrytypes.TelemetryFieldKey{
|
||||
Name: key.Name,
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
})
|
||||
attributeColumn, _ := t.fm.FieldFor(ctx, &telemetrytypes.TelemetryFieldKey{
|
||||
attributeColumn, _ := t.fm.FieldFor(ctx, 0, 0, &telemetrytypes.TelemetryFieldKey{
|
||||
Name: key.Name,
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
@@ -1018,11 +1063,12 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
|
||||
}
|
||||
|
||||
whereClause, err := querybuilder.PrepareWhereClause(fieldValueSelector.ExistingQuery, querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: t.logger,
|
||||
FieldMapper: t.fm,
|
||||
ConditionBuilder: t.conditionBuilder,
|
||||
FieldKeys: keys,
|
||||
}, 0, 0)
|
||||
})
|
||||
if err == nil {
|
||||
sb.AddWhereClause(whereClause.WhereClause)
|
||||
} else {
|
||||
@@ -1046,20 +1092,20 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
|
||||
|
||||
// search on attributes
|
||||
key.FieldContext = telemetrytypes.FieldContextAttribute
|
||||
cond, err := t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
|
||||
cond, err := t.conditionBuilder.ConditionFor(ctx, 0, 0, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb)
|
||||
if err == nil {
|
||||
conds = append(conds, cond)
|
||||
}
|
||||
|
||||
// search on resource
|
||||
key.FieldContext = telemetrytypes.FieldContextResource
|
||||
cond, err = t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
|
||||
cond, err = t.conditionBuilder.ConditionFor(ctx, 0, 0, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb)
|
||||
if err == nil {
|
||||
conds = append(conds, cond)
|
||||
}
|
||||
key.FieldContext = origContext
|
||||
} else {
|
||||
cond, err := t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
|
||||
cond, err := t.conditionBuilder.ConditionFor(ctx, 0, 0, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb)
|
||||
if err == nil {
|
||||
conds = append(conds, cond)
|
||||
}
|
||||
@@ -1750,6 +1796,103 @@ func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Cont
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// CachedColumnEvolutionMetadata is a cacheable type for storing column evolution metadata
|
||||
type CachedEvolutionEntry struct {
|
||||
Metadata []*telemetrytypes.EvolutionEntry `json:"metadata"`
|
||||
}
|
||||
|
||||
var _ cachetypes.Cacheable = (*CachedEvolutionEntry)(nil)
|
||||
|
||||
func (c *CachedEvolutionEntry) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(c)
|
||||
}
|
||||
|
||||
func (c *CachedEvolutionEntry) UnmarshalBinary(data []byte) error {
|
||||
return json.Unmarshal(data, c)
|
||||
}
|
||||
|
||||
func (k *telemetryMetaStore) fetchEvolutionEntryFromClickHouse(ctx context.Context, selectors []*telemetrytypes.EvolutionSelector) ([]*telemetrytypes.EvolutionEntry, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("signal", "column_name", "column_type", "field_context", "field_name", "version", "release_time")
|
||||
sb.From(fmt.Sprintf("%s.%s", k.relatedMetadataDBName, k.columnEvolutionMetadataTblName))
|
||||
sb.OrderBy("release_time ASC")
|
||||
|
||||
var clauses []string
|
||||
for _, selector := range selectors {
|
||||
var clause string
|
||||
|
||||
if selector.FieldContext != telemetrytypes.FieldContextUnspecified {
|
||||
clause = sb.E("field_context", selector.FieldContext)
|
||||
}
|
||||
|
||||
if selector.FieldName != "" {
|
||||
clause = sb.And(clause,
|
||||
sb.Or(sb.E("field_name", selector.FieldName), sb.E("field_name", "__all__")),
|
||||
)
|
||||
} else {
|
||||
clause = sb.And(clause, sb.E("field_name", "__all__"))
|
||||
}
|
||||
|
||||
clauses = append(clauses, sb.And(sb.E("signal", selector.Signal), clause))
|
||||
}
|
||||
sb.Where(sb.Or(clauses...))
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var entries []*telemetrytypes.EvolutionEntry
|
||||
rows, err := k.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var entry telemetrytypes.EvolutionEntry
|
||||
var releaseTimeNs float64
|
||||
if err := rows.Scan(
|
||||
&entry.Signal,
|
||||
&entry.ColumnName,
|
||||
&entry.ColumnType,
|
||||
&entry.FieldContext,
|
||||
&entry.FieldName,
|
||||
&entry.Version,
|
||||
&releaseTimeNs,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Convert nanoseconds to time.Time
|
||||
releaseTime := time.Unix(0, int64(releaseTimeNs))
|
||||
entry.ReleaseTime = releaseTime
|
||||
entries = append(entries, &entry)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// Get retrieves all evolutions for the given selectors from DB.
|
||||
func (k *telemetryMetaStore) GetColumnEvolutionMetadataMulti(ctx context.Context, selectors []*telemetrytypes.EvolutionSelector) (map[string][]*telemetrytypes.EvolutionEntry, error) {
|
||||
evolutions, err := k.fetchEvolutionEntryFromClickHouse(ctx, selectors)
|
||||
if err != nil {
|
||||
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to fetch evolution from clickhouse %s", err.Error())
|
||||
}
|
||||
|
||||
evolutionsByUniqueKey := make(map[string][]*telemetrytypes.EvolutionEntry)
|
||||
|
||||
for _, evolution := range evolutions {
|
||||
key := telemetrytypes.GetEvolutionMetadataUniqueKey(&telemetrytypes.EvolutionSelector{
|
||||
Signal: evolution.Signal,
|
||||
FieldContext: evolution.FieldContext,
|
||||
FieldName: evolution.FieldName,
|
||||
})
|
||||
evolutionsByUniqueKey[key] = append(evolutionsByUniqueKey[key], evolution)
|
||||
}
|
||||
return evolutionsByUniqueKey, nil
|
||||
}
|
||||
|
||||
// chunkSizeFirstSeenMetricMetadata limits the number of tuples per SQL query to avoid hitting the max_query_size limit.
|
||||
//
|
||||
// Calculation Logic:
|
||||
|
||||
@@ -39,6 +39,7 @@ func TestGetFirstSeenFromMetricMetadata(t *testing.T) {
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
DBName,
|
||||
AttributesMetadataLocalTableName,
|
||||
ColumnEvolutionMetadataTableName,
|
||||
)
|
||||
|
||||
lookupKeys := []telemetrytypes.MetricMetadataLookupKey{
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"regexp"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
@@ -38,6 +39,7 @@ func newTestTelemetryMetaStoreTestHelper(store telemetrystore.TelemetryStore) te
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
DBName,
|
||||
AttributesMetadataLocalTableName,
|
||||
ColumnEvolutionMetadataTableName,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -384,3 +386,386 @@ func TestGetMetricFieldValuesIntrinsicBoolReturnsEmpty(t *testing.T) {
|
||||
assert.Empty(t, values.BoolValues)
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
|
||||
var (
|
||||
clickHouseQueryPatternWithFieldName = "SELECT.*signal.*column_name.*column_type.*field_context.*field_name.*version.*release_time.*FROM.*distributed_column_evolution_metadata.*WHERE.*signal.*=.*field_context.*=.*field_name.*=.*field_name.*=.*"
|
||||
clickHouseQueryPatternWithoutFieldName = "SELECT.*signal.*column_name.*column_type.*field_context.*field_name.*version.*release_time.*FROM.*distributed_column_evolution_metadata.*WHERE.*signal.*=.*field_context.*=.*ORDER BY.*release_time.*ASC"
|
||||
clickHouseColumns = []cmock.ColumnType{
|
||||
{Name: "signal", Type: "String"},
|
||||
{Name: "column_name", Type: "String"},
|
||||
{Name: "column_type", Type: "String"},
|
||||
{Name: "field_context", Type: "String"},
|
||||
{Name: "field_name", Type: "String"},
|
||||
{Name: "version", Type: "UInt32"},
|
||||
{Name: "release_time", Type: "Float64"},
|
||||
}
|
||||
)
|
||||
|
||||
func createMockRows(values [][]any) *cmock.Rows {
|
||||
return cmock.NewRows(clickHouseColumns, values)
|
||||
}
|
||||
|
||||
func TestKeyEvolutionMetadata_Get_Multi_FetchFromClickHouse(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, ®exMatcher{})
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
values := [][]any{
|
||||
{
|
||||
"logs",
|
||||
"resources_string",
|
||||
"Map(LowCardinality(String), String)",
|
||||
"resource",
|
||||
"__all__",
|
||||
uint32(0),
|
||||
float64(releaseTime.UnixNano()),
|
||||
},
|
||||
}
|
||||
|
||||
selector := &telemetrytypes.EvolutionSelector{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
}
|
||||
|
||||
rows := createMockRows(values)
|
||||
mock.ExpectQuery(clickHouseQueryPatternWithoutFieldName).WithArgs(telemetrytypes.SignalLogs, telemetrytypes.FieldContextResource, "__all__").WillReturnRows(rows)
|
||||
|
||||
metadata := newTestTelemetryMetaStoreTestHelper(telemetryStore)
|
||||
result, err := metadata.GetColumnEvolutionMetadataMulti(ctx, []*telemetrytypes.EvolutionSelector{selector})
|
||||
require.NoError(t, err)
|
||||
|
||||
expectedKey := "logs:resource:__all__"
|
||||
require.Contains(t, result, expectedKey)
|
||||
require.Len(t, result[expectedKey], 1)
|
||||
assert.Equal(t, telemetrytypes.SignalLogs, result[expectedKey][0].Signal)
|
||||
assert.Equal(t, "resources_string", result[expectedKey][0].ColumnName)
|
||||
assert.Equal(t, "Map(LowCardinality(String), String)", result[expectedKey][0].ColumnType)
|
||||
assert.Equal(t, telemetrytypes.FieldContextResource, result[expectedKey][0].FieldContext)
|
||||
assert.Equal(t, "__all__", result[expectedKey][0].FieldName)
|
||||
assert.Equal(t, releaseTime.UnixNano(), result[expectedKey][0].ReleaseTime.UnixNano())
|
||||
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
|
||||
func TestKeyEvolutionMetadata_Get_Multi_MultipleMetadataEntries(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, ®exMatcher{})
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
releaseTime1 := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
releaseTime2 := time.Date(2024, 2, 15, 10, 0, 0, 0, time.UTC)
|
||||
|
||||
values := [][]any{
|
||||
{
|
||||
"logs",
|
||||
"resources_string",
|
||||
"Map(LowCardinality(String), String)",
|
||||
"resource",
|
||||
"__all__",
|
||||
uint32(0),
|
||||
float64(releaseTime1.UnixNano()),
|
||||
},
|
||||
{
|
||||
"logs",
|
||||
"resource",
|
||||
"JSON()",
|
||||
"resource",
|
||||
"__all__",
|
||||
uint32(1),
|
||||
float64(releaseTime2.UnixNano()),
|
||||
},
|
||||
}
|
||||
|
||||
rows := createMockRows(values)
|
||||
mock.ExpectQuery(clickHouseQueryPatternWithoutFieldName).WithArgs(telemetrytypes.SignalLogs, telemetrytypes.FieldContextResource, "__all__").WillReturnRows(rows)
|
||||
|
||||
metadata := newTestTelemetryMetaStoreTestHelper(telemetryStore)
|
||||
selector := &telemetrytypes.EvolutionSelector{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
}
|
||||
result, err := metadata.GetColumnEvolutionMetadataMulti(ctx, []*telemetrytypes.EvolutionSelector{selector})
|
||||
require.NoError(t, err)
|
||||
|
||||
expectedKey := "logs:resource:__all__"
|
||||
require.Contains(t, result, expectedKey)
|
||||
require.Len(t, result[expectedKey], 2)
|
||||
assert.Equal(t, "resources_string", result[expectedKey][0].ColumnName)
|
||||
assert.Equal(t, "Map(LowCardinality(String), String)", result[expectedKey][0].ColumnType)
|
||||
assert.Equal(t, "resource", result[expectedKey][0].FieldContext.StringValue())
|
||||
assert.Equal(t, "__all__", result[expectedKey][0].FieldName)
|
||||
assert.Equal(t, releaseTime1.UnixNano(), result[expectedKey][0].ReleaseTime.UnixNano())
|
||||
assert.Equal(t, "resource", result[expectedKey][1].ColumnName)
|
||||
assert.Equal(t, "JSON()", result[expectedKey][1].ColumnType)
|
||||
assert.Equal(t, "resource", result[expectedKey][1].FieldContext.StringValue())
|
||||
assert.Equal(t, "__all__", result[expectedKey][1].FieldName)
|
||||
assert.Equal(t, releaseTime2.UnixNano(), result[expectedKey][1].ReleaseTime.UnixNano())
|
||||
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
|
||||
func TestKeyEvolutionMetadata_Get_Multi_MultipleMetadataEntriesWithFieldName(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, ®exMatcher{})
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
releaseTime1 := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
releaseTime2 := time.Date(2024, 2, 15, 10, 0, 0, 0, time.UTC)
|
||||
releaseTime3 := time.Date(2024, 3, 15, 10, 0, 0, 0, time.UTC)
|
||||
|
||||
values := [][]any{
|
||||
{
|
||||
"logs",
|
||||
"body",
|
||||
"String",
|
||||
"body",
|
||||
"__all__",
|
||||
uint32(0),
|
||||
float64(releaseTime1.UnixNano()),
|
||||
},
|
||||
{
|
||||
"logs",
|
||||
"body_json",
|
||||
"JSON()",
|
||||
"body",
|
||||
"__all__",
|
||||
uint32(1),
|
||||
float64(releaseTime2.UnixNano()),
|
||||
},
|
||||
{
|
||||
"logs",
|
||||
"body_promoted",
|
||||
"JSON()",
|
||||
"body",
|
||||
"user.name",
|
||||
uint32(2),
|
||||
float64(releaseTime3.UnixNano()),
|
||||
},
|
||||
}
|
||||
|
||||
selector := &telemetrytypes.EvolutionSelector{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldName: "user.name",
|
||||
}
|
||||
|
||||
rows := createMockRows(values)
|
||||
mock.ExpectQuery(clickHouseQueryPatternWithFieldName).WithArgs(telemetrytypes.SignalLogs, telemetrytypes.FieldContextBody, selector.FieldName, "__all__").WillReturnRows(rows)
|
||||
|
||||
metadata := newTestTelemetryMetaStoreTestHelper(telemetryStore)
|
||||
result, err := metadata.GetColumnEvolutionMetadataMulti(ctx, []*telemetrytypes.EvolutionSelector{selector})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check entries for "__all__" field name
|
||||
expectedKeyAll := "logs:body:__all__"
|
||||
require.Contains(t, result, expectedKeyAll)
|
||||
require.Len(t, result[expectedKeyAll], 2)
|
||||
assert.Equal(t, "body", result[expectedKeyAll][0].ColumnName)
|
||||
assert.Equal(t, "String", result[expectedKeyAll][0].ColumnType)
|
||||
assert.Equal(t, "body", result[expectedKeyAll][0].FieldContext.StringValue())
|
||||
assert.Equal(t, "__all__", result[expectedKeyAll][0].FieldName)
|
||||
assert.Equal(t, releaseTime1.UnixNano(), result[expectedKeyAll][0].ReleaseTime.UnixNano())
|
||||
assert.Equal(t, "body_json", result[expectedKeyAll][1].ColumnName)
|
||||
assert.Equal(t, "JSON()", result[expectedKeyAll][1].ColumnType)
|
||||
assert.Equal(t, "body", result[expectedKeyAll][1].FieldContext.StringValue())
|
||||
assert.Equal(t, "__all__", result[expectedKeyAll][1].FieldName)
|
||||
assert.Equal(t, releaseTime2.UnixNano(), result[expectedKeyAll][1].ReleaseTime.UnixNano())
|
||||
|
||||
// Check entries for "user.name" field name
|
||||
expectedKeyUser := "logs:body:user.name"
|
||||
require.Contains(t, result, expectedKeyUser)
|
||||
require.Len(t, result[expectedKeyUser], 1)
|
||||
assert.Equal(t, "body_promoted", result[expectedKeyUser][0].ColumnName)
|
||||
assert.Equal(t, "JSON()", result[expectedKeyUser][0].ColumnType)
|
||||
assert.Equal(t, "body", result[expectedKeyUser][0].FieldContext.StringValue())
|
||||
assert.Equal(t, "user.name", result[expectedKeyUser][0].FieldName)
|
||||
assert.Equal(t, releaseTime3.UnixNano(), result[expectedKeyUser][0].ReleaseTime.UnixNano())
|
||||
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
|
||||
func TestKeyEvolutionMetadata_Get_Multi_MultipleMetadataEntriesWithMultipleSelectors(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, ®exMatcher{})
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
// releaseTime1 := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
releaseTime2 := time.Date(2024, 2, 15, 10, 0, 0, 0, time.UTC)
|
||||
releaseTime3 := time.Date(2024, 3, 15, 10, 0, 0, 0, time.UTC)
|
||||
|
||||
values := [][]any{
|
||||
{
|
||||
"logs",
|
||||
"body_json",
|
||||
"JSON()",
|
||||
"body",
|
||||
"__all__",
|
||||
uint32(0),
|
||||
float64(releaseTime2.UnixNano()),
|
||||
},
|
||||
{
|
||||
"logs",
|
||||
"body_promoted",
|
||||
"JSON()",
|
||||
"body",
|
||||
"user.name",
|
||||
uint32(1),
|
||||
float64(releaseTime3.UnixNano()),
|
||||
},
|
||||
{
|
||||
"traces",
|
||||
"resources_string",
|
||||
"map()",
|
||||
telemetrytypes.FieldContextResource,
|
||||
"__all__",
|
||||
uint32(0),
|
||||
float64(releaseTime2.UnixNano()),
|
||||
},
|
||||
{
|
||||
telemetrytypes.SignalTraces,
|
||||
"resource",
|
||||
"JSON()",
|
||||
telemetrytypes.FieldContextResource,
|
||||
"__all__",
|
||||
uint32(1),
|
||||
float64(releaseTime3.UnixNano()),
|
||||
},
|
||||
}
|
||||
|
||||
selectors := []*telemetrytypes.EvolutionSelector{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldName: "user.name",
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "service.name",
|
||||
},
|
||||
}
|
||||
|
||||
query := `SELECT signal, column_name, column_type, field_context, field_name, version, release_time FROM signoz_metadata\.distributed_column_evolution_metadata WHERE ` +
|
||||
`\(\(signal = \? AND \(field_context = \? AND \(field_name = \? OR field_name = \?\)\)\) OR ` +
|
||||
`\(signal = \? AND \(field_context = \? AND \(field_name = \? OR field_name = \?\)\)\)\) ` +
|
||||
`ORDER BY release_time ASC`
|
||||
rows := createMockRows(values)
|
||||
mock.ExpectQuery(query).WithArgs(
|
||||
telemetrytypes.SignalLogs, telemetrytypes.FieldContextBody, selectors[0].FieldName, "__all__",
|
||||
telemetrytypes.SignalTraces, telemetrytypes.FieldContextResource, selectors[1].FieldName, "__all__",
|
||||
).WillReturnRows(rows)
|
||||
|
||||
metadata := newTestTelemetryMetaStoreTestHelper(telemetryStore)
|
||||
_, err := metadata.GetColumnEvolutionMetadataMulti(ctx, selectors)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
|
||||
func TestKeyEvolutionMetadata_Get_Multi_EmptyResultFromClickHouse(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, ®exMatcher{})
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
rows := createMockRows([][]any{})
|
||||
mock.ExpectQuery(clickHouseQueryPatternWithoutFieldName).WithArgs(telemetrytypes.SignalLogs, telemetrytypes.FieldContextResource, "__all__").WillReturnRows(rows)
|
||||
|
||||
metadata := newTestTelemetryMetaStoreTestHelper(telemetryStore)
|
||||
selector := &telemetrytypes.EvolutionSelector{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
}
|
||||
result, err := metadata.GetColumnEvolutionMetadataMulti(ctx, []*telemetrytypes.EvolutionSelector{selector})
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Empty(t, result)
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
|
||||
func TestKeyEvolutionMetadata_Get_Multi_ClickHouseQueryError(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, ®exMatcher{})
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
mock.ExpectQuery(clickHouseQueryPatternWithoutFieldName).WithArgs(telemetrytypes.SignalLogs, telemetrytypes.FieldContextResource, "__all__").WillReturnError(assert.AnError)
|
||||
|
||||
metadata := newTestTelemetryMetaStoreTestHelper(telemetryStore)
|
||||
selector := &telemetrytypes.EvolutionSelector{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
}
|
||||
_, err := metadata.GetColumnEvolutionMetadataMulti(ctx, []*telemetrytypes.EvolutionSelector{selector})
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestKeyEvolutionMetadata_Get_Multi_MultipleSelectors(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, ®exMatcher{})
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
releaseTime1 := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
releaseTime2 := time.Date(2024, 2, 15, 10, 0, 0, 0, time.UTC)
|
||||
|
||||
values := [][]any{
|
||||
{
|
||||
telemetrytypes.SignalLogs,
|
||||
"resources_string",
|
||||
"Map(LowCardinality(String), String)",
|
||||
telemetrytypes.FieldContextResource,
|
||||
"__all__",
|
||||
uint32(0),
|
||||
float64(releaseTime1.UnixNano()),
|
||||
},
|
||||
{
|
||||
telemetrytypes.SignalLogs,
|
||||
"body",
|
||||
"JSON()",
|
||||
telemetrytypes.FieldContextBody,
|
||||
"__all__",
|
||||
uint32(1),
|
||||
float64(releaseTime2.UnixNano()),
|
||||
},
|
||||
}
|
||||
|
||||
// When multiple selectors are provided, the query will have OR conditions
|
||||
// The pattern should match queries with multiple OR clauses
|
||||
queryPattern := "SELECT.*signal.*column_name.*column_type.*field_context.*field_name.*release_time.*FROM.*distributed_column_evolution_metadata.*WHERE.*ORDER BY.*release_time.*ASC"
|
||||
rows := createMockRows(values)
|
||||
mock.ExpectQuery(queryPattern).WillReturnRows(rows).WithArgs(telemetrytypes.SignalLogs, telemetrytypes.FieldContextResource, "__all__", "__all__", telemetrytypes.SignalLogs, telemetrytypes.FieldContextBody, "__all__", "__all__")
|
||||
|
||||
metadata := newTestTelemetryMetaStoreTestHelper(telemetryStore)
|
||||
selectors := []*telemetrytypes.EvolutionSelector{
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldName: "__all__",
|
||||
},
|
||||
}
|
||||
result, err := metadata.GetColumnEvolutionMetadataMulti(ctx, selectors)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Should have entries for both selectors
|
||||
expectedKey1 := "logs:resource:__all__"
|
||||
expectedKey2 := "logs:body:__all__"
|
||||
require.Contains(t, result, expectedKey1)
|
||||
require.Contains(t, result, expectedKey2)
|
||||
require.Len(t, result[expectedKey1], 1)
|
||||
require.Len(t, result[expectedKey2], 1)
|
||||
assert.Equal(t, "resources_string", result[expectedKey1][0].ColumnName)
|
||||
assert.Equal(t, "body", result[expectedKey2][0].ColumnName)
|
||||
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ const (
|
||||
DBName = "signoz_metadata"
|
||||
AttributesMetadataTableName = "distributed_attributes_metadata"
|
||||
AttributesMetadataLocalTableName = "attributes_metadata"
|
||||
ColumnEvolutionMetadataTableName = "distributed_column_evolution_metadata"
|
||||
PathTypesTableName = otelcollectorconst.DistributedPathTypesTable
|
||||
PromotedPathsTableName = otelcollectorconst.DistributedPromotedPathsTable
|
||||
SkipIndexTableName = "system.data_skipping_indices"
|
||||
|
||||
@@ -120,7 +120,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
|
||||
stepSec,
|
||||
))
|
||||
for _, g := range query.GroupBy {
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
|
||||
if err != nil {
|
||||
return "", []any{}, err
|
||||
}
|
||||
@@ -142,13 +142,16 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
|
||||
)
|
||||
if query.Filter != nil && query.Filter.Expression != "" {
|
||||
filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: b.logger,
|
||||
FieldMapper: b.fm,
|
||||
ConditionBuilder: b.cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
StartNs: start,
|
||||
EndNs: end,
|
||||
})
|
||||
if err != nil {
|
||||
return "", []any{}, err
|
||||
}
|
||||
@@ -200,7 +203,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
|
||||
))
|
||||
|
||||
for _, g := range query.GroupBy {
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
@@ -225,13 +228,16 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
|
||||
|
||||
if query.Filter != nil && query.Filter.Expression != "" {
|
||||
filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: b.logger,
|
||||
FieldMapper: b.fm,
|
||||
ConditionBuilder: b.cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
StartNs: start,
|
||||
EndNs: end,
|
||||
})
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
@@ -270,7 +276,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
stepSec,
|
||||
))
|
||||
for _, g := range query.GroupBy {
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
@@ -289,13 +295,16 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
)
|
||||
if query.Filter != nil && query.Filter.Expression != "" {
|
||||
filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: b.logger,
|
||||
FieldMapper: b.fm,
|
||||
ConditionBuilder: b.cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
StartNs: start,
|
||||
EndNs: end,
|
||||
})
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
@@ -23,6 +23,8 @@ func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
|
||||
|
||||
func (c *conditionBuilder) conditionFor(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
operator qbtypes.FilterOperator,
|
||||
value any,
|
||||
@@ -39,7 +41,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
value = querybuilder.FormatValueForContains(value)
|
||||
}
|
||||
|
||||
tblFieldName, err := c.fm.FieldFor(ctx, key)
|
||||
tblFieldName, err := c.fm.FieldFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -135,14 +137,14 @@ func (c *conditionBuilder) conditionFor(
|
||||
|
||||
func (c *conditionBuilder) ConditionFor(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
operator qbtypes.FilterOperator,
|
||||
value any,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
_ uint64,
|
||||
_ uint64,
|
||||
) (string, error) {
|
||||
condition, err := c.conditionFor(ctx, key, operator, value, sb)
|
||||
condition, err := c.conditionFor(ctx, startNs, endNs, key, operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -234,7 +234,7 @@ func TestConditionFor(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, &tc.key, tc.operator, tc.value, sb, 0, 0)
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, 0, 0, &tc.key, tc.operator, tc.value, sb)
|
||||
sb.Where(cond)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
@@ -289,7 +289,7 @@ func TestConditionForMultipleKeys(t *testing.T) {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
var err error
|
||||
for _, key := range tc.keys {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, &key, tc.operator, tc.value, sb, 0, 0)
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, 0, 0, &key, tc.operator, tc.value, sb)
|
||||
sb.Where(cond)
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting condition for key %s: %v", key.Name, err)
|
||||
|
||||
@@ -41,62 +41,63 @@ func NewFieldMapper() qbtypes.FieldMapper {
|
||||
return &fieldMapper{}
|
||||
}
|
||||
|
||||
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
||||
func (m *fieldMapper) getColumn(_ context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextScope, telemetrytypes.FieldContextAttribute:
|
||||
return timeSeriesV4Columns["labels"], nil
|
||||
return []*schema.Column{timeSeriesV4Columns["labels"]}, nil
|
||||
case telemetrytypes.FieldContextMetric:
|
||||
col, ok := timeSeriesV4Columns[key.Name]
|
||||
if !ok {
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
return []*schema.Column{}, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
return col, nil
|
||||
return []*schema.Column{col}, nil
|
||||
case telemetrytypes.FieldContextUnspecified:
|
||||
col, ok := timeSeriesV4Columns[key.Name]
|
||||
if !ok {
|
||||
// if nothing is found, return labels column
|
||||
// as we keep all the labels in the labels column
|
||||
return timeSeriesV4Columns["labels"], nil
|
||||
return []*schema.Column{timeSeriesV4Columns["labels"]}, nil
|
||||
}
|
||||
return col, nil
|
||||
return []*schema.Column{col}, nil
|
||||
}
|
||||
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
column, err := m.getColumn(ctx, key)
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, startNs, endNs uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
columns, err := m.getColumn(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextScope, telemetrytypes.FieldContextAttribute:
|
||||
return fmt.Sprintf("JSONExtractString(%s, '%s')", column.Name, key.Name), nil
|
||||
return fmt.Sprintf("JSONExtractString(%s, '%s')", columns[0].Name, key.Name), nil
|
||||
case telemetrytypes.FieldContextMetric:
|
||||
return column.Name, nil
|
||||
return columns[0].Name, nil
|
||||
case telemetrytypes.FieldContextUnspecified:
|
||||
if slices.Contains(IntrinsicFields, key.Name) {
|
||||
return column.Name, nil
|
||||
return columns[0].Name, nil
|
||||
}
|
||||
return fmt.Sprintf("JSONExtractString(%s, '%s')", column.Name, key.Name), nil
|
||||
return fmt.Sprintf("JSONExtractString(%s, '%s')", columns[0].Name, key.Name), nil
|
||||
}
|
||||
|
||||
return column.Name, nil
|
||||
return columns[0].Name, nil
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
||||
return m.getColumn(ctx, key)
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
return m.getColumn(ctx, tsStart, tsEnd, key)
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnExpressionFor(
|
||||
ctx context.Context,
|
||||
startNs, endNs uint64,
|
||||
field *telemetrytypes.TelemetryFieldKey,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
|
||||
colName, err := m.FieldFor(ctx, field)
|
||||
colName, err := m.FieldFor(ctx, startNs, endNs, field)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -123,13 +123,13 @@ func TestGetColumn(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
col, err := fm.ColumnFor(ctx, &tc.key)
|
||||
col, err := fm.ColumnFor(ctx, 0, 0, &tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedCol, col)
|
||||
assert.Equal(t, tc.expectedCol, col[0])
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -207,7 +207,7 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
result, err := fm.FieldFor(ctx, &tc.key)
|
||||
result, err := fm.FieldFor(ctx, 0, 0, &tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
|
||||
@@ -347,13 +347,16 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
|
||||
|
||||
if query.Filter != nil && query.Filter.Expression != "" {
|
||||
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: b.logger,
|
||||
FieldMapper: b.fm,
|
||||
ConditionBuilder: b.cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
StartNs: start,
|
||||
EndNs: end,
|
||||
})
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
@@ -364,7 +367,7 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
|
||||
|
||||
sb.Select("fingerprint")
|
||||
for _, g := range query.GroupBy {
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
@@ -29,6 +29,8 @@ func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
|
||||
|
||||
func (c *conditionBuilder) conditionFor(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
operator qbtypes.FilterOperator,
|
||||
value any,
|
||||
@@ -46,13 +48,13 @@ func (c *conditionBuilder) conditionFor(
|
||||
}
|
||||
|
||||
// first, locate the raw column type (so we can choose the right EXISTS logic)
|
||||
column, err := c.fm.ColumnFor(ctx, key)
|
||||
columns, err := c.fm.ColumnFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// then ask the mapper for the actual SQL reference
|
||||
tblFieldName, err := c.fm.FieldFor(ctx, key)
|
||||
tblFieldName, err := c.fm.FieldFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -165,7 +167,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
|
||||
|
||||
var value any
|
||||
switch column.Type.GetType() {
|
||||
switch columns[0].Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(tblFieldName), nil
|
||||
@@ -182,7 +184,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
switch elementType := columns[0].Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
@@ -206,14 +208,14 @@ func (c *conditionBuilder) conditionFor(
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
keyType := columns[0].Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, columns[0].Type)
|
||||
}
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
switch valueType := columns[0].Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", columns[0].Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
}
|
||||
@@ -226,7 +228,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", columns[0].Type)
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
@@ -234,25 +236,25 @@ func (c *conditionBuilder) conditionFor(
|
||||
|
||||
func (c *conditionBuilder) ConditionFor(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
operator qbtypes.FilterOperator,
|
||||
value any,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
startNs uint64,
|
||||
_ uint64,
|
||||
) (string, error) {
|
||||
if c.isSpanScopeField(key.Name) {
|
||||
return c.buildSpanScopeCondition(key, operator, value, startNs)
|
||||
}
|
||||
|
||||
condition, err := c.conditionFor(ctx, key, operator, value, sb)
|
||||
condition, err := c.conditionFor(ctx, startNs, endNs, key, operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if operator.AddDefaultExistsFilter() {
|
||||
// skip adding exists filter for intrinsic fields
|
||||
field, _ := c.fm.FieldFor(ctx, key)
|
||||
field, _ := c.fm.FieldFor(ctx, startNs, endNs, key)
|
||||
if slices.Contains(maps.Keys(IntrinsicFields), field) ||
|
||||
slices.Contains(maps.Keys(IntrinsicFieldsDeprecated), field) ||
|
||||
slices.Contains(maps.Keys(CalculatedFields), field) ||
|
||||
@@ -260,7 +262,7 @@ func (c *conditionBuilder) ConditionFor(
|
||||
return condition, nil
|
||||
}
|
||||
|
||||
existsCondition, err := c.conditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb)
|
||||
existsCondition, err := c.conditionFor(ctx, startNs, endNs, key, qbtypes.FilterOperatorExists, nil, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -289,7 +289,7 @@ func TestConditionFor(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, &tc.key, tc.operator, tc.value, sb, 1761437108000000000, 1761458708000000000)
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, 1761437108000000000, 1761458708000000000, &tc.key, tc.operator, tc.value, sb)
|
||||
sb.Where(cond)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
|
||||
@@ -169,23 +169,24 @@ func NewFieldMapper() *defaultFieldMapper {
|
||||
|
||||
func (m *defaultFieldMapper) getColumn(
|
||||
_ context.Context,
|
||||
_, _ uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
) (*schema.Column, error) {
|
||||
) ([]*schema.Column, error) {
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
return indexV3Columns["resource"], nil
|
||||
return []*schema.Column{indexV3Columns["resource"]}, nil
|
||||
case telemetrytypes.FieldContextScope:
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
return []*schema.Column{}, qbtypes.ErrColumnNotFound
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
switch key.FieldDataType {
|
||||
case telemetrytypes.FieldDataTypeString:
|
||||
return indexV3Columns["attributes_string"], nil
|
||||
return []*schema.Column{indexV3Columns["attributes_string"]}, nil
|
||||
case telemetrytypes.FieldDataTypeInt64,
|
||||
telemetrytypes.FieldDataTypeFloat64,
|
||||
telemetrytypes.FieldDataTypeNumber:
|
||||
return indexV3Columns["attributes_number"], nil
|
||||
return []*schema.Column{indexV3Columns["attributes_number"]}, nil
|
||||
case telemetrytypes.FieldDataTypeBool:
|
||||
return indexV3Columns["attributes_bool"], nil
|
||||
return []*schema.Column{indexV3Columns["attributes_bool"]}, nil
|
||||
}
|
||||
case telemetrytypes.FieldContextSpan, telemetrytypes.FieldContextUnspecified:
|
||||
/*
|
||||
@@ -196,7 +197,7 @@ func (m *defaultFieldMapper) getColumn(
|
||||
// Check if this is a span scope field
|
||||
if strings.ToLower(key.Name) == SpanSearchScopeRoot || strings.ToLower(key.Name) == SpanSearchScopeEntryPoint {
|
||||
// The actual SQL will be generated in the condition builder
|
||||
return &schema.Column{Name: key.Name, Type: schema.ColumnTypeBool}, nil
|
||||
return []*schema.Column{{Name: key.Name, Type: schema.ColumnTypeBool}}, nil
|
||||
}
|
||||
|
||||
// TODO(srikanthccv): remove this when it's safe to remove
|
||||
@@ -210,18 +211,18 @@ func (m *defaultFieldMapper) getColumn(
|
||||
if _, ok := CalculatedFieldsDeprecated[key.Name]; ok {
|
||||
// Check if we have a mapping for the deprecated calculated field
|
||||
if col, ok := indexV3Columns[oldToNew[key.Name]]; ok {
|
||||
return col, nil
|
||||
return []*schema.Column{col}, nil
|
||||
}
|
||||
}
|
||||
if _, ok := IntrinsicFieldsDeprecated[key.Name]; ok {
|
||||
// Check if we have a mapping for the deprecated intrinsic field
|
||||
if col, ok := indexV3Columns[oldToNew[key.Name]]; ok {
|
||||
return col, nil
|
||||
return []*schema.Column{col}, nil
|
||||
}
|
||||
}
|
||||
|
||||
if col, ok := indexV3Columns[key.Name]; ok {
|
||||
return col, nil
|
||||
return []*schema.Column{col}, nil
|
||||
}
|
||||
}
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
@@ -229,15 +230,17 @@ func (m *defaultFieldMapper) getColumn(
|
||||
|
||||
func (m *defaultFieldMapper) ColumnFor(
|
||||
ctx context.Context,
|
||||
startNs, endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
) (*schema.Column, error) {
|
||||
return m.getColumn(ctx, key)
|
||||
) ([]*schema.Column, error) {
|
||||
return m.getColumn(ctx, startNs, endNs, key)
|
||||
}
|
||||
|
||||
// FieldFor returns the table field name for the given key if it exists
|
||||
// otherwise it returns qbtypes.ErrColumnNotFound
|
||||
func (m *defaultFieldMapper) FieldFor(
|
||||
ctx context.Context,
|
||||
startNs, endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
// Special handling for span scope fields
|
||||
@@ -247,10 +250,11 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
return key.Name, nil
|
||||
}
|
||||
|
||||
column, err := m.getColumn(ctx, key)
|
||||
columns, err := m.getColumn(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
column := columns[0]
|
||||
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
@@ -310,11 +314,12 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
// if it exists otherwise it returns qbtypes.ErrColumnNotFound
|
||||
func (m *defaultFieldMapper) ColumnExpressionFor(
|
||||
ctx context.Context,
|
||||
startNs, endNs uint64,
|
||||
field *telemetrytypes.TelemetryFieldKey,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
|
||||
colName, err := m.FieldFor(ctx, field)
|
||||
colName, err := m.FieldFor(ctx, startNs, endNs, field)
|
||||
if errors.Is(err, qbtypes.ErrColumnNotFound) {
|
||||
// the key didn't have the right context to be added to the query
|
||||
// we try to use the context we know of
|
||||
@@ -324,7 +329,7 @@ func (m *defaultFieldMapper) ColumnExpressionFor(
|
||||
if _, ok := indexV3Columns[field.Name]; ok {
|
||||
// if it is, attach the column name directly
|
||||
field.FieldContext = telemetrytypes.FieldContextSpan
|
||||
colName, _ = m.FieldFor(ctx, field)
|
||||
colName, _ = m.FieldFor(ctx, startNs, endNs, field)
|
||||
} else {
|
||||
// - the context is not provided
|
||||
// - there are not keys for the field
|
||||
@@ -342,12 +347,12 @@ func (m *defaultFieldMapper) ColumnExpressionFor(
|
||||
}
|
||||
} else if len(keysForField) == 1 {
|
||||
// we have a single key for the field, use it
|
||||
colName, _ = m.FieldFor(ctx, keysForField[0])
|
||||
colName, _ = m.FieldFor(ctx, startNs, endNs, keysForField[0])
|
||||
} else {
|
||||
// select any non-empty value from the keys
|
||||
args := []string{}
|
||||
for _, key := range keysForField {
|
||||
colName, _ = m.FieldFor(ctx, key)
|
||||
colName, _ = m.FieldFor(ctx, startNs, endNs, key)
|
||||
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
|
||||
}
|
||||
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
|
||||
|
||||
@@ -92,7 +92,7 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
result, err := fm.FieldFor(ctx, &tc.key)
|
||||
result, err := fm.FieldFor(ctx, 0, 0, &tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package telemetrytraces
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
@@ -75,13 +76,16 @@ func TestSpanScopeFilterExpression(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
}}
|
||||
|
||||
whereClause, err := querybuilder.PrepareWhereClause(tt.expression, querybuilder.FilterExprVisitorOpts{
|
||||
whereClause, err := querybuilder.PrepareWhereClause(tt.expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: context.Background(),
|
||||
Logger: instrumentationtest.New().Logger(),
|
||||
FieldMapper: fm,
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: fieldKeys,
|
||||
Builder: sb,
|
||||
}, tt.startNs, 1761458708000000000)
|
||||
StartNs: tt.startNs,
|
||||
EndNs: 1761458708000000000,
|
||||
})
|
||||
|
||||
if tt.expectError {
|
||||
assert.Error(t, err)
|
||||
@@ -142,13 +146,16 @@ func TestSpanScopeWithResourceFilter(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
}}
|
||||
|
||||
_, err := querybuilder.PrepareWhereClause(tt.expression, querybuilder.FilterExprVisitorOpts{
|
||||
_, err := querybuilder.PrepareWhereClause(tt.expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: context.Background(),
|
||||
Logger: instrumentationtest.New().Logger(),
|
||||
FieldMapper: fm,
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: fieldKeys,
|
||||
SkipResourceFilter: false, // This would be set by the statement builder
|
||||
}, 1761437108000000000, 1761458708000000000)
|
||||
StartNs: 1761437108000000000,
|
||||
EndNs: 1761458708000000000,
|
||||
})
|
||||
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
@@ -313,7 +313,7 @@ func (b *traceQueryStatementBuilder) buildListQuery(
|
||||
|
||||
// TODO: should we deprecate `SelectFields` and return everything from a span like we do for logs?
|
||||
for _, field := range query.SelectFields {
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, &field, keys)
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &field, keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -331,7 +331,7 @@ func (b *traceQueryStatementBuilder) buildListQuery(
|
||||
|
||||
// Add order by
|
||||
for _, orderBy := range query.Order {
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &orderBy.Key.TelemetryFieldKey, keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -515,7 +515,7 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
|
||||
// Keep original column expressions so we can build the tuple
|
||||
fieldNames := make([]string, 0, len(query.GroupBy))
|
||||
for _, gb := range query.GroupBy {
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil)
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -529,7 +529,7 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
|
||||
allAggChArgs := make([]any, 0)
|
||||
for i, agg := range query.Aggregations {
|
||||
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
|
||||
ctx, agg.Expression,
|
||||
ctx, start, end, agg.Expression,
|
||||
uint64(query.StepInterval.Seconds()),
|
||||
keys,
|
||||
)
|
||||
@@ -657,7 +657,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery(
|
||||
|
||||
var allGroupByArgs []any
|
||||
for _, gb := range query.GroupBy {
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil)
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -674,7 +674,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery(
|
||||
for idx := range query.Aggregations {
|
||||
aggExpr := query.Aggregations[idx]
|
||||
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
|
||||
ctx, aggExpr.Expression,
|
||||
ctx, start, end, aggExpr.Expression,
|
||||
rateInterval,
|
||||
keys,
|
||||
)
|
||||
@@ -746,7 +746,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery(
|
||||
|
||||
// buildFilterCondition builds SQL condition from filter expression
|
||||
func (b *traceQueryStatementBuilder) addFilterCondition(
|
||||
_ context.Context,
|
||||
ctx context.Context,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
start, end uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
|
||||
@@ -760,13 +760,16 @@ func (b *traceQueryStatementBuilder) addFilterCondition(
|
||||
if query.Filter != nil && query.Filter.Expression != "" {
|
||||
// add filter expression
|
||||
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: b.logger,
|
||||
FieldMapper: b.fm,
|
||||
ConditionBuilder: b.cb,
|
||||
FieldKeys: keys,
|
||||
SkipResourceFilter: true,
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
StartNs: start,
|
||||
EndNs: end,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -232,12 +232,15 @@ func (b *traceOperatorCTEBuilder) buildQueryCTE(ctx context.Context, queryName s
|
||||
filterWhereClause, err := querybuilder.PrepareWhereClause(
|
||||
query.Filter.Expression,
|
||||
querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: b.stmtBuilder.logger,
|
||||
FieldMapper: b.stmtBuilder.fm,
|
||||
ConditionBuilder: b.stmtBuilder.cb,
|
||||
FieldKeys: keys,
|
||||
SkipResourceFilter: true,
|
||||
}, b.start, b.end,
|
||||
StartNs: b.start,
|
||||
EndNs: b.end,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
b.stmtBuilder.logger.ErrorContext(ctx, "Failed to prepare where clause", "error", err, "filter", query.Filter.Expression)
|
||||
@@ -450,7 +453,7 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
|
||||
if selectedFields[field.Name] {
|
||||
continue
|
||||
}
|
||||
colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, &field, keys)
|
||||
colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, b.start, b.end, &field, keys)
|
||||
if err != nil {
|
||||
b.stmtBuilder.logger.WarnContext(ctx, "failed to map select field",
|
||||
"field", field.Name, "error", err)
|
||||
@@ -465,7 +468,7 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
|
||||
// Add order by support using ColumnExpressionFor
|
||||
orderApplied := false
|
||||
for _, orderBy := range b.operator.Order {
|
||||
colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
|
||||
colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, b.start, b.end, &orderBy.Key.TelemetryFieldKey, keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -547,6 +550,8 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
|
||||
for _, gb := range b.operator.GroupBy {
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(
|
||||
ctx,
|
||||
b.start,
|
||||
b.end,
|
||||
&gb.TelemetryFieldKey,
|
||||
b.stmtBuilder.fm,
|
||||
b.stmtBuilder.cb,
|
||||
@@ -571,6 +576,8 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
|
||||
for i, agg := range b.operator.Aggregations {
|
||||
rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite(
|
||||
ctx,
|
||||
b.start,
|
||||
b.end,
|
||||
agg.Expression,
|
||||
uint64(b.operator.StepInterval.Seconds()),
|
||||
keys,
|
||||
@@ -656,6 +663,8 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
|
||||
for _, gb := range b.operator.GroupBy {
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(
|
||||
ctx,
|
||||
b.start,
|
||||
b.end,
|
||||
&gb.TelemetryFieldKey,
|
||||
b.stmtBuilder.fm,
|
||||
b.stmtBuilder.cb,
|
||||
@@ -682,6 +691,8 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
|
||||
for i, agg := range b.operator.Aggregations {
|
||||
rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite(
|
||||
ctx,
|
||||
b.start,
|
||||
b.end,
|
||||
agg.Expression,
|
||||
rateInterval,
|
||||
keys,
|
||||
@@ -795,6 +806,8 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
|
||||
for _, gb := range b.operator.GroupBy {
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(
|
||||
ctx,
|
||||
b.start,
|
||||
b.end,
|
||||
&gb.TelemetryFieldKey,
|
||||
b.stmtBuilder.fm,
|
||||
b.stmtBuilder.cb,
|
||||
@@ -819,6 +832,8 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
|
||||
for i, agg := range b.operator.Aggregations {
|
||||
rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite(
|
||||
ctx,
|
||||
b.start,
|
||||
b.end,
|
||||
agg.Expression,
|
||||
uint64((b.end-b.start)/querybuilder.NsToSeconds),
|
||||
keys,
|
||||
|
||||
@@ -2,12 +2,13 @@ package telemetrytraces
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
type traceOperatorStatementBuilder struct {
|
||||
|
||||
@@ -21,24 +21,23 @@ type JsonKeyToFieldFunc func(context.Context, *telemetrytypes.TelemetryFieldKey,
|
||||
// FieldMapper maps the telemetry field key to the table field name.
|
||||
type FieldMapper interface {
|
||||
// FieldFor returns the field name for the given key.
|
||||
FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error)
|
||||
FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error)
|
||||
// ColumnFor returns the column for the given key.
|
||||
ColumnFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error)
|
||||
ColumnFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error)
|
||||
// ColumnExpressionFor returns the column expression for the given key.
|
||||
ColumnExpressionFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) (string, error)
|
||||
ColumnExpressionFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) (string, error)
|
||||
}
|
||||
|
||||
// ConditionBuilder builds the condition for the filter.
|
||||
type ConditionBuilder interface {
|
||||
// ConditionFor returns the condition for the given key, operator and value.
|
||||
// TODO(srikanthccv,nikhilmantri0902): remove startNs, endNs when top_level_operations can be replaced with `is_remote`
|
||||
ConditionFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, operator FilterOperator, value any, sb *sqlbuilder.SelectBuilder, startNs uint64, endNs uint64) (string, error)
|
||||
ConditionFor(ctx context.Context, startNs uint64, endNs uint64, key *telemetrytypes.TelemetryFieldKey, operator FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error)
|
||||
}
|
||||
|
||||
type AggExprRewriter interface {
|
||||
// Rewrite rewrites the aggregation expression to be used in the query.
|
||||
Rewrite(ctx context.Context, expr string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) (string, []any, error)
|
||||
RewriteMulti(ctx context.Context, exprs []string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) ([]string, [][]any, error)
|
||||
Rewrite(ctx context.Context, startNs, endNs uint64, expr string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) (string, []any, error)
|
||||
RewriteMulti(ctx context.Context, startNs, endNs uint64, exprs []string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) ([]string, [][]any, error)
|
||||
}
|
||||
|
||||
type Statement struct {
|
||||
|
||||
25
pkg/types/telemetrytypes/evolution_metadata.go
Normal file
25
pkg/types/telemetrytypes/evolution_metadata.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package telemetrytypes
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type EvolutionEntry struct {
|
||||
Signal Signal `json:"signal"`
|
||||
ColumnName string `json:"column_name"`
|
||||
ColumnType string `json:"column_type"`
|
||||
FieldContext FieldContext `json:"field_context"`
|
||||
FieldName string `json:"field_name"`
|
||||
ReleaseTime time.Time `json:"release_time"`
|
||||
Version uint32 `json:"version"`
|
||||
}
|
||||
|
||||
type EvolutionSelector struct {
|
||||
Signal Signal
|
||||
FieldContext FieldContext
|
||||
FieldName string
|
||||
}
|
||||
|
||||
func GetEvolutionMetadataUniqueKey(selector *EvolutionSelector) string {
|
||||
return selector.Signal.StringValue() + ":" + selector.FieldContext.StringValue() + ":" + selector.FieldName
|
||||
}
|
||||
@@ -39,6 +39,8 @@ type TelemetryFieldKey struct {
|
||||
JSONPlan JSONAccessPlan `json:"-"`
|
||||
Indexes []JSONDataTypeIndex `json:"-"`
|
||||
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
|
||||
|
||||
Evolutions []*EvolutionEntry `json:"-"`
|
||||
}
|
||||
|
||||
func (f *TelemetryFieldKey) KeyNameContainsArray() bool {
|
||||
|
||||
@@ -41,6 +41,9 @@ type MetadataStore interface {
|
||||
// PromotePaths promotes the paths.
|
||||
PromotePaths(ctx context.Context, paths ...string) error
|
||||
|
||||
// GetColumnEvolutionMetadataMulti returns a list of evolution entries for the given selectors.
|
||||
GetColumnEvolutionMetadataMulti(ctx context.Context, selectors []*EvolutionSelector) (map[string][]*EvolutionEntry, error)
|
||||
|
||||
// GetFirstSeenFromMetricMetadata gets the first seen timestamp for a metric metadata lookup key.
|
||||
GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []MetricMetadataLookupKey) (map[MetricMetadataLookupKey]int64, error)
|
||||
}
|
||||
|
||||
@@ -12,25 +12,27 @@ import (
|
||||
// MockMetadataStore implements the MetadataStore interface for testing purposes
|
||||
type MockMetadataStore struct {
|
||||
// Maps to store test data
|
||||
KeysMap map[string][]*telemetrytypes.TelemetryFieldKey
|
||||
RelatedValuesMap map[string][]string
|
||||
AllValuesMap map[string]*telemetrytypes.TelemetryFieldValues
|
||||
TemporalityMap map[string]metrictypes.Temporality
|
||||
PromotedPathsMap map[string]struct{}
|
||||
LogsJSONIndexesMap map[string][]schemamigrator.Index
|
||||
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
|
||||
KeysMap map[string][]*telemetrytypes.TelemetryFieldKey
|
||||
RelatedValuesMap map[string][]string
|
||||
AllValuesMap map[string]*telemetrytypes.TelemetryFieldValues
|
||||
TemporalityMap map[string]metrictypes.Temporality
|
||||
PromotedPathsMap map[string]struct{}
|
||||
LogsJSONIndexesMap map[string][]schemamigrator.Index
|
||||
ColumnEvolutionMetadataMap map[string][]*telemetrytypes.EvolutionEntry
|
||||
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
|
||||
}
|
||||
|
||||
// NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps
|
||||
func NewMockMetadataStore() *MockMetadataStore {
|
||||
return &MockMetadataStore{
|
||||
KeysMap: make(map[string][]*telemetrytypes.TelemetryFieldKey),
|
||||
RelatedValuesMap: make(map[string][]string),
|
||||
AllValuesMap: make(map[string]*telemetrytypes.TelemetryFieldValues),
|
||||
TemporalityMap: make(map[string]metrictypes.Temporality),
|
||||
PromotedPathsMap: make(map[string]struct{}),
|
||||
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
|
||||
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
KeysMap: make(map[string][]*telemetrytypes.TelemetryFieldKey),
|
||||
RelatedValuesMap: make(map[string][]string),
|
||||
AllValuesMap: make(map[string]*telemetrytypes.TelemetryFieldValues),
|
||||
TemporalityMap: make(map[string]metrictypes.Temporality),
|
||||
PromotedPathsMap: make(map[string]struct{}),
|
||||
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
|
||||
ColumnEvolutionMetadataMap: make(map[string][]*telemetrytypes.EvolutionEntry),
|
||||
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,9 +97,51 @@ func (m *MockMetadataStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
|
||||
}
|
||||
}
|
||||
|
||||
// fetch and add evolutions
|
||||
for k, v := range result {
|
||||
keys := v
|
||||
evolutionMetadataKeySelectors := getEvolutionMetadataKeySelectors(keys)
|
||||
evolutions, err := m.GetColumnEvolutionMetadataMulti(ctx, evolutionMetadataKeySelectors)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
for i, key := range keys {
|
||||
// first check if there is evolutions that with field name as __all__
|
||||
// then check for specific field name
|
||||
selector := &telemetrytypes.EvolutionSelector{
|
||||
Signal: key.Signal,
|
||||
FieldContext: key.FieldContext,
|
||||
FieldName: "__all__",
|
||||
}
|
||||
|
||||
if keyEvolutions, ok := evolutions[telemetrytypes.GetEvolutionMetadataUniqueKey(selector)]; ok {
|
||||
keys[i].Evolutions = keyEvolutions
|
||||
}
|
||||
|
||||
selector.FieldName = key.Name
|
||||
if keyEvolutions, ok := evolutions[telemetrytypes.GetEvolutionMetadataUniqueKey(selector)]; ok {
|
||||
keys[i].Evolutions = keyEvolutions
|
||||
}
|
||||
}
|
||||
result[k] = keys
|
||||
}
|
||||
|
||||
return result, true, nil
|
||||
}
|
||||
|
||||
func getEvolutionMetadataKeySelectors(keySelectors []*telemetrytypes.TelemetryFieldKey) []*telemetrytypes.EvolutionSelector {
|
||||
var metadataKeySelectors []*telemetrytypes.EvolutionSelector
|
||||
for _, keySelector := range keySelectors {
|
||||
selector := &telemetrytypes.EvolutionSelector{
|
||||
Signal: keySelector.Signal,
|
||||
FieldContext: keySelector.FieldContext,
|
||||
FieldName: keySelector.Name,
|
||||
}
|
||||
metadataKeySelectors = append(metadataKeySelectors, selector)
|
||||
}
|
||||
return metadataKeySelectors
|
||||
}
|
||||
|
||||
// GetKey returns a list of keys with the given name
|
||||
func (m *MockMetadataStore) GetKey(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) {
|
||||
if fieldKeySelector == nil {
|
||||
@@ -310,6 +354,27 @@ func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...
|
||||
return m.LogsJSONIndexesMap, nil
|
||||
}
|
||||
|
||||
func (m *MockMetadataStore) GetColumnEvolutionMetadataMulti(ctx context.Context, selectors []*telemetrytypes.EvolutionSelector) (map[string][]*telemetrytypes.EvolutionEntry, error) {
|
||||
result := make(map[string][]*telemetrytypes.EvolutionEntry)
|
||||
|
||||
// Iterate over each selector
|
||||
for i, selector := range selectors {
|
||||
// Build the key: Signal:FieldContext:FieldName
|
||||
selector.FieldName = "__all__"
|
||||
key := telemetrytypes.GetEvolutionMetadataUniqueKey(selector)
|
||||
if entries, exists := m.ColumnEvolutionMetadataMap[key]; exists {
|
||||
result[key] = entries
|
||||
}
|
||||
selector.FieldName = selectors[i].FieldName
|
||||
key = telemetrytypes.GetEvolutionMetadataUniqueKey(selector)
|
||||
if entries, exists := m.ColumnEvolutionMetadataMap[key]; exists {
|
||||
result[key] = entries
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *MockMetadataStore) GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []telemetrytypes.MetricMetadataLookupKey) (map[telemetrytypes.MetricMetadataLookupKey]int64, error) {
|
||||
return m.LookupKeysMap, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user