mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-19 08:20:34 +01:00
Compare commits
1 Commits
infraM/v2_
...
issue_4967
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8afb78b17b |
@@ -124,7 +124,7 @@ func (b *traceQueryStatementBuilder) Build(
|
||||
-------------------------------- End of tech debt ----------------------------
|
||||
*/
|
||||
|
||||
query = b.adjustKeys(ctx, keys, query, requestType)
|
||||
adjustTraceKeys(ctx, b.logger, keys, &query, requestType)
|
||||
|
||||
// Create SQL builder
|
||||
q := sqlbuilder.NewSelectBuilder()
|
||||
@@ -193,24 +193,25 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation])
|
||||
return keySelectors
|
||||
}
|
||||
|
||||
func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation] {
|
||||
|
||||
// add deprecated fields only during statement building
|
||||
// why?
|
||||
// 1. to not fail filter expression that use deprecated cols
|
||||
// 2. this could have been moved to metadata fetching itself, however, that
|
||||
// would mean, they also show up in suggestions we we don't want to do
|
||||
// 3. reason for not doing a simple append is to keep intrinsic/calculated field first so that it gets
|
||||
// priority in multi_if sql expression
|
||||
// mergeDeprecatedTraceKeys prepends deprecated intrinsic/calculated trace field
|
||||
// definitions to the keys map so that filter expressions referencing deprecated
|
||||
// columns continue to resolve. Prepending keeps the intrinsic/calculated entry
|
||||
// first so it wins in the multi_if SQL expression.
|
||||
func mergeDeprecatedTraceKeys(keys map[string][]*telemetrytypes.TelemetryFieldKey) {
|
||||
for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
|
||||
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
|
||||
}
|
||||
for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
|
||||
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
|
||||
}
|
||||
}
|
||||
|
||||
func adjustTraceKeys(ctx context.Context, logger *slog.Logger, keys map[string][]*telemetrytypes.TelemetryFieldKey, query *qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], requestType qbtypes.RequestType) {
|
||||
|
||||
mergeDeprecatedTraceKeys(keys)
|
||||
|
||||
// Adjust keys for alias expressions in aggregations
|
||||
actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
|
||||
actions := querybuilder.AdjustKeysForAliasExpressions(query, requestType)
|
||||
|
||||
/*
|
||||
Check if user is using multiple contexts or data types for same field name
|
||||
@@ -228,7 +229,7 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
|
||||
and make it just http.status_code and remove the duplicate entry.
|
||||
*/
|
||||
|
||||
actions = append(actions, querybuilder.AdjustDuplicateKeys(&query)...)
|
||||
actions = append(actions, querybuilder.AdjustDuplicateKeys(query)...)
|
||||
|
||||
/*
|
||||
Now adjust each key to have correct context and data type
|
||||
@@ -236,24 +237,24 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
|
||||
Reason for doing this is to not create an unexpected behavior for users
|
||||
*/
|
||||
for idx := range query.SelectFields {
|
||||
actions = append(actions, b.adjustKey(&query.SelectFields[idx], keys)...)
|
||||
actions = append(actions, adjustTraceKey(&query.SelectFields[idx], keys)...)
|
||||
}
|
||||
for idx := range query.GroupBy {
|
||||
actions = append(actions, b.adjustKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
|
||||
actions = append(actions, adjustTraceKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
|
||||
}
|
||||
for idx := range query.Order {
|
||||
actions = append(actions, b.adjustKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
|
||||
actions = append(actions, adjustTraceKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
|
||||
}
|
||||
|
||||
for _, action := range actions {
|
||||
// TODO: change to debug level once we are confident about the behavior
|
||||
b.logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
|
||||
logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
|
||||
}
|
||||
|
||||
return query
|
||||
}
|
||||
|
||||
func (b *traceQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
|
||||
// adjustTraceKey resolves a single TelemetryFieldKey against the keys map,
|
||||
// preferring intrinsic/calculated field definitions when the name matches one.
|
||||
func adjustTraceKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
|
||||
|
||||
// for recording actions taken
|
||||
actions := []string{}
|
||||
|
||||
@@ -1125,28 +1125,13 @@ func TestAdjustKey(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
statementBuilder := NewTraceQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
aggExprRewriter,
|
||||
nil,
|
||||
fl,
|
||||
)
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
// Create a copy of the input key to avoid modifying the original
|
||||
key := c.inputKey
|
||||
|
||||
// Call adjustKey
|
||||
statementBuilder.adjustKey(&key, c.keysMap)
|
||||
adjustTraceKey(&key, c.keysMap)
|
||||
|
||||
// Verify the key was adjusted as expected
|
||||
require.Equal(t, c.expectedKey.Name, key.Name, "key name should match")
|
||||
@@ -1424,7 +1409,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
}
|
||||
|
||||
// Call adjustKeys
|
||||
c.query = statementBuilder.adjustKeys(context.Background(), keysMapCopy, c.query, qbtypes.RequestTypeScalar)
|
||||
adjustTraceKeys(context.Background(), statementBuilder.logger, keysMapCopy, &c.query, qbtypes.RequestTypeScalar)
|
||||
|
||||
// Verify select fields were adjusted
|
||||
if c.expectedSelectFields != nil {
|
||||
|
||||
@@ -197,6 +197,14 @@ func (b *traceOperatorCTEBuilder) buildQueryCTE(ctx context.Context, queryName s
|
||||
}
|
||||
b.stmtBuilder.logger.DebugContext(ctx, "Retrieved keys for query", slog.String("query_name", queryName), slog.Int("keys_count", len(keys)))
|
||||
|
||||
// RequestTypeRaw is correct here regardless of the operator's outer
|
||||
// request type: this CTE is a raw projection of spans matching the filter
|
||||
// (no aggregations, no GroupBy, no OrderBy) — aggregation/grouping happens
|
||||
// in buildFinalQuery on top of the CTE. AdjustKeysForAliasExpressions
|
||||
// (the only requestType-sensitive step inside adjustTraceKeys) is a
|
||||
// no-op for raw.
|
||||
adjustTraceKeys(ctx, b.stmtBuilder.logger, keys, query, qbtypes.RequestTypeRaw)
|
||||
|
||||
// Build resource filter CTE for this specific query
|
||||
resourceFilterCTEName := fmt.Sprintf("__resource_filter_%s", cteName)
|
||||
resourceStmt, err := b.buildResourceFilterCTE(ctx, *query)
|
||||
@@ -398,21 +406,28 @@ func (b *traceOperatorCTEBuilder) buildNotCTE(leftCTE, rightCTE string) (string,
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildFinalQuery(ctx context.Context, selectFromCTE string, requestType qbtypes.RequestType) (*qbtypes.Statement, error) {
|
||||
keySelectors := b.getKeySelectors()
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b.adjustOperatorKeys(keys)
|
||||
|
||||
switch requestType {
|
||||
case qbtypes.RequestTypeRaw:
|
||||
return b.buildListQuery(ctx, selectFromCTE)
|
||||
return b.buildListQuery(ctx, selectFromCTE, keys)
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
return b.buildTimeSeriesQuery(ctx, selectFromCTE)
|
||||
return b.buildTimeSeriesQuery(ctx, selectFromCTE, keys)
|
||||
case qbtypes.RequestTypeTrace:
|
||||
return b.buildTraceQuery(ctx, selectFromCTE)
|
||||
return b.buildTraceQuery(ctx, selectFromCTE, keys)
|
||||
case qbtypes.RequestTypeScalar:
|
||||
return b.buildScalarQuery(ctx, selectFromCTE)
|
||||
return b.buildScalarQuery(ctx, selectFromCTE, keys)
|
||||
default:
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
|
||||
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
// Select core fields
|
||||
@@ -434,22 +449,6 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
|
||||
"parent_span_id": true,
|
||||
}
|
||||
|
||||
// Get keys for selectFields
|
||||
keySelectors := b.getKeySelectors()
|
||||
for _, field := range b.operator.SelectFields {
|
||||
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
||||
Name: field.Name,
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: field.FieldContext,
|
||||
FieldDataType: field.FieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Add selectFields using ColumnExpressionFor since we now have all base table columns
|
||||
for _, field := range b.operator.SelectFields {
|
||||
if selectedFields[field.Name] {
|
||||
@@ -499,6 +498,22 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
|
||||
}, nil
|
||||
}
|
||||
|
||||
// adjustOperatorKeys merges deprecated trace field definitions into keys and
|
||||
// reconciles each operator-level SelectFields/GroupBy/Order key against the
|
||||
// keys map, mirroring the per-field portion of adjustTraceKeys.
|
||||
func (b *traceOperatorCTEBuilder) adjustOperatorKeys(keys map[string][]*telemetrytypes.TelemetryFieldKey) {
|
||||
mergeDeprecatedTraceKeys(keys)
|
||||
for idx := range b.operator.SelectFields {
|
||||
adjustTraceKey(&b.operator.SelectFields[idx], keys)
|
||||
}
|
||||
for idx := range b.operator.GroupBy {
|
||||
adjustTraceKey(&b.operator.GroupBy[idx].TelemetryFieldKey, keys)
|
||||
}
|
||||
for idx := range b.operator.Order {
|
||||
adjustTraceKey(&b.operator.Order[idx].Key.TelemetryFieldKey, keys)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySelector {
|
||||
var keySelectors []*telemetrytypes.FieldKeySelector
|
||||
|
||||
@@ -526,6 +541,15 @@ func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySe
|
||||
})
|
||||
}
|
||||
|
||||
for _, sf := range b.operator.SelectFields {
|
||||
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
||||
Name: sf.Name,
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: sf.FieldContext,
|
||||
FieldDataType: sf.FieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
for i := range keySelectors {
|
||||
keySelectors[i].Signal = telemetrytypes.SignalTraces
|
||||
}
|
||||
@@ -533,7 +557,7 @@ func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySe
|
||||
return keySelectors
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
|
||||
func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
sb.Select(fmt.Sprintf(
|
||||
@@ -541,12 +565,6 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
|
||||
int64(b.operator.StepInterval.Seconds()),
|
||||
))
|
||||
|
||||
keySelectors := b.getKeySelectors()
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
for _, gb := range b.operator.GroupBy {
|
||||
@@ -625,8 +643,7 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
|
||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||
|
||||
// Add HAVING clause if specified
|
||||
err = b.addHavingClause(sb)
|
||||
if err != nil {
|
||||
if err := b.addHavingClause(sb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -653,17 +670,11 @@ func (b *traceOperatorCTEBuilder) buildTraceSummaryCTE(selectFromCTE string) {
|
||||
b.addCTE("trace_summary", sql, args, []string{"all_spans", selectFromCTE})
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
|
||||
func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
|
||||
b.buildTraceSummaryCTE(selectFromCTE)
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
keySelectors := b.getKeySelectors()
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
for _, gb := range b.operator.GroupBy {
|
||||
@@ -745,8 +756,7 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
|
||||
sb.GroupBy(groupByKeys...)
|
||||
}
|
||||
|
||||
err = b.addHavingClause(sb)
|
||||
if err != nil {
|
||||
if err := b.addHavingClause(sb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -802,15 +812,9 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
|
||||
func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
keySelectors := b.getKeySelectors()
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
for _, gb := range b.operator.GroupBy {
|
||||
@@ -892,8 +896,7 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
|
||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||
|
||||
// Add HAVING clause if specified
|
||||
err = b.addHavingClause(sb)
|
||||
if err != nil {
|
||||
if err := b.addHavingClause(sb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -693,6 +693,134 @@ def test_traces_list_with_corrupt_data(
|
||||
assert data[key] == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"payload,status_code,results",
|
||||
[
|
||||
# Case 1: builder CTE filters use deprecated intrinsic field durationNano
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": True,
|
||||
"filter": {"expression": 'durationNano = "3s"'},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "B",
|
||||
"signal": "traces",
|
||||
"disabled": True,
|
||||
"filter": {"expression": 'durationNano = "5s"'},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_trace_operator",
|
||||
"spec": {
|
||||
"name": "C",
|
||||
"expression": "A => B",
|
||||
"limit": 1,
|
||||
},
|
||||
},
|
||||
],
|
||||
HTTPStatus.OK,
|
||||
lambda x: {
|
||||
"duration_nano": x[0].duration_nano,
|
||||
"name": x[0].name,
|
||||
"parent_span_id": x[0].parent_span_id,
|
||||
"span_id": x[0].span_id,
|
||||
"timestamp": format_timestamp(x[0].timestamp),
|
||||
"trace_id": x[0].trace_id,
|
||||
}, # type: Callable[[List[Traces]], Dict[str, Any]]
|
||||
),
|
||||
# Case 2: builder CTE filter uses deprecated calculated field responseStatusCode
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": True,
|
||||
"filter": {"expression": 'responseStatusCode = "200"'},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "B",
|
||||
"signal": "traces",
|
||||
"disabled": True,
|
||||
"filter": {"expression": 'durationNano = "5s"'},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_trace_operator",
|
||||
"spec": {
|
||||
"name": "C",
|
||||
"expression": "A => B",
|
||||
"limit": 1,
|
||||
},
|
||||
},
|
||||
],
|
||||
HTTPStatus.OK,
|
||||
lambda x: {
|
||||
"duration_nano": x[0].duration_nano,
|
||||
"name": x[0].name,
|
||||
"parent_span_id": x[0].parent_span_id,
|
||||
"span_id": x[0].span_id,
|
||||
"timestamp": format_timestamp(x[0].timestamp),
|
||||
"trace_id": x[0].trace_id,
|
||||
}, # type: Callable[[List[Traces]], Dict[str, Any]]
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_traces_operator_cte_with_adjusted_keys(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
payload: list[dict[str, Any]],
|
||||
status_code: HTTPStatus,
|
||||
results: Callable[[list[Traces]], dict[str, Any]],
|
||||
) -> None:
|
||||
"""
|
||||
Trace operators compile each referenced disabled builder query into a CTE.
|
||||
Those CTE filters must adjust deprecated trace keys before preparing the
|
||||
where clause, otherwise these payloads fail with "key not found".
|
||||
"""
|
||||
traces = generate_traces_with_corrupt_metadata()
|
||||
insert_traces(traces)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=5)).timestamp() * 1000),
|
||||
end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
|
||||
request_type="raw",
|
||||
queries=payload,
|
||||
)
|
||||
|
||||
assert response.status_code == status_code, response.text
|
||||
|
||||
if response.status_code == HTTPStatus.OK:
|
||||
operator_result = find_named_result(response.json()["data"]["data"]["results"], "C")
|
||||
assert operator_result is not None
|
||||
rows = operator_result["rows"]
|
||||
if not results(traces):
|
||||
assert rows is None
|
||||
else:
|
||||
assert rows is not None
|
||||
data = rows[0]["data"]
|
||||
for key, value in results(traces).items():
|
||||
assert data[key] == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_by,aggregation_alias,expected_status",
|
||||
[
|
||||
|
||||
Reference in New Issue
Block a user