mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-19 16:30:31 +01:00
Compare commits
4 Commits
chore/quer
...
issue_4967
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6603fe358f | ||
|
|
a6f97dbd89 | ||
|
|
bc2492ccec | ||
|
|
8afb78b17b |
@@ -66,7 +66,7 @@ const MetricsAggregateSection = memo(function MetricsAggregateSection({
|
||||
|
||||
const handleChangeGroupByKeys = useCallback(
|
||||
(value: IBuilderQuery['groupBy']) => {
|
||||
handleChangeQueryData('groupBy', value, { runAfterUpdate: true });
|
||||
handleChangeQueryData('groupBy', value);
|
||||
},
|
||||
[handleChangeQueryData],
|
||||
);
|
||||
|
||||
@@ -283,14 +283,14 @@ function QueryAddOns({
|
||||
|
||||
const handleChangeGroupByKeys = useCallback(
|
||||
(value: IBuilderQuery['groupBy']) => {
|
||||
handleChangeQueryData('groupBy', value, { runAfterUpdate: true });
|
||||
handleChangeQueryData('groupBy', value);
|
||||
},
|
||||
[handleChangeQueryData],
|
||||
);
|
||||
|
||||
const handleChangeOrderByKeys = useCallback(
|
||||
(value: IBuilderQuery['orderBy']) => {
|
||||
handleChangeQueryData('orderBy', value, { runAfterUpdate: true });
|
||||
handleChangeQueryData('orderBy', value);
|
||||
},
|
||||
[handleChangeQueryData],
|
||||
);
|
||||
|
||||
@@ -73,8 +73,8 @@ export const QueryV2 = forwardRef(function QueryV2(
|
||||
});
|
||||
|
||||
const handleToggleDisableQuery = useCallback(() => {
|
||||
handleChangeQueryData('disabled', !query.disabled, { runAfterUpdate: true });
|
||||
}, [handleChangeQueryData, query.disabled]);
|
||||
handleChangeQueryData('disabled', !query.disabled);
|
||||
}, [handleChangeQueryData, query]);
|
||||
|
||||
const handleToggleCollapsQuery = (): void => {
|
||||
setIsCollapsed(!isCollapsed);
|
||||
|
||||
@@ -5,8 +5,7 @@ import {
|
||||
BaseAutocompleteData,
|
||||
DataTypes,
|
||||
} from 'types/api/queryBuilder/queryAutocompleteResponse';
|
||||
import { IBuilderQuery, Query } from 'types/api/queryBuilder/queryBuilderData';
|
||||
import { EQueryType } from 'types/common/dashboard';
|
||||
import { IBuilderQuery } from 'types/api/queryBuilder/queryBuilderData';
|
||||
import {
|
||||
DataSource,
|
||||
MetricAggregateOperator,
|
||||
@@ -334,196 +333,3 @@ describe('useQueryBuilderOperations - Empty Aggregate Attribute Type', () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('useQueryBuilderOperations - handleChangeQueryData runAfterUpdate', () => {
|
||||
const mockHandleSetQueryData = jest.fn();
|
||||
const mockHandleSetTraceOperatorData = jest.fn();
|
||||
const mockHandleRunQuery = jest.fn();
|
||||
|
||||
const baseQuery: IBuilderQuery = {
|
||||
dataSource: DataSource.METRICS,
|
||||
aggregateOperator: MetricAggregateOperator.AVG,
|
||||
aggregateAttribute: {
|
||||
key: 'system.cpu.load',
|
||||
dataType: DataTypes.Float64,
|
||||
type: ATTRIBUTE_TYPES.GAUGE,
|
||||
} as BaseAutocompleteData,
|
||||
timeAggregation: MetricAggregateOperator.AVG,
|
||||
spaceAggregation: '',
|
||||
aggregations: [],
|
||||
having: [],
|
||||
limit: null,
|
||||
queryName: 'A',
|
||||
functions: [],
|
||||
filters: { items: [], op: 'AND' },
|
||||
groupBy: [],
|
||||
orderBy: [],
|
||||
stepInterval: 60,
|
||||
expression: '',
|
||||
disabled: false,
|
||||
reduceTo: ReduceOperators.AVG,
|
||||
legend: '',
|
||||
};
|
||||
|
||||
const otherQuery: IBuilderQuery = { ...baseQuery, queryName: 'B' };
|
||||
|
||||
const buildCurrentQuery = (): Query => ({
|
||||
queryType: EQueryType.QUERY_BUILDER,
|
||||
promql: [],
|
||||
clickhouse_sql: [],
|
||||
id: 'q-1',
|
||||
unit: '',
|
||||
builder: {
|
||||
queryData: [baseQuery, otherQuery],
|
||||
queryFormulas: [],
|
||||
queryTraceOperator: [],
|
||||
},
|
||||
});
|
||||
|
||||
const setupMock = (overrides: Record<string, unknown> = {}): void => {
|
||||
(useQueryBuilder as jest.Mock).mockReturnValue({
|
||||
handleSetQueryData: mockHandleSetQueryData,
|
||||
handleSetTraceOperatorData: mockHandleSetTraceOperatorData,
|
||||
handleSetFormulaData: jest.fn(),
|
||||
removeQueryBuilderEntityByIndex: jest.fn(),
|
||||
setLastUsedQuery: jest.fn(),
|
||||
redirectWithQueryBuilderData: jest.fn(),
|
||||
handleRunQuery: mockHandleRunQuery,
|
||||
panelType: 'time_series',
|
||||
currentQuery: buildCurrentQuery(),
|
||||
...overrides,
|
||||
});
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
setupMock();
|
||||
});
|
||||
|
||||
it('does not call handleRunQuery when options is omitted', () => {
|
||||
const { result } = renderHook(() =>
|
||||
useQueryOperations({
|
||||
query: baseQuery,
|
||||
index: 0,
|
||||
entityVersion: ENTITY_VERSION_V4,
|
||||
}),
|
||||
);
|
||||
|
||||
act(() => {
|
||||
result.current.handleChangeQueryData('legend', 'cpu-load');
|
||||
});
|
||||
|
||||
expect(mockHandleSetQueryData).toHaveBeenCalledWith(
|
||||
0,
|
||||
expect.objectContaining({ legend: 'cpu-load' }),
|
||||
);
|
||||
expect(mockHandleRunQuery).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('calls handleRunQuery with the freshly-changed query when runAfterUpdate is true', () => {
|
||||
const { result } = renderHook(() =>
|
||||
useQueryOperations({
|
||||
query: baseQuery,
|
||||
index: 0,
|
||||
entityVersion: ENTITY_VERSION_V4,
|
||||
}),
|
||||
);
|
||||
|
||||
act(() => {
|
||||
result.current.handleChangeQueryData('disabled', true, {
|
||||
runAfterUpdate: true,
|
||||
});
|
||||
});
|
||||
|
||||
expect(mockHandleSetQueryData).toHaveBeenCalledWith(
|
||||
0,
|
||||
expect.objectContaining({ disabled: true }),
|
||||
);
|
||||
expect(mockHandleRunQuery).toHaveBeenCalledTimes(1);
|
||||
const [override] = mockHandleRunQuery.mock.calls[0];
|
||||
// Index 0 reflects the new value...
|
||||
expect(override.builder.queryData[0]).toStrictEqual(
|
||||
expect.objectContaining({ queryName: 'A', disabled: true }),
|
||||
);
|
||||
// ...siblings stay untouched.
|
||||
expect(override.builder.queryData[1]).toStrictEqual(
|
||||
expect.objectContaining({ queryName: 'B', disabled: false }),
|
||||
);
|
||||
});
|
||||
|
||||
it('applies the change at the correct index without disturbing other queries', () => {
|
||||
const { result } = renderHook(() =>
|
||||
useQueryOperations({
|
||||
query: otherQuery,
|
||||
index: 1,
|
||||
entityVersion: ENTITY_VERSION_V4,
|
||||
}),
|
||||
);
|
||||
|
||||
act(() => {
|
||||
result.current.handleChangeQueryData(
|
||||
'groupBy',
|
||||
[
|
||||
{
|
||||
key: 'host.name',
|
||||
type: 'tag',
|
||||
dataType: DataTypes.String,
|
||||
} as BaseAutocompleteData,
|
||||
],
|
||||
{ runAfterUpdate: true },
|
||||
);
|
||||
});
|
||||
|
||||
const [override] = mockHandleRunQuery.mock.calls[0];
|
||||
expect(override.builder.queryData[0]).toStrictEqual(
|
||||
expect.objectContaining({ queryName: 'A', groupBy: [] }),
|
||||
);
|
||||
expect(override.builder.queryData[1]).toStrictEqual(
|
||||
expect.objectContaining({
|
||||
queryName: 'B',
|
||||
groupBy: [expect.objectContaining({ key: 'host.name' })],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('keeps handleSetQueryData and handleRunQuery in sync for legend formatting', () => {
|
||||
const { result } = renderHook(() =>
|
||||
useQueryOperations({
|
||||
query: baseQuery,
|
||||
index: 0,
|
||||
entityVersion: ENTITY_VERSION_V4,
|
||||
}),
|
||||
);
|
||||
|
||||
act(() => {
|
||||
result.current.handleChangeQueryData('legend', '{{service.name}}', {
|
||||
runAfterUpdate: true,
|
||||
});
|
||||
});
|
||||
|
||||
const [override] = mockHandleRunQuery.mock.calls[0];
|
||||
const setCallLegend = mockHandleSetQueryData.mock.calls[0][1].legend;
|
||||
expect(override.builder.queryData[0].legend).toBe(setCallLegend);
|
||||
});
|
||||
|
||||
it('does not call handleRunQuery for trace-operator queries (early return)', () => {
|
||||
const { result } = renderHook(() =>
|
||||
useQueryOperations({
|
||||
query: baseQuery,
|
||||
index: 0,
|
||||
entityVersion: ENTITY_VERSION_V4,
|
||||
isForTraceOperator: true,
|
||||
}),
|
||||
);
|
||||
|
||||
act(() => {
|
||||
result.current.handleChangeQueryData('disabled', true, {
|
||||
runAfterUpdate: true,
|
||||
});
|
||||
});
|
||||
|
||||
expect(mockHandleSetTraceOperatorData).toHaveBeenCalledTimes(1);
|
||||
expect(mockHandleSetQueryData).not.toHaveBeenCalled();
|
||||
expect(mockHandleRunQuery).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -45,7 +45,6 @@ import {
|
||||
import {
|
||||
HandleChangeFormulaData,
|
||||
HandleChangeQueryData,
|
||||
HandleChangeQueryDataOptions,
|
||||
HandleChangeQueryDataV5,
|
||||
UseQueryOperations,
|
||||
} from 'types/common/operations.types';
|
||||
@@ -77,7 +76,6 @@ export const useQueryOperations: UseQueryOperations = ({
|
||||
currentQuery,
|
||||
setLastUsedQuery,
|
||||
redirectWithQueryBuilderData,
|
||||
handleRunQuery,
|
||||
} = useQueryBuilder();
|
||||
|
||||
const [operators, setOperators] = useState<SelectOption<string, string>[]>([]);
|
||||
@@ -532,7 +530,7 @@ export const useQueryOperations: UseQueryOperations = ({
|
||||
|
||||
const handleChangeQueryData: HandleChangeQueryData | HandleChangeQueryDataV5 =
|
||||
useCallback(
|
||||
(key: string, value: any, options?: HandleChangeQueryDataOptions) => {
|
||||
(key: string, value: any) => {
|
||||
const newQuery = {
|
||||
...query,
|
||||
[key]:
|
||||
@@ -543,24 +541,8 @@ export const useQueryOperations: UseQueryOperations = ({
|
||||
|
||||
if (isForTraceOperator) {
|
||||
handleSetTraceOperatorData(index, newQuery);
|
||||
return;
|
||||
}
|
||||
|
||||
handleSetQueryData(index, newQuery);
|
||||
|
||||
// `runAfterUpdate` lets callers stage-and-run inline. We pass the
|
||||
// freshly-computed query straight to `handleRunQuery` because the
|
||||
// setState above hasn't flushed yet.
|
||||
if (options?.runAfterUpdate) {
|
||||
handleRunQuery({
|
||||
...currentQuery,
|
||||
builder: {
|
||||
...currentQuery.builder,
|
||||
queryData: currentQuery.builder.queryData.map((item, i) =>
|
||||
i === index ? newQuery : item,
|
||||
),
|
||||
},
|
||||
});
|
||||
} else {
|
||||
handleSetQueryData(index, newQuery);
|
||||
}
|
||||
},
|
||||
[
|
||||
@@ -569,8 +551,6 @@ export const useQueryOperations: UseQueryOperations = ({
|
||||
handleSetQueryData,
|
||||
handleSetTraceOperatorData,
|
||||
isForTraceOperator,
|
||||
handleRunQuery,
|
||||
currentQuery,
|
||||
],
|
||||
);
|
||||
|
||||
|
||||
@@ -1024,57 +1024,49 @@ export function QueryBuilderProvider({
|
||||
[],
|
||||
);
|
||||
|
||||
// `overrideQuery` lets callers run a query value that hasn't been committed
|
||||
// to `currentQuery` state yet — e.g. a click handler that toggles a flag
|
||||
// and wants to stage-and-run in the same tick, without waiting for the
|
||||
// state update to flush.
|
||||
const handleRunQuery = useCallback(
|
||||
(overrideQuery?: Query) => {
|
||||
const isExplorer =
|
||||
location.pathname === ROUTES.LOGS_EXPLORER ||
|
||||
location.pathname === ROUTES.TRACES_EXPLORER;
|
||||
if (isExplorer) {
|
||||
setCalledFromHandleRunQuery(true);
|
||||
}
|
||||
const sourceQuery = overrideQuery ?? currentQuery;
|
||||
const currentQueryData = {
|
||||
...sourceQuery,
|
||||
builder: {
|
||||
...sourceQuery.builder,
|
||||
queryData: sourceQuery.builder.queryData.map((item) => ({
|
||||
...item,
|
||||
filter: {
|
||||
...item.filter,
|
||||
expression:
|
||||
item.filter?.expression.trim() === ''
|
||||
? ''
|
||||
: (item.filter?.expression ?? ''),
|
||||
},
|
||||
filters: {
|
||||
items: [],
|
||||
op: 'AND',
|
||||
},
|
||||
})),
|
||||
},
|
||||
};
|
||||
const handleRunQuery = useCallback(() => {
|
||||
const isExplorer =
|
||||
location.pathname === ROUTES.LOGS_EXPLORER ||
|
||||
location.pathname === ROUTES.TRACES_EXPLORER;
|
||||
if (isExplorer) {
|
||||
setCalledFromHandleRunQuery(true);
|
||||
}
|
||||
const currentQueryData = {
|
||||
...currentQuery,
|
||||
builder: {
|
||||
...currentQuery.builder,
|
||||
queryData: currentQuery.builder.queryData.map((item) => ({
|
||||
...item,
|
||||
filter: {
|
||||
...item.filter,
|
||||
expression:
|
||||
item.filter?.expression.trim() === ''
|
||||
? ''
|
||||
: (item.filter?.expression ?? ''),
|
||||
},
|
||||
filters: {
|
||||
items: [],
|
||||
op: 'AND',
|
||||
},
|
||||
})),
|
||||
},
|
||||
};
|
||||
|
||||
redirectWithQueryBuilderData({
|
||||
...{
|
||||
...currentQueryData,
|
||||
...updateStepInterval({
|
||||
builder: currentQueryData.builder,
|
||||
clickhouse_sql: currentQueryData.clickhouse_sql,
|
||||
promql: currentQueryData.promql,
|
||||
id: currentQueryData.id,
|
||||
queryType,
|
||||
unit: currentQueryData.unit,
|
||||
}),
|
||||
},
|
||||
queryType,
|
||||
});
|
||||
},
|
||||
[currentQuery, location.pathname, queryType, redirectWithQueryBuilderData],
|
||||
);
|
||||
redirectWithQueryBuilderData({
|
||||
...{
|
||||
...currentQueryData,
|
||||
...updateStepInterval({
|
||||
builder: currentQueryData.builder,
|
||||
clickhouse_sql: currentQueryData.clickhouse_sql,
|
||||
promql: currentQueryData.promql,
|
||||
id: currentQueryData.id,
|
||||
queryType,
|
||||
unit: currentQueryData.unit,
|
||||
}),
|
||||
},
|
||||
queryType,
|
||||
});
|
||||
}, [currentQuery, location.pathname, queryType, redirectWithQueryBuilderData]);
|
||||
|
||||
useEffect(() => {
|
||||
if (location.pathname !== currentPathnameRef.current) {
|
||||
|
||||
@@ -26,16 +26,6 @@ type UseQueryOperationsParams = Pick<QueryProps, 'index' | 'query'> &
|
||||
savePreviousQuery?: boolean;
|
||||
};
|
||||
|
||||
export interface HandleChangeQueryDataOptions {
|
||||
/**
|
||||
* When true, stage-and-run the query immediately after the local state
|
||||
* update — no need to wait for the user to click "Stage and Run".
|
||||
* Useful for inline toggles (visibility, disable, etc.) where the panel
|
||||
* should reflect the change without an extra click.
|
||||
*/
|
||||
runAfterUpdate?: boolean;
|
||||
}
|
||||
|
||||
// Generic type that can work with both legacy and V5 query types
|
||||
export type HandleChangeQueryData<T = IBuilderQuery> = <
|
||||
Key extends keyof T,
|
||||
@@ -43,7 +33,6 @@ export type HandleChangeQueryData<T = IBuilderQuery> = <
|
||||
>(
|
||||
key: Key,
|
||||
value: Value,
|
||||
options?: HandleChangeQueryDataOptions,
|
||||
) => void;
|
||||
|
||||
export type HandleChangeTraceOperatorData<T = IBuilderTraceOperator> = <
|
||||
|
||||
@@ -280,7 +280,7 @@ export type QueryBuilderContextType = {
|
||||
shallStringify?: boolean,
|
||||
newTab?: boolean,
|
||||
) => void;
|
||||
handleRunQuery: (overrideQuery?: Query) => void;
|
||||
handleRunQuery: () => void;
|
||||
resetQuery: (newCurrentQuery?: QueryState) => void;
|
||||
handleOnUnitsChange: (units: Format['id']) => void;
|
||||
updateAllQueriesOperators: (
|
||||
|
||||
@@ -124,7 +124,7 @@ func (b *traceQueryStatementBuilder) Build(
|
||||
-------------------------------- End of tech debt ----------------------------
|
||||
*/
|
||||
|
||||
query = b.adjustKeys(ctx, keys, query, requestType)
|
||||
adjustTraceKeys(ctx, b.logger, keys, &query, requestType)
|
||||
|
||||
// Create SQL builder
|
||||
q := sqlbuilder.NewSelectBuilder()
|
||||
@@ -193,24 +193,30 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation])
|
||||
return keySelectors
|
||||
}
|
||||
|
||||
func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation] {
|
||||
|
||||
// add deprecated fields only during statement building
|
||||
// why?
|
||||
// 1. to not fail filter expression that use deprecated cols
|
||||
// 2. this could have been moved to metadata fetching itself, however, that
|
||||
// would mean, they also show up in suggestions we we don't want to do
|
||||
// 3. reason for not doing a simple append is to keep intrinsic/calculated field first so that it gets
|
||||
// priority in multi_if sql expression
|
||||
// mergeDeprecatedTraceKeys prepends deprecated intrinsic/calculated trace field
|
||||
// definitions to the keys map. We do this during statement building, not at
|
||||
// metadata fetch time, because:
|
||||
// 1. Filter expressions that reference deprecated columns must continue to
|
||||
// resolve — otherwise they fail with "key not found".
|
||||
// 2. Doing it at metadata fetch time would also surface deprecated keys in
|
||||
// autocomplete suggestions, which we don't want.
|
||||
// 3. We prepend (not append) so the intrinsic/calculated entry wins ordering
|
||||
// in the multi_if SQL expression.
|
||||
func mergeDeprecatedTraceKeys(keys map[string][]*telemetrytypes.TelemetryFieldKey) {
|
||||
for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
|
||||
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
|
||||
}
|
||||
for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
|
||||
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
|
||||
}
|
||||
}
|
||||
|
||||
func adjustTraceKeys(ctx context.Context, logger *slog.Logger, keys map[string][]*telemetrytypes.TelemetryFieldKey, query *qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], requestType qbtypes.RequestType) {
|
||||
|
||||
mergeDeprecatedTraceKeys(keys)
|
||||
|
||||
// Adjust keys for alias expressions in aggregations
|
||||
actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
|
||||
actions := querybuilder.AdjustKeysForAliasExpressions(query, requestType)
|
||||
|
||||
/*
|
||||
Check if user is using multiple contexts or data types for same field name
|
||||
@@ -228,7 +234,7 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
|
||||
and make it just http.status_code and remove the duplicate entry.
|
||||
*/
|
||||
|
||||
actions = append(actions, querybuilder.AdjustDuplicateKeys(&query)...)
|
||||
actions = append(actions, querybuilder.AdjustDuplicateKeys(query)...)
|
||||
|
||||
/*
|
||||
Now adjust each key to have correct context and data type
|
||||
@@ -236,24 +242,23 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
|
||||
Reason for doing this is to not create an unexpected behavior for users
|
||||
*/
|
||||
for idx := range query.SelectFields {
|
||||
actions = append(actions, b.adjustKey(&query.SelectFields[idx], keys)...)
|
||||
actions = append(actions, adjustTraceKey(&query.SelectFields[idx], keys)...)
|
||||
}
|
||||
for idx := range query.GroupBy {
|
||||
actions = append(actions, b.adjustKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
|
||||
actions = append(actions, adjustTraceKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
|
||||
}
|
||||
for idx := range query.Order {
|
||||
actions = append(actions, b.adjustKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
|
||||
actions = append(actions, adjustTraceKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
|
||||
}
|
||||
|
||||
for _, action := range actions {
|
||||
// TODO: change to debug level once we are confident about the behavior
|
||||
b.logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
|
||||
logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
|
||||
}
|
||||
|
||||
return query
|
||||
}
|
||||
|
||||
func (b *traceQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
|
||||
// adjustTraceKey resolves a single TelemetryFieldKey against the keys map.
|
||||
func adjustTraceKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
|
||||
|
||||
// for recording actions taken
|
||||
actions := []string{}
|
||||
|
||||
@@ -1125,28 +1125,13 @@ func TestAdjustKey(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
statementBuilder := NewTraceQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
aggExprRewriter,
|
||||
nil,
|
||||
fl,
|
||||
)
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
// Create a copy of the input key to avoid modifying the original
|
||||
key := c.inputKey
|
||||
|
||||
// Call adjustKey
|
||||
statementBuilder.adjustKey(&key, c.keysMap)
|
||||
adjustTraceKey(&key, c.keysMap)
|
||||
|
||||
// Verify the key was adjusted as expected
|
||||
require.Equal(t, c.expectedKey.Name, key.Name, "key name should match")
|
||||
@@ -1424,7 +1409,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
}
|
||||
|
||||
// Call adjustKeys
|
||||
c.query = statementBuilder.adjustKeys(context.Background(), keysMapCopy, c.query, qbtypes.RequestTypeScalar)
|
||||
adjustTraceKeys(context.Background(), statementBuilder.logger, keysMapCopy, &c.query, qbtypes.RequestTypeScalar)
|
||||
|
||||
// Verify select fields were adjusted
|
||||
if c.expectedSelectFields != nil {
|
||||
|
||||
@@ -197,6 +197,10 @@ func (b *traceOperatorCTEBuilder) buildQueryCTE(ctx context.Context, queryName s
|
||||
}
|
||||
b.stmtBuilder.logger.DebugContext(ctx, "Retrieved keys for query", slog.String("query_name", queryName), slog.Int("keys_count", len(keys)))
|
||||
|
||||
// The CTE only selects spans matching the filter. Aggregations, group by
|
||||
// and order by run later in buildFinalQuery, so RequestTypeRaw is fine here.
|
||||
adjustTraceKeys(ctx, b.stmtBuilder.logger, keys, query, qbtypes.RequestTypeRaw)
|
||||
|
||||
// Build resource filter CTE for this specific query
|
||||
resourceFilterCTEName := fmt.Sprintf("__resource_filter_%s", cteName)
|
||||
resourceStmt, err := b.buildResourceFilterCTE(ctx, *query)
|
||||
@@ -398,21 +402,28 @@ func (b *traceOperatorCTEBuilder) buildNotCTE(leftCTE, rightCTE string) (string,
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildFinalQuery(ctx context.Context, selectFromCTE string, requestType qbtypes.RequestType) (*qbtypes.Statement, error) {
|
||||
keySelectors := b.getKeySelectors()
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b.adjustOperatorKeys(ctx, keys, requestType)
|
||||
|
||||
switch requestType {
|
||||
case qbtypes.RequestTypeRaw:
|
||||
return b.buildListQuery(ctx, selectFromCTE)
|
||||
return b.buildListQuery(ctx, selectFromCTE, keys)
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
return b.buildTimeSeriesQuery(ctx, selectFromCTE)
|
||||
return b.buildTimeSeriesQuery(ctx, selectFromCTE, keys)
|
||||
case qbtypes.RequestTypeTrace:
|
||||
return b.buildTraceQuery(ctx, selectFromCTE)
|
||||
return b.buildTraceQuery(ctx, selectFromCTE, keys)
|
||||
case qbtypes.RequestTypeScalar:
|
||||
return b.buildScalarQuery(ctx, selectFromCTE)
|
||||
return b.buildScalarQuery(ctx, selectFromCTE, keys)
|
||||
default:
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
|
||||
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
// Select core fields
|
||||
@@ -434,22 +445,6 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
|
||||
"parent_span_id": true,
|
||||
}
|
||||
|
||||
// Get keys for selectFields
|
||||
keySelectors := b.getKeySelectors()
|
||||
for _, field := range b.operator.SelectFields {
|
||||
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
||||
Name: field.Name,
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: field.FieldContext,
|
||||
FieldDataType: field.FieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Add selectFields using ColumnExpressionFor since we now have all base table columns
|
||||
for _, field := range b.operator.SelectFields {
|
||||
if selectedFields[field.Name] {
|
||||
@@ -499,6 +494,45 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
|
||||
}, nil
|
||||
}
|
||||
|
||||
// adjustOperatorKeys runs the same key adjustments as adjustTraceKeys, but on
|
||||
// the operator's own fields. The operator has a different struct shape than
|
||||
// QueryBuilderQuery, so we copy the relevant fields into a temp query, run
|
||||
// the shared helpers, and copy the results back.
|
||||
func (b *traceOperatorCTEBuilder) adjustOperatorKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, requestType qbtypes.RequestType) {
|
||||
mergeDeprecatedTraceKeys(keys)
|
||||
|
||||
tmp := qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Aggregations: b.operator.Aggregations,
|
||||
SelectFields: b.operator.SelectFields,
|
||||
GroupBy: b.operator.GroupBy,
|
||||
Order: b.operator.Order,
|
||||
}
|
||||
|
||||
actions := querybuilder.AdjustKeysForAliasExpressions(&tmp, requestType)
|
||||
actions = append(actions, querybuilder.AdjustDuplicateKeys(&tmp)...)
|
||||
|
||||
for idx := range tmp.SelectFields {
|
||||
actions = append(actions, adjustTraceKey(&tmp.SelectFields[idx], keys)...)
|
||||
}
|
||||
for idx := range tmp.GroupBy {
|
||||
actions = append(actions, adjustTraceKey(&tmp.GroupBy[idx].TelemetryFieldKey, keys)...)
|
||||
}
|
||||
for idx := range tmp.Order {
|
||||
actions = append(actions, adjustTraceKey(&tmp.Order[idx].Key.TelemetryFieldKey, keys)...)
|
||||
}
|
||||
|
||||
// Copy back the three slices that the helpers above can rewrite
|
||||
// (AdjustDuplicateKeys reconstructs them, adjustTraceKey mutates in place).
|
||||
// Aggregations is only read by the helpers, never reassigned, so no copy-back.
|
||||
b.operator.SelectFields = tmp.SelectFields
|
||||
b.operator.GroupBy = tmp.GroupBy
|
||||
b.operator.Order = tmp.Order
|
||||
|
||||
for _, action := range actions {
|
||||
b.stmtBuilder.logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
|
||||
}
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySelector {
|
||||
var keySelectors []*telemetrytypes.FieldKeySelector
|
||||
|
||||
@@ -526,6 +560,15 @@ func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySe
|
||||
})
|
||||
}
|
||||
|
||||
for _, sf := range b.operator.SelectFields {
|
||||
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
||||
Name: sf.Name,
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: sf.FieldContext,
|
||||
FieldDataType: sf.FieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
for i := range keySelectors {
|
||||
keySelectors[i].Signal = telemetrytypes.SignalTraces
|
||||
}
|
||||
@@ -533,7 +576,7 @@ func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySe
|
||||
return keySelectors
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
|
||||
func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
sb.Select(fmt.Sprintf(
|
||||
@@ -541,12 +584,6 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
|
||||
int64(b.operator.StepInterval.Seconds()),
|
||||
))
|
||||
|
||||
keySelectors := b.getKeySelectors()
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
for _, gb := range b.operator.GroupBy {
|
||||
@@ -625,8 +662,7 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
|
||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||
|
||||
// Add HAVING clause if specified
|
||||
err = b.addHavingClause(sb)
|
||||
if err != nil {
|
||||
if err := b.addHavingClause(sb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -653,17 +689,11 @@ func (b *traceOperatorCTEBuilder) buildTraceSummaryCTE(selectFromCTE string) {
|
||||
b.addCTE("trace_summary", sql, args, []string{"all_spans", selectFromCTE})
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
|
||||
func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
|
||||
b.buildTraceSummaryCTE(selectFromCTE)
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
keySelectors := b.getKeySelectors()
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
for _, gb := range b.operator.GroupBy {
|
||||
@@ -745,8 +775,7 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
|
||||
sb.GroupBy(groupByKeys...)
|
||||
}
|
||||
|
||||
err = b.addHavingClause(sb)
|
||||
if err != nil {
|
||||
if err := b.addHavingClause(sb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -802,15 +831,9 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
|
||||
func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
keySelectors := b.getKeySelectors()
|
||||
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
for _, gb := range b.operator.GroupBy {
|
||||
@@ -892,8 +915,7 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
|
||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||
|
||||
// Add HAVING clause if specified
|
||||
err = b.addHavingClause(sb)
|
||||
if err != nil {
|
||||
if err := b.addHavingClause(sb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package telemetrytraces
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -14,6 +15,24 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func newTestTraceOperatorStatementBuilder(t *testing.T) *traceOperatorStatementBuilder {
|
||||
t.Helper()
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
traceStmtBuilder := NewTraceQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore, fm, cb, aggExprRewriter, nil, fl,
|
||||
)
|
||||
return NewTraceOperatorStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore, fm, cb, traceStmtBuilder, aggExprRewriter, fl,
|
||||
)
|
||||
}
|
||||
|
||||
func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
@@ -387,32 +406,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
traceStmtBuilder := NewTraceQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
aggExprRewriter,
|
||||
nil,
|
||||
fl,
|
||||
)
|
||||
|
||||
statementBuilder := NewTraceOperatorStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
traceStmtBuilder,
|
||||
aggExprRewriter,
|
||||
fl,
|
||||
)
|
||||
statementBuilder := newTestTraceOperatorStatementBuilder(t)
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
@@ -503,32 +497,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
traceStmtBuilder := NewTraceQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
aggExprRewriter,
|
||||
nil,
|
||||
fl,
|
||||
)
|
||||
|
||||
statementBuilder := NewTraceOperatorStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
traceStmtBuilder,
|
||||
aggExprRewriter,
|
||||
fl,
|
||||
)
|
||||
statementBuilder := newTestTraceOperatorStatementBuilder(t)
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
@@ -550,3 +519,143 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTraceOperatorStatementBuilderAdjustsKeys(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
operator qbtypes.QueryBuilderTraceOperator
|
||||
builderFilter string
|
||||
wantSQL string
|
||||
wantArgs []any
|
||||
}{
|
||||
{
|
||||
name: "deprecated duration filter in referenced builder query",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
operator: qbtypes.QueryBuilderTraceOperator{
|
||||
Expression: "A",
|
||||
Limit: 10,
|
||||
},
|
||||
builderFilter: "durationNano = '3s'",
|
||||
wantSQL: "duration_nano = ?",
|
||||
wantArgs: []any{int64(3000000000)},
|
||||
},
|
||||
{
|
||||
name: "context-prefixed aggregation alias in order by",
|
||||
requestType: qbtypes.RequestTypeScalar,
|
||||
operator: qbtypes.QueryBuilderTraceOperator{
|
||||
Expression: "A",
|
||||
Aggregations: []qbtypes.TraceAggregation{
|
||||
{
|
||||
Expression: "count()",
|
||||
Alias: "span.count_",
|
||||
},
|
||||
},
|
||||
Order: []qbtypes.OrderBy{
|
||||
{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "count_",
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
},
|
||||
},
|
||||
},
|
||||
wantSQL: "ORDER BY __result_0 desc",
|
||||
},
|
||||
}
|
||||
|
||||
statementBuilder := newTestTraceOperatorStatementBuilder(t)
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
err := c.operator.ParseExpression()
|
||||
require.NoError(t, err)
|
||||
|
||||
filter := c.builderFilter
|
||||
if filter == "" {
|
||||
filter = "service.name = 'frontend'"
|
||||
}
|
||||
|
||||
q, err := statementBuilder.Build(
|
||||
context.Background(),
|
||||
1747947419000,
|
||||
1747983448000,
|
||||
c.requestType,
|
||||
c.operator,
|
||||
&qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: filter},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, q.Query, c.wantSQL)
|
||||
for _, arg := range c.wantArgs {
|
||||
require.Contains(t, q.Args, arg)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestTraceOperatorStatementBuilderDeduplicatesKeys checks that a trace
|
||||
// operator with the same field name listed twice in GroupBy (once with a
|
||||
// context, once without) ends up with a single column in the outer SELECT
|
||||
// and a single entry in GROUP BY.
|
||||
func TestTraceOperatorStatementBuilderDeduplicatesKeys(t *testing.T) {
|
||||
statementBuilder := newTestTraceOperatorStatementBuilder(t)
|
||||
|
||||
operator := qbtypes.QueryBuilderTraceOperator{
|
||||
Expression: "A",
|
||||
Aggregations: []qbtypes.TraceAggregation{
|
||||
{Expression: "count()"},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
}},
|
||||
// Same name, no context — should be merged with the entry above.
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.method",
|
||||
}},
|
||||
},
|
||||
}
|
||||
require.NoError(t, operator.ParseExpression())
|
||||
|
||||
q, err := statementBuilder.Build(
|
||||
context.Background(),
|
||||
1747947419000,
|
||||
1747983448000,
|
||||
qbtypes.RequestTypeScalar,
|
||||
operator,
|
||||
&qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'frontend'"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, strings.Count(q.Query, "AS `http.method`"),
|
||||
"http.method should appear once in SELECT after dedup, got: %s", q.Query)
|
||||
require.NotContains(t, q.Query, "`http.method`, `http.method`",
|
||||
"GROUP BY should list http.method once after dedup, got: %s", q.Query)
|
||||
}
|
||||
|
||||
51
tests/fixtures/querier.py
vendored
51
tests/fixtures/querier.py
vendored
@@ -413,6 +413,57 @@ def find_named_result(
|
||||
)
|
||||
|
||||
|
||||
def assert_scalar_value(
|
||||
response: requests.Response,
|
||||
name: str,
|
||||
expected: Any,
|
||||
*,
|
||||
row: int = 0,
|
||||
col: int = 0,
|
||||
) -> None:
|
||||
"""Assert that the named scalar result has `expected` at data[row][col]."""
|
||||
result = find_named_result(response.json()["data"]["data"]["results"], name)
|
||||
assert result is not None, f"no result for query {name}"
|
||||
assert result["data"][row][col] == expected, f"expected {expected} at [{row}][{col}], got {result['data'][row][col]}"
|
||||
|
||||
|
||||
def assert_grouped_scalar(
|
||||
response: requests.Response,
|
||||
name: str,
|
||||
*,
|
||||
expected_groups: int,
|
||||
expected_columns: int,
|
||||
last_col_value: Any | None = None,
|
||||
) -> None:
|
||||
"""Assert grouped scalar result has the expected column count and group count.
|
||||
If `last_col_value` is set and there is exactly one group, also assert the
|
||||
last column of that single row equals it (a common aggregation-value check)."""
|
||||
result = find_named_result(response.json()["data"]["data"]["results"], name)
|
||||
assert result is not None, f"no result for query {name}"
|
||||
columns = result["columns"]
|
||||
rows = result["data"]
|
||||
assert len(columns) == expected_columns, f"expected {expected_columns} columns, got {len(columns)}: {columns}"
|
||||
assert len(rows) == expected_groups, f"expected {expected_groups} groups, got {len(rows)}: {rows}"
|
||||
if last_col_value is not None and expected_groups == 1:
|
||||
assert rows[0][-1] == last_col_value, f"expected last col {last_col_value}, got row {rows[0]}"
|
||||
|
||||
|
||||
def assert_raw_row_subset(
|
||||
response: requests.Response,
|
||||
name: str,
|
||||
expected: dict[str, Any],
|
||||
*,
|
||||
row: int = 0,
|
||||
) -> None:
|
||||
"""Assert that the named raw result's rows[row]['data'] is a superset of `expected`."""
|
||||
result = find_named_result(response.json()["data"]["data"]["results"], name)
|
||||
assert result is not None, f"no result for query {name}"
|
||||
rows = result["rows"]
|
||||
assert rows is not None, f"no rows for query {name}"
|
||||
data = rows[row]["data"]
|
||||
assert expected.items() <= data.items(), f"expected subset {expected}, got data {data}"
|
||||
|
||||
|
||||
def build_scalar_query(
|
||||
name: str,
|
||||
signal: str,
|
||||
|
||||
@@ -9,8 +9,11 @@ import requests
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.querier import (
|
||||
assert_grouped_scalar,
|
||||
assert_identical_query_response,
|
||||
assert_minutely_bucket_values,
|
||||
assert_raw_row_subset,
|
||||
assert_scalar_value,
|
||||
find_named_result,
|
||||
format_timestamp,
|
||||
generate_traces_with_corrupt_metadata,
|
||||
@@ -693,6 +696,176 @@ def test_traces_list_with_corrupt_data(
|
||||
assert data[key] == value
|
||||
|
||||
|
||||
def _expected_trace_subset(trace: Traces) -> dict[str, Any]:
|
||||
return {
|
||||
"duration_nano": trace.duration_nano,
|
||||
"name": trace.name,
|
||||
"parent_span_id": trace.parent_span_id,
|
||||
"span_id": trace.span_id,
|
||||
"timestamp": format_timestamp(trace.timestamp),
|
||||
"trace_id": trace.trace_id,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"payload_factory,request_type,assert_result",
|
||||
[
|
||||
# Case 1: CTE filter uses the deprecated intrinsic field `durationNano`.
|
||||
pytest.param(
|
||||
lambda traces: [
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"filter": {"expression": 'durationNano = "3s"'},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "B",
|
||||
"signal": "traces",
|
||||
"filter": {"expression": 'durationNano = "5s"'},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_trace_operator",
|
||||
"spec": {
|
||||
"name": "C",
|
||||
"expression": "A => B",
|
||||
"limit": 1,
|
||||
},
|
||||
},
|
||||
],
|
||||
"raw",
|
||||
lambda response, traces: assert_raw_row_subset(response, "C", _expected_trace_subset(traces[0])),
|
||||
id="deprecated-intrinsic-filter",
|
||||
),
|
||||
# Case 2: CTE filter uses the deprecated calculated field `responseStatusCode`.
|
||||
pytest.param(
|
||||
lambda traces: [
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"filter": {"expression": 'responseStatusCode = "200"'},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "B",
|
||||
"signal": "traces",
|
||||
"filter": {"expression": 'durationNano = "5s"'},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_trace_operator",
|
||||
"spec": {
|
||||
"name": "C",
|
||||
"expression": "A => B",
|
||||
"limit": 1,
|
||||
},
|
||||
},
|
||||
],
|
||||
"raw",
|
||||
lambda response, traces: assert_raw_row_subset(response, "C", _expected_trace_subset(traces[0])),
|
||||
id="deprecated-calculated-filter",
|
||||
),
|
||||
# Case 3: order by uses `count_` with fieldContext `span`, which has
|
||||
# to be rewritten to the aggregation alias `span.count_`.
|
||||
pytest.param(
|
||||
lambda traces: [
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"aggregations": [{"expression": "count()"}],
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_trace_operator",
|
||||
"spec": {
|
||||
"name": "C",
|
||||
"expression": "A",
|
||||
"aggregations": [{"expression": "count()", "alias": "span.count_"}],
|
||||
"order": [{"key": {"name": "count_", "fieldContext": "span"}, "direction": "desc"}],
|
||||
},
|
||||
},
|
||||
],
|
||||
"scalar",
|
||||
lambda response, traces: assert_scalar_value(response, "C", len(traces)),
|
||||
id="context-prefixed-aggregation-alias-order",
|
||||
),
|
||||
# Case 4: group by lists `cloud.provider` twice (once with a resource
|
||||
# context, once without).
|
||||
pytest.param(
|
||||
lambda traces: [
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": True,
|
||||
"aggregations": [{"expression": "count()"}],
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_trace_operator",
|
||||
"spec": {
|
||||
"name": "C",
|
||||
"expression": "A",
|
||||
"aggregations": [{"expression": "count()"}],
|
||||
"groupBy": [
|
||||
{"name": "cloud.provider", "fieldContext": "resource"},
|
||||
{"name": "cloud.provider"},
|
||||
],
|
||||
},
|
||||
},
|
||||
],
|
||||
"scalar",
|
||||
lambda response, traces: assert_grouped_scalar(response, "C", expected_groups=1, expected_columns=2, last_col_value=len(traces)),
|
||||
id="duplicate-group-by-deduplicated",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_trace_operator_with_adjusted_keys(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
payload_factory: Callable[[list[Traces]], list[dict[str, Any]]],
|
||||
request_type: str,
|
||||
assert_result: Callable[[requests.Response, list[Traces]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Trace operators build a CTE per referenced builder query and an outer
|
||||
query on top. Both layers need the same key adjustment as regular trace
|
||||
queries, otherwise deprecated keys and context-prefixed aliases don't
|
||||
resolve.
|
||||
"""
|
||||
traces = generate_traces_with_corrupt_metadata()
|
||||
insert_traces(traces)
|
||||
payload = payload_factory(traces)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=5)).timestamp() * 1000),
|
||||
end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
|
||||
request_type=request_type,
|
||||
queries=payload,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK, response.text
|
||||
assert_result(response, traces)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_by,aggregation_alias,expected_status",
|
||||
[
|
||||
|
||||
Reference in New Issue
Block a user