mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-08 19:40:30 +01:00
Compare commits
1 Commits
infraM/v2_
...
issue_4522
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51fcc22d8a |
@@ -186,7 +186,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
column := columns[0]
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, _, err := selectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
|
||||
newColumns, _, err := qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -3,11 +3,7 @@ package telemetrylogs
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/utils"
|
||||
@@ -137,113 +133,6 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
// selectEvolutionsForColumns selects the appropriate evolution entries for each column based on the time range.
|
||||
// Logic:
|
||||
// - Finds the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// - Rejects all evolutions before this latest base evolution
|
||||
// - For duplicate evolutions it considers the oldest one (first in ReleaseTime)
|
||||
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
|
||||
// - Results are sorted by ReleaseTime descending (newest first)
|
||||
func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
|
||||
|
||||
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
|
||||
copy(sortedEvolutions, evolutions)
|
||||
|
||||
// sort the evolutions by ReleaseTime ascending
|
||||
sort.Slice(sortedEvolutions, func(i, j int) bool {
|
||||
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
|
||||
})
|
||||
|
||||
tsStartTime := time.Unix(0, int64(tsStart))
|
||||
tsEndTime := time.Unix(0, int64(tsEnd))
|
||||
|
||||
// Build evolution map: column name -> evolution
|
||||
evolutionMap := make(map[string]*telemetrytypes.EvolutionEntry)
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if _, exists := evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))]; exists {
|
||||
// since if there is duplicate we would just use the oldest one.
|
||||
continue
|
||||
}
|
||||
evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))] = evolution
|
||||
}
|
||||
|
||||
// Find the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// Evolutions are sorted, so we can break early
|
||||
var latestBaseEvolutionAcrossAll *telemetrytypes.EvolutionEntry
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if evolution.ReleaseTime.After(tsStartTime) {
|
||||
break
|
||||
}
|
||||
latestBaseEvolutionAcrossAll = evolution
|
||||
}
|
||||
|
||||
// We shouldn't reach this, it basically means there is something wrong with the evolutions data
|
||||
if latestBaseEvolutionAcrossAll == nil {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "no base evolution found for columns %v", columns)
|
||||
}
|
||||
|
||||
columnLookUpMap := make(map[string]*schema.Column)
|
||||
for _, column := range columns {
|
||||
columnLookUpMap[column.Name] = column
|
||||
}
|
||||
|
||||
// Collect column-evolution pairs
|
||||
type colEvoPair struct {
|
||||
column *schema.Column
|
||||
evolution *telemetrytypes.EvolutionEntry
|
||||
}
|
||||
pairs := []colEvoPair{}
|
||||
|
||||
for _, evolution := range evolutionMap {
|
||||
// Reject evolutions before the latest base evolution
|
||||
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
|
||||
continue
|
||||
}
|
||||
// skip evolutions after tsEndTime
|
||||
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exists := columnLookUpMap[evolution.ColumnName]; !exists {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "evolution column %s not found in columns %v", evolution.ColumnName, columns)
|
||||
}
|
||||
|
||||
pairs = append(pairs, colEvoPair{columnLookUpMap[evolution.ColumnName], evolution})
|
||||
}
|
||||
|
||||
// If no pairs found, fall back to latestBaseEvolutionAcrossAll for matching columns
|
||||
if len(pairs) == 0 {
|
||||
for _, column := range columns {
|
||||
// Use latestBaseEvolutionAcrossAll if this column name matches its column name
|
||||
if column.Name == latestBaseEvolutionAcrossAll.ColumnName {
|
||||
pairs = append(pairs, colEvoPair{column, latestBaseEvolutionAcrossAll})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by ReleaseTime descending (newest first)
|
||||
slices.SortFunc(pairs, func(a, b colEvoPair) int {
|
||||
// Sort by ReleaseTime descending (newest first)
|
||||
if a.evolution.ReleaseTime.After(b.evolution.ReleaseTime) {
|
||||
return -1
|
||||
}
|
||||
if a.evolution.ReleaseTime.Before(b.evolution.ReleaseTime) {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
// Extract results
|
||||
newColumns := make([]*schema.Column, len(pairs))
|
||||
evolutionsEntries := make([]*telemetrytypes.EvolutionEntry, len(pairs))
|
||||
for i, pair := range pairs {
|
||||
newColumns[i] = pair.column
|
||||
evolutionsEntries[i] = pair.evolution
|
||||
}
|
||||
|
||||
return newColumns, evolutionsEntries, nil
|
||||
}
|
||||
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
columns, err := m.getColumn(ctx, key)
|
||||
if err != nil {
|
||||
@@ -254,7 +143,7 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
|
||||
var evolutionsEntries []*telemetrytypes.EvolutionEntry
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, evolutionsEntries, err = selectEvolutionsForColumns(columns, key.Evolutions, tsStart, tsEnd)
|
||||
newColumns, evolutionsEntries, err = qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, tsStart, tsEnd)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -886,7 +886,7 @@ func TestSelectEvolutionsForColumns(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
resultColumns, resultEvols, err := selectEvolutionsForColumns(tc.columns, tc.evolutions, tc.tsStart, tc.tsEnd)
|
||||
resultColumns, resultEvols, err := qbtypes.SelectEvolutionsForColumns(tc.columns, tc.evolutions, tc.tsStart, tc.tsEnd)
|
||||
|
||||
if tc.expectedError {
|
||||
assert.Contains(t, err.Error(), tc.errorStr)
|
||||
|
||||
@@ -344,6 +344,11 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := t.updateColumnEvolutionMetadataForKeys(ctx, keys); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return keys, complete, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -89,6 +89,20 @@ func TestGetKeys(t *testing.T) {
|
||||
{Name: "tag_data_type", Type: "String"},
|
||||
{Name: "priority", Type: "UInt8"},
|
||||
}, [][]any{{"http.method", "tag", "String", 1}, {"http.method", "tag", "String", 1}}))
|
||||
|
||||
// Two rows above produce two evolution selectors (each contributing 4 bound args).
|
||||
mock.ExpectQuery(`FROM signoz_metadata\.distributed_column_evolution_metadata`).
|
||||
WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
|
||||
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
|
||||
{Name: "signal", Type: "String"},
|
||||
{Name: "column_name", Type: "String"},
|
||||
{Name: "column_type", Type: "String"},
|
||||
{Name: "field_context", Type: "String"},
|
||||
{Name: "field_name", Type: "String"},
|
||||
{Name: "version", Type: "UInt32"},
|
||||
{Name: "release_time", Type: "Float64"},
|
||||
}, [][]any{}))
|
||||
|
||||
keys, _, err := metadata.GetKeys(context.Background(), &telemetrytypes.FieldKeySelector{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
@@ -247,6 +261,27 @@ func TestApplyBackwardCompatibleKeys(t *testing.T) {
|
||||
}, rows))
|
||||
}
|
||||
|
||||
// getTracesKeys / getLogsKeys both fetch evolution metadata; return an empty
|
||||
// result so the existing test data flows through unchanged. Each input key
|
||||
// becomes one selector contributing four bound args.
|
||||
if hasTraces || hasLogs {
|
||||
evoArgs := make([]any, 0, len(tt.inputKeys)*4)
|
||||
for range tt.inputKeys {
|
||||
evoArgs = append(evoArgs, nil, nil, nil, nil)
|
||||
}
|
||||
mock.ExpectQuery(`FROM signoz_metadata\.distributed_column_evolution_metadata`).
|
||||
WithArgs(evoArgs...).
|
||||
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
|
||||
{Name: "signal", Type: "String"},
|
||||
{Name: "column_name", Type: "String"},
|
||||
{Name: "column_type", Type: "String"},
|
||||
{Name: "field_context", Type: "String"},
|
||||
{Name: "field_name", Type: "String"},
|
||||
{Name: "version", Type: "UInt32"},
|
||||
{Name: "release_time", Type: "Float64"},
|
||||
}, [][]any{}))
|
||||
}
|
||||
|
||||
selectors := []*telemetrytypes.FieldKeySelector{}
|
||||
for _, key := range tt.inputKeys {
|
||||
selectors = append(selectors, &telemetrytypes.FieldKeySelector{
|
||||
|
||||
@@ -161,7 +161,41 @@ func (c *conditionBuilder) conditionFor(
|
||||
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
|
||||
|
||||
var value any
|
||||
switch columns[0].Type.GetType() {
|
||||
column := columns[0]
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, _, err := qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(newColumns) == 0 {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "no valid evolution found for field %s in the given time range", key.Name)
|
||||
}
|
||||
|
||||
// Multiple columns means fieldExpression is a multiIf returning NULL when none match,
|
||||
// so a simple null check is sufficient.
|
||||
if len(newColumns) > 1 {
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(fieldExpression), nil
|
||||
} else {
|
||||
return sb.IsNull(fieldExpression), nil
|
||||
}
|
||||
}
|
||||
|
||||
// otherwise we have to find the correct exist operator based on the column type
|
||||
column = newColumns[0]
|
||||
} else if len(columns) > 1 {
|
||||
// Resource fields without evolution data still produce a multiIf in FieldFor;
|
||||
// fall back to a null check on the multiIf result.
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(fieldExpression), nil
|
||||
} else {
|
||||
return sb.IsNull(fieldExpression), nil
|
||||
}
|
||||
}
|
||||
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(fieldExpression), nil
|
||||
@@ -178,7 +212,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
return sb.E(fieldExpression, value), nil
|
||||
}
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := columns[0].Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
@@ -202,14 +236,14 @@ func (c *conditionBuilder) conditionFor(
|
||||
return sb.E(fieldExpression, value), nil
|
||||
}
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := columns[0].Type.(schema.MapColumnType).KeyType
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, columns[0].Type)
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
|
||||
switch valueType := columns[0].Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", columns[0].Name, key.Name)
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
}
|
||||
@@ -222,7 +256,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", columns[0].Type)
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemetrytraces
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
@@ -216,7 +217,7 @@ func TestConditionFor(t *testing.T) {
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
value: nil,
|
||||
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL",
|
||||
expectedSQL: "WHERE multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) IS NOT NULL",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -228,7 +229,7 @@ func TestConditionFor(t *testing.T) {
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotExists,
|
||||
value: nil,
|
||||
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NULL",
|
||||
expectedSQL: "WHERE multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) IS NULL",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -302,3 +303,85 @@ func TestConditionFor(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestConditionForResourceWithEvolution(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
releaseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
evolutions := mockEvolutionData(releaseTime)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
operator qbtypes.FilterOperator
|
||||
tsStart uint64
|
||||
tsEnd uint64
|
||||
expectedSQL string
|
||||
}{
|
||||
{
|
||||
name: "Exists - window after release - JSON only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedSQL: "WHERE resource.`service.name`::String IS NOT NULL",
|
||||
},
|
||||
{
|
||||
name: "NotExists - window after release - JSON only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotExists,
|
||||
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedSQL: "WHERE resource.`service.name`::String IS NULL",
|
||||
},
|
||||
{
|
||||
name: "Exists - window before release - map only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
tsStart: uint64(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedSQL: "WHERE mapContains(resources_string, 'service.name') = ?",
|
||||
},
|
||||
{
|
||||
name: "Exists - window straddles release - multiIf null check",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL",
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
conditionBuilder := NewConditionBuilder(fm)
|
||||
|
||||
for _, tc := range testCases {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, tc.tsStart, tc.tsEnd, &tc.key, tc.operator, nil, sb)
|
||||
require.NoError(t, err)
|
||||
sb.Where(cond)
|
||||
sql, _ := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
assert.Contains(t, sql, tc.expectedSQL)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,7 +174,7 @@ func (m *defaultFieldMapper) getColumn(
|
||||
) ([]*schema.Column, error) {
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
return []*schema.Column{indexV3Columns["resource"]}, nil
|
||||
return []*schema.Column{indexV3Columns["resources_string"], indexV3Columns["resource"]}, nil
|
||||
case telemetrytypes.FieldContextScope:
|
||||
return []*schema.Column{}, qbtypes.ErrColumnNotFound
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
@@ -254,63 +254,92 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(columns) != 1 {
|
||||
return "", errors.Newf(errors.TypeInternal, errors.CodeInternal, "expected exactly 1 column, got %d", len(columns))
|
||||
|
||||
var newColumns []*schema.Column
|
||||
var evolutionsEntries []*telemetrytypes.EvolutionEntry
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, evolutionsEntries, err = qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
} else {
|
||||
newColumns = columns
|
||||
}
|
||||
column := columns[0]
|
||||
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
oldColumn := indexV3Columns["resources_string"]
|
||||
oldKeyName := fmt.Sprintf("%s['%s']", oldColumn.Name, key.Name)
|
||||
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
|
||||
// once clickHouse dependency is updated, we need to check if we can remove it.
|
||||
if key.Materialized {
|
||||
oldKeyName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
|
||||
oldKeyNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, %s==true, %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldKeyNameExists, oldKeyName), nil
|
||||
} else {
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
|
||||
}
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64,
|
||||
schema.ColumnTypeEnumUInt32,
|
||||
schema.ColumnTypeEnumInt8,
|
||||
schema.ColumnTypeEnumInt16,
|
||||
schema.ColumnTypeEnumBool,
|
||||
schema.ColumnTypeEnumDateTime64,
|
||||
schema.ColumnTypeEnumFixedString:
|
||||
return column.Name, nil
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
return column.Name, nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
|
||||
}
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
exprs := []string{}
|
||||
existExpr := []string{}
|
||||
for i, column := range newColumns {
|
||||
// Use evolution column name if available, otherwise use the column name
|
||||
columnName := column.Name
|
||||
if evolutionsEntries != nil && evolutionsEntries[i] != nil {
|
||||
columnName = evolutionsEntries[i].ColumnName
|
||||
}
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
|
||||
// once clickHouse dependency is updated, we need to check if we can remove it.
|
||||
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", columnName, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", columnName, key.Name))
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64,
|
||||
schema.ColumnTypeEnumUInt32,
|
||||
schema.ColumnTypeEnumInt8,
|
||||
schema.ColumnTypeEnumInt16,
|
||||
schema.ColumnTypeEnumBool,
|
||||
schema.ColumnTypeEnumDateTime64,
|
||||
schema.ColumnTypeEnumFixedString:
|
||||
exprs = append(exprs, column.Name)
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
exprs = append(exprs, column.Name)
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
|
||||
}
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
exprs = append(exprs, telemetrytypes.FieldKeyToMaterializedColumnName(key))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s==true", telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)))
|
||||
} else {
|
||||
exprs = append(exprs, fmt.Sprintf("%s['%s']", columnName, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("mapContains(%s, '%s')", columnName, key.Name))
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
|
||||
}
|
||||
}
|
||||
|
||||
if len(exprs) == 1 {
|
||||
return exprs[0], nil
|
||||
} else if len(exprs) > 1 {
|
||||
// Ensure existExpr has the same length as exprs
|
||||
if len(existExpr) != len(exprs) {
|
||||
return "", errors.New(errors.TypeInternal, errors.CodeInternal, "length of exist exprs doesn't match to that of exprs")
|
||||
}
|
||||
finalExprs := []string{}
|
||||
for i, expr := range exprs {
|
||||
finalExprs = append(finalExprs, fmt.Sprintf("%s, %s", existExpr[i], expr))
|
||||
}
|
||||
return "multiIf(" + strings.Join(finalExprs, ", ") + ", NULL)", nil
|
||||
}
|
||||
|
||||
// should not reach here
|
||||
return column.Name, nil
|
||||
return columns[0].Name, nil
|
||||
}
|
||||
|
||||
// ColumnExpressionFor returns the column expression for the given field
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemetrytraces
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
@@ -64,7 +65,7 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
},
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
expectedResult: "multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -75,7 +76,7 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
},
|
||||
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
|
||||
expectedResult: "multiIf(`resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -103,3 +104,86 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldForResourceWithEvolution(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
releaseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
evolutions := mockEvolutionData(releaseTime)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
tsStart uint64
|
||||
tsEnd uint64
|
||||
expectedResult string
|
||||
}{
|
||||
{
|
||||
name: "Window straddles release - both columns",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
},
|
||||
{
|
||||
name: "Window fully after release - JSON column only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "resource.`service.name`::String",
|
||||
},
|
||||
{
|
||||
name: "Window fully before release - map column only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "resources_string['service.name']",
|
||||
},
|
||||
{
|
||||
name: "Window fully after release - materialized resource",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "deployment.environment",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "resource.`deployment.environment`::String",
|
||||
},
|
||||
{
|
||||
name: "Window straddles release - materialized resource",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "deployment.environment",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedResult, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,9 @@ import (
|
||||
)
|
||||
|
||||
func TestStatementBuilder(t *testing.T) {
|
||||
// releaseTime is chosen so it lands inside the standard [1747947419000, 1747983448000]ms
|
||||
// test window, keeping the multiIf SQL form for resource fields.
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -355,7 +358,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
@@ -394,6 +397,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatementBuilderListQuery(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -650,7 +654,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
@@ -683,6 +687,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -711,7 +716,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -744,7 +749,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
}},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -758,7 +763,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = c.keysMap
|
||||
if mockMetadataStore.KeysMap == nil {
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
}
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
@@ -789,6 +794,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatementBuilderTraceQuery(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -911,7 +917,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
@@ -944,6 +950,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAdjustKey(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
inputKey telemetrytypes.TelemetryFieldKey
|
||||
@@ -957,7 +964,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: IntrinsicFields["trace_id"],
|
||||
},
|
||||
{
|
||||
@@ -967,7 +974,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextBody, // incorrect context
|
||||
FieldDataType: telemetrytypes.FieldDataTypeInt64,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "duration_nano",
|
||||
FieldContext: telemetrytypes.FieldContextSpan, // should be corrected
|
||||
@@ -981,7 +988,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextSpan, // correct context
|
||||
FieldDataType: telemetrytypes.FieldDataTypeInt64,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "duration_nano",
|
||||
FieldContext: telemetrytypes.FieldContextSpan, // should be corrected
|
||||
@@ -995,8 +1002,8 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: *buildCompleteFieldKeyMap()["service.name"][0],
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: *buildCompleteFieldKeyMap(releaseTime)["service.name"][0],
|
||||
},
|
||||
{
|
||||
name: "single matching key with context specified - override",
|
||||
@@ -1005,8 +1012,8 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: *buildCompleteFieldKeyMap()["cart.items_count"][0],
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: *buildCompleteFieldKeyMap(releaseTime)["cart.items_count"][0],
|
||||
},
|
||||
{
|
||||
name: "multiple matching keys - all materialized",
|
||||
@@ -1043,7 +1050,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "mixed.materialization.key",
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
@@ -1057,7 +1064,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "mixed.materialization.key",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
@@ -1072,7 +1079,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "unknown.field",
|
||||
Materialized: false,
|
||||
@@ -1085,7 +1092,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
@@ -1100,7 +1107,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "cart.items_count",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
@@ -1115,7 +1122,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "user.id",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
@@ -1158,6 +1165,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAdjustKeys(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
|
||||
@@ -1183,7 +1191,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedSelectFields: []telemetrytypes.TelemetryFieldKey{
|
||||
{
|
||||
Name: "service.name",
|
||||
@@ -1220,7 +1228,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedGroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
@@ -1267,7 +1275,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Key: qbtypes.OrderByKey{
|
||||
@@ -1326,7 +1334,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedSelectFields: []telemetrytypes.TelemetryFieldKey{
|
||||
{
|
||||
Name: "trace_id",
|
||||
@@ -1381,7 +1389,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
// After alias adjustment, name becomes "span.duration" with FieldContextUnspecified
|
||||
// "span.duration" is not in keysMap, so context stays unspecified
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
package telemetrytraces
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
func buildCompleteFieldKeyMap(releaseTime time.Time) map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
keysMap := map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"service.name": {
|
||||
{
|
||||
@@ -115,7 +117,33 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
for _, keys := range keysMap {
|
||||
for _, key := range keys {
|
||||
key.Signal = telemetrytypes.SignalTraces
|
||||
if key.FieldContext == telemetrytypes.FieldContextResource {
|
||||
key.Evolutions = mockEvolutionData(releaseTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
return keysMap
|
||||
}
|
||||
|
||||
// mockEvolutionData returns the canonical resource-column evolution timeline used in tests:
|
||||
// the legacy resources_string map at epoch 0 and the JSON resource column released at releaseTime.
|
||||
func mockEvolutionData(releaseTime time.Time) []*telemetrytypes.EvolutionEntry {
|
||||
return []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
ColumnName: "resources_string",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: releaseTime,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
)
|
||||
|
||||
func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -67,7 +68,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name` FROM A_DIR_DESC_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) AS `service.name` FROM A_DIR_DESC_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -390,7 +391,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
@@ -443,6 +444,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
operator qbtypes.QueryBuilderTraceOperator
|
||||
@@ -506,7 +508,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
@@ -16,12 +17,13 @@ import (
|
||||
)
|
||||
|
||||
func TestTraceTimeRangeOptimization(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
mockMetadataStore.KeysMap["trace_id"] = []*telemetrytypes.TelemetryFieldKey{{
|
||||
Name: "trace_id",
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
|
||||
119
pkg/types/querybuildertypes/querybuildertypesv5/evolution.go
Normal file
119
pkg/types/querybuildertypes/querybuildertypesv5/evolution.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package querybuildertypesv5
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
// SelectEvolutionsForColumns selects the appropriate evolution entries for each column based on the time range.
|
||||
// Logic:
|
||||
// - Finds the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// - Rejects all evolutions before this latest base evolution
|
||||
// - For duplicate evolutions it considers the oldest one (first in ReleaseTime)
|
||||
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
|
||||
// - Results are sorted by ReleaseTime descending (newest first)
|
||||
func SelectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
|
||||
|
||||
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
|
||||
copy(sortedEvolutions, evolutions)
|
||||
|
||||
// sort the evolutions by ReleaseTime ascending
|
||||
sort.Slice(sortedEvolutions, func(i, j int) bool {
|
||||
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
|
||||
})
|
||||
|
||||
tsStartTime := time.Unix(0, int64(tsStart))
|
||||
tsEndTime := time.Unix(0, int64(tsEnd))
|
||||
|
||||
// Build evolution map: column name -> evolution
|
||||
evolutionMap := make(map[string]*telemetrytypes.EvolutionEntry)
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if _, exists := evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))]; exists {
|
||||
// since if there is duplicate we would just use the oldest one.
|
||||
continue
|
||||
}
|
||||
evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))] = evolution
|
||||
}
|
||||
|
||||
// Find the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// Evolutions are sorted, so we can break early
|
||||
var latestBaseEvolutionAcrossAll *telemetrytypes.EvolutionEntry
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if evolution.ReleaseTime.After(tsStartTime) {
|
||||
break
|
||||
}
|
||||
latestBaseEvolutionAcrossAll = evolution
|
||||
}
|
||||
|
||||
// We shouldn't reach this, it basically means there is something wrong with the evolutions data
|
||||
if latestBaseEvolutionAcrossAll == nil {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "no base evolution found for columns %v", columns)
|
||||
}
|
||||
|
||||
columnLookUpMap := make(map[string]*schema.Column)
|
||||
for _, column := range columns {
|
||||
columnLookUpMap[column.Name] = column
|
||||
}
|
||||
|
||||
// Collect column-evolution pairs
|
||||
type colEvoPair struct {
|
||||
column *schema.Column
|
||||
evolution *telemetrytypes.EvolutionEntry
|
||||
}
|
||||
pairs := []colEvoPair{}
|
||||
|
||||
for _, evolution := range evolutionMap {
|
||||
// Reject evolutions before the latest base evolution
|
||||
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
|
||||
continue
|
||||
}
|
||||
// skip evolutions after tsEndTime
|
||||
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exists := columnLookUpMap[evolution.ColumnName]; !exists {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "evolution column %s not found in columns %v", evolution.ColumnName, columns)
|
||||
}
|
||||
|
||||
pairs = append(pairs, colEvoPair{columnLookUpMap[evolution.ColumnName], evolution})
|
||||
}
|
||||
|
||||
// If no pairs found, fall back to latestBaseEvolutionAcrossAll for matching columns
|
||||
if len(pairs) == 0 {
|
||||
for _, column := range columns {
|
||||
// Use latestBaseEvolutionAcrossAll if this column name matches its column name
|
||||
if column.Name == latestBaseEvolutionAcrossAll.ColumnName {
|
||||
pairs = append(pairs, colEvoPair{column, latestBaseEvolutionAcrossAll})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by ReleaseTime descending (newest first)
|
||||
slices.SortFunc(pairs, func(a, b colEvoPair) int {
|
||||
// Sort by ReleaseTime descending (newest first)
|
||||
if a.evolution.ReleaseTime.After(b.evolution.ReleaseTime) {
|
||||
return -1
|
||||
}
|
||||
if a.evolution.ReleaseTime.Before(b.evolution.ReleaseTime) {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
// Extract results
|
||||
newColumns := make([]*schema.Column, len(pairs))
|
||||
evolutionsEntries := make([]*telemetrytypes.EvolutionEntry, len(pairs))
|
||||
for i, pair := range pairs {
|
||||
newColumns[i] = pair.column
|
||||
evolutionsEntries[i] = pair.evolution
|
||||
}
|
||||
|
||||
return newColumns, evolutionsEntries, nil
|
||||
}
|
||||
11
tests/fixtures/traces.py
vendored
11
tests/fixtures/traces.py
vendored
@@ -6,7 +6,7 @@ import uuid
|
||||
from abc import ABC
|
||||
from collections.abc import Callable, Generator
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
from typing import Any, Literal
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import numpy as np
|
||||
@@ -236,6 +236,7 @@ class Traces(ABC):
|
||||
attributes_number: dict[str, np.float64]
|
||||
attributes_bool: dict[str, bool]
|
||||
resources_string: dict[str, str]
|
||||
resource_json: dict[str, str]
|
||||
events: list[str]
|
||||
links: str
|
||||
response_status_code: str
|
||||
@@ -273,6 +274,7 @@ class Traces(ABC):
|
||||
links: list[TracesLink] = [],
|
||||
trace_state: str = "",
|
||||
flags: np.uint32 = 0,
|
||||
resource_write_mode: Literal["legacy_only", "dual_write"] = "dual_write",
|
||||
) -> None:
|
||||
if timestamp is None:
|
||||
timestamp = datetime.datetime.now()
|
||||
@@ -322,8 +324,11 @@ class Traces(ABC):
|
||||
self.db_name = ""
|
||||
self.db_operation = ""
|
||||
|
||||
# Process resources and derive service_name
|
||||
# Process resources and derive service_name. Spans written before the
|
||||
# JSON-resource evolution time only populate resources_string (legacy_only);
|
||||
# spans at or after the evolution time dual-write to both columns.
|
||||
self.resources_string = {k: str(v) for k, v in resources.items()}
|
||||
self.resource_json = {} if resource_write_mode == "legacy_only" else dict(self.resources_string)
|
||||
self.service_name = self.resources_string.get("service.name", "default-service")
|
||||
|
||||
for k, v in self.resources_string.items():
|
||||
@@ -575,7 +580,7 @@ class Traces(ABC):
|
||||
self.db_operation,
|
||||
self.has_error,
|
||||
self.is_remote,
|
||||
self.resources_string,
|
||||
self.resource_json,
|
||||
],
|
||||
dtype=object,
|
||||
)
|
||||
|
||||
240
tests/integration/tests/querier/13_traces_resource_evolution.py
Normal file
240
tests/integration/tests/querier/13_traces_resource_evolution.py
Normal file
@@ -0,0 +1,240 @@
|
||||
from collections.abc import Callable
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from http import HTTPStatus
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.querier import (
|
||||
build_group_by_field,
|
||||
build_logs_aggregation,
|
||||
index_series_by_label,
|
||||
make_query_request,
|
||||
)
|
||||
from fixtures.traces import TraceIdGenerator, Traces
|
||||
|
||||
|
||||
# we already create the evolution for resource during schema migration
|
||||
# since we have to create test data around it, we need to get the evolution time
|
||||
def _get_traces_resource_evolution_time_json(signoz: types.SigNoz) -> datetime:
|
||||
result = signoz.telemetrystore.conn.query(
|
||||
"""
|
||||
SELECT release_time
|
||||
FROM signoz_metadata.distributed_column_evolution_metadata
|
||||
WHERE signal = 'traces'
|
||||
AND field_context = 'resource'
|
||||
AND field_name = '__all__'
|
||||
AND column_name = 'resource'
|
||||
LIMIT 1
|
||||
"""
|
||||
).result_rows
|
||||
|
||||
assert result, "Expected traces resource evolution metadata to exist"
|
||||
|
||||
release_time_ns = int(result[0][0])
|
||||
return datetime.fromtimestamp(release_time_ns / 1e9, tz=UTC)
|
||||
|
||||
|
||||
# Spans with timestamps before the evolution time will have resources written only to resources_string.
|
||||
# Spans with timestamps at or after the evolution time will have resources written to both resources_string and resource (JSON).
|
||||
def _build_evolved_span(
|
||||
timestamp: datetime,
|
||||
evolution_time: datetime,
|
||||
service_name: str,
|
||||
name: str,
|
||||
) -> Traces:
|
||||
resource_write_mode = "legacy_only" if timestamp < evolution_time else "dual_write"
|
||||
return Traces(
|
||||
timestamp=timestamp,
|
||||
trace_id=TraceIdGenerator.trace_id(),
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
name=name,
|
||||
resources={
|
||||
"service.name": service_name,
|
||||
"deployment.environment": "integration",
|
||||
},
|
||||
resource_write_mode=resource_write_mode,
|
||||
)
|
||||
|
||||
|
||||
def _query_grouped_trace_series(
    signoz: types.SigNoz,
    token: str,
    start: datetime,
    end: datetime,
    group_by: str = "service.name",
    aggregation: str = "count()",
) -> dict[str, list[dict]]:
    """Run a grouped traces time-series query and index the series by label.

    Builds a single builder query over ``[start, end)``, asserts the response
    succeeded with exactly one result/aggregation, and returns the series
    keyed by the ``group_by`` label value.
    """
    query_spec = {
        "type": "builder_query",
        "spec": {
            "name": "A",
            "signal": "traces",
            "stepInterval": 60,
            "disabled": False,
            "groupBy": [build_group_by_field(group_by)],
            "having": {"expression": ""},
            "aggregations": [build_logs_aggregation(aggregation)],
        },
    }
    response = make_query_request(
        signoz,
        token,
        start_ms=int(start.timestamp() * 1000),
        end_ms=int(end.timestamp() * 1000),
        request_type="time_series",
        queries=[query_spec],
    )

    assert response.status_code == HTTPStatus.OK
    payload = response.json()
    assert payload["status"] == "success"

    results = payload["data"]["data"]["results"]
    assert len(results) == 1

    aggregations = results[0]["aggregations"]
    assert len(aggregations) == 1

    return index_series_by_label(aggregations[0]["series"], group_by)
|
||||
|
||||
|
||||
def _assert_grouped_series(
|
||||
series_by_group: dict[str, dict],
|
||||
expected_values_by_group: dict[str, dict[int, int]],
|
||||
) -> None:
|
||||
assert set(series_by_group.keys()) == set(expected_values_by_group.keys())
|
||||
|
||||
for group_name, expected_by_ts in expected_values_by_group.items():
|
||||
actual_values = sorted(
|
||||
series_by_group[group_name]["values"],
|
||||
key=lambda value: value["timestamp"],
|
||||
)
|
||||
expected_values = [{"timestamp": timestamp, "value": value} for timestamp, value in sorted(expected_by_ts.items())]
|
||||
assert actual_values == expected_values
|
||||
|
||||
|
||||
def _test_traces_resource_evolution(
    signoz: types.SigNoz,
    token: str,
    insert_traces: Callable[[list[Traces]], None],
) -> None:
    """Shared scenario for the traces resource column evolution.

    1. Read the evolution time from the metadata table.
    2. Ingest spans both before and after that time (legacy-only vs
       dual-written resources).
    3. Query windows before, after, and straddling the evolution boundary,
       checking both group-by and aggregation on resource fields.
    """
    evolution_time = _get_traces_resource_evolution_time_json(signoz)
    # Align to a minute bucket so expected series timestamps are exact.
    evolution_time = evolution_time.replace(second=0, microsecond=0)

    before_2 = evolution_time - timedelta(minutes=10)
    before_1 = evolution_time - timedelta(minutes=5)
    after_1 = evolution_time + timedelta(minutes=5)
    after_2 = evolution_time + timedelta(minutes=10)
    minute = timedelta(minutes=1)

    span_specs = [
        (before_2, "svc-before-2", "span before evolution 2"),
        (before_1, "svc-before-1", "span before evolution 1"),
        (after_1, "svc-after-1", "span after evolution 1"),
        (after_2, "svc-after-2", "span after evolution 2"),
    ]
    insert_traces(
        [
            _build_evolved_span(
                timestamp=ts,
                evolution_time=evolution_time,
                service_name=service,
                name=span_name,
            )
            for ts, service, span_name in span_specs
        ]
    )

    def _ms(ts: datetime) -> int:
        # Series timestamps come back as epoch milliseconds.
        return int(ts.timestamp() * 1000)

    # Window covering only the legacy-only spans.
    _assert_grouped_series(
        _query_grouped_trace_series(signoz, token, before_2 - minute, before_1 + minute),
        expected_values_by_group={
            "svc-before-2": {_ms(before_2): 1},
            "svc-before-1": {_ms(before_1): 1},
        },
    )

    # Window covering only the dual-written spans.
    _assert_grouped_series(
        _query_grouped_trace_series(signoz, token, after_1 - minute, after_2 + minute),
        expected_values_by_group={
            "svc-after-1": {_ms(after_1): 1},
            "svc-after-2": {_ms(after_2): 1},
        },
    )

    # Window straddling the evolution boundary: all four spans must appear.
    _assert_grouped_series(
        _query_grouped_trace_series(signoz, token, before_2, after_2 + minute),
        expected_values_by_group={
            "svc-before-2": {_ms(before_2): 1},
            "svc-before-1": {_ms(before_1): 1},
            "svc-after-1": {_ms(after_1): 1},
            "svc-after-2": {_ms(after_2): 1},
        },
    )

    # Aggregation on a resource field (count_distinct(service.name)), grouped
    # by a second resource attribute, across the boundary.
    _assert_grouped_series(
        _query_grouped_trace_series(
            signoz,
            token,
            before_2,
            after_2 + minute,
            group_by="deployment.environment",
            aggregation="count_distinct(service.name)",
        ),
        expected_values_by_group={
            "integration": {
                _ms(before_2): 1,
                _ms(before_1): 1,
                _ms(after_1): 1,
                _ms(after_2): 1,
            },
        },
    )
|
||||
|
||||
|
||||
def test_traces_resource_evolution(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[list[Traces]], None],
) -> None:
    """Authenticate as the admin user and run the shared traces resource
    evolution scenario end to end.

    ``create_user_admin`` is a setup-only fixture; it is requested solely for
    its side effect of creating the admin account before ``get_token`` runs.
    """
    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
    _test_traces_resource_evolution(signoz, token, insert_traces)
|
||||
Reference in New Issue
Block a user