Compare commits

..

10 Commits

Author SHA1 Message Date
Nikhil Soni
168b2eaa9c feat: query full spans for smaller traces 2026-05-27 00:12:38 +05:30
Nikhil Soni
6b613f18a3 feat: add api and module for flamegraph v3 2026-05-26 20:04:20 +05:30
Nikhil Soni
1b0447181d feat: add method to enrich selected spans 2026-05-26 20:03:47 +05:30
Nikhil Soni
20edff4771 feat: add config for flamegraph 2026-05-26 19:21:33 +05:30
Nikhil Soni
2048ef3d2f chore: remove limit from request payload
It's a new api so doesn't need to be backward compatible
2026-05-26 19:06:48 +05:30
Nikhil Soni
53c551359e feat: add types for flamegraph v3 in module structure 2026-05-26 18:56:35 +05:30
Nikhil Soni
1e326159b0 feat(tracedetail): add waterfall api with memory optimisations (#11450)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
* feat: add store methods for minimal trace fetch

* feat: break down waterfall module to handle large spans

Handling large traces in two steps to avoid high
memory allocation

* refactor: keep the waterfall changes in new api version

This is to avoid the contract change in existing v3

* chore: avoid unnecessary diffs

* refactor: move conversion logic to types

* chore: update openapi specs

* refactor: use sqlbuider for queries

* chore: fix comment

* chore: avoid passing request type to module

* refactor: avoid passing whole summary object around

* chore: remove trace_id from querying since its already known

* chore: remove unused reference column from query

* chore: update openapi specs
2026-05-26 10:11:16 +00:00
Nityananda Gohain
ceb1b4871b feat: trace based filters for logs, supporting aggregations as well (#11394)
* feat: trace based filters for logs, supporting aggregations as well

* fix: update comments

* fix: cleanup query from tests

* fix: address comments

* fix: address comments

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
2026-05-26 09:57:18 +00:00
Abhi kumar
d48a238e15 chore: broke down drilldown navigate into a saperate hook (#11070)
* chore: broke down drilldown navigate into a saperate hook

* chore: fmt fix
2026-05-26 06:16:37 +00:00
Abhi kumar
2ca6ff7719 test: added test for crosshair series highlight changes (#11015)
* chore: added changes for crosshair sync for tooltip

* chore: minor cleanup

* chore: updated the core structure

* chore: updated the types

* chore: minor cleanup

* feat: added changes for sereis highlighting on crosshair sync

* test: added test for crosshair series highlight changes

* chore: pr review fixes

* chore: handled other cases of groupby

* chore: updated tests
2026-05-26 06:09:52 +00:00
33 changed files with 3051 additions and 233 deletions

View File

@@ -434,6 +434,17 @@ tracedetail:
max_depth_to_auto_expand: 5
# Threshold below which all spans are returned without windowing.
max_limit_to_select_all_spans: 10000
flamegraph:
# Maximum number of BFS depth levels included in a windowed response.
max_selected_levels: 50
# Maximum spans per level before sampling is applied.
max_spans_per_level: 100
# Number of highest-latency spans always included when sampling a level.
sampling_top_latency_count: 5
# Number of timestamp buckets used for uniform sampling within a level.
sampling_bucket_count: 50
# Threshold below which all spans are returned without windowing or sampling.
select_all_spans_limit: 100000
##################### Authz #################################
authz:

View File

@@ -18948,6 +18948,77 @@ paths:
summary: Get waterfall view for a trace
tags:
- tracedetail
/api/v4/traces/{traceID}/waterfall:
post:
deprecated: false
description: Returns the waterfall view of spans including all spans if total
spans are under a limit, a max count otherwise. Aggregations are dropped compared
to v3
operationId: GetWaterfallV4
parameters:
- in: path
name: traceID
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesPostableWaterfall'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesGettableWaterfallTrace'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get waterfall view for a trace
tags:
- tracedetail
/api/v5/query_range:
post:
deprecated: false

View File

@@ -9232,6 +9232,17 @@ export type GetWaterfall200 = {
status: string;
};
export type GetWaterfallV4PathParameters = {
traceID: string;
};
export type GetWaterfallV4200 = {
data: SpantypesGettableWaterfallTraceDTO;
/**
* @type string
*/
status: string;
};
export type QueryRangeV5200 = {
data: Querybuildertypesv5QueryRangeResponseDTO;
/**

View File

@@ -14,6 +14,8 @@ import type {
import type {
GetWaterfall200,
GetWaterfallPathParameters,
GetWaterfallV4200,
GetWaterfallV4PathParameters,
RenderErrorResponseDTO,
SpantypesPostableWaterfallDTO,
} from '../sigNoz.schemas';
@@ -120,3 +122,102 @@ export const useGetWaterfall = <
> => {
return useMutation(getGetWaterfallMutationOptions(options));
};
/**
* Returns the waterfall view of spans including all spans if total spans are under a limit, a max count otherwise. Aggregations are dropped compared to v3
* @summary Get waterfall view for a trace
*/
export const getWaterfallV4 = (
{ traceID }: GetWaterfallV4PathParameters,
spantypesPostableWaterfallDTO?: BodyType<SpantypesPostableWaterfallDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<GetWaterfallV4200>({
url: `/api/v4/traces/${traceID}/waterfall`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: spantypesPostableWaterfallDTO,
signal,
});
};
export const getGetWaterfallV4MutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof getWaterfallV4>>,
TError,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
},
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof getWaterfallV4>>,
TError,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
},
TContext
> => {
const mutationKey = ['getWaterfallV4'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof getWaterfallV4>>,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
}
> = (props) => {
const { pathParams, data } = props ?? {};
return getWaterfallV4(pathParams, data);
};
return { mutationFn, ...mutationOptions };
};
export type GetWaterfallV4MutationResult = NonNullable<
Awaited<ReturnType<typeof getWaterfallV4>>
>;
export type GetWaterfallV4MutationBody =
| BodyType<SpantypesPostableWaterfallDTO>
| undefined;
export type GetWaterfallV4MutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Get waterfall view for a trace
*/
export const useGetWaterfallV4 = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof getWaterfallV4>>,
TError,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
},
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof getWaterfallV4>>,
TError,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
},
TContext
> => {
return useMutation(getGetWaterfallV4MutationOptions(options));
};

View File

@@ -0,0 +1,282 @@
import { Widgets } from 'types/api/dashboard/getAll';
import {
MetricRangePayloadProps,
MetricRangePayloadV3,
} from 'types/api/metrics/getQueryRange';
import { Query } from 'types/api/queryBuilder/queryBuilderData';
import { PanelMode } from '../../types';
import { prepareBarPanelConfig } from '../utils';
import { prepareChartData } from 'lib/uPlotV2/utils/dataUtils';
jest.mock(
'container/DashboardContainer/visualization/panels/utils/legendVisibilityUtils',
() => ({
getStoredSeriesVisibility: jest.fn(),
}),
);
jest.mock('lib/uPlotLib/plugins/onClickPlugin', () => ({
__esModule: true,
default: jest.fn().mockReturnValue({ name: 'onClickPlugin' }),
}));
jest.mock('lib/dashboard/getQueryResults', () => ({
getLegend: jest.fn(
(_queryData: unknown, _query: unknown, labelName: string) =>
`legend-${labelName}`,
),
}));
jest.mock('lib/getLabelName', () => ({
__esModule: true,
default: jest.fn(
(_metric: unknown, _queryName: string, _legend: string) => 'baseLabel',
),
}));
jest.mock(
'container/DashboardContainer/visualization/charts/utils/stackSeriesUtils',
() => ({
getInitialStackedBands: jest.fn().mockReturnValue([]),
}),
);
const getLegendMock = jest.requireMock('lib/dashboard/getQueryResults')
.getLegend as jest.Mock;
const getLabelNameMock = jest.requireMock('lib/getLabelName')
.default as jest.Mock;
const getInitialStackedBandsMock = jest.requireMock(
'container/DashboardContainer/visualization/charts/utils/stackSeriesUtils',
).getInitialStackedBands as jest.Mock;
const createApiResponse = (
result: MetricRangePayloadProps['data']['result'] = [],
): MetricRangePayloadProps => ({
data: {
result,
resultType: 'matrix',
newResult: null as unknown as MetricRangePayloadV3,
},
});
const createWidget = (overrides: Partial<Widgets> = {}): Widgets =>
({
id: 'widget-1',
yAxisUnit: 'ms',
isLogScale: false,
thresholds: [],
customLegendColors: {},
...overrides,
}) as Widgets;
const defaultTimezone = {
name: 'UTC',
value: 'UTC',
offset: 'UTC',
searchIndex: 'UTC',
};
describe('BarPanel utils', () => {
beforeEach(() => {
jest.clearAllMocks();
getLabelNameMock.mockReturnValue('baseLabel');
getLegendMock.mockImplementation(
(_queryData: unknown, _query: unknown, labelName: string) =>
`legend-${labelName}`,
);
});
describe('prepareBarPanelData', () => {
it('returns aligned data with timestamps and empty series when result is empty', () => {
const data = prepareChartData(createApiResponse([]));
expect(data).toHaveLength(1);
expect(data[0]).toStrictEqual([]);
});
it('returns timestamps and one series of y values for single series', () => {
const data = prepareChartData(
createApiResponse([
{
metric: {},
queryName: 'Q',
legend: 'Series A',
values: [
[1000, '10'],
[2000, '20'],
],
} as MetricRangePayloadProps['data']['result'][0],
]),
);
expect(data).toHaveLength(2);
expect(data[0]).toStrictEqual([1000, 2000]);
expect(data[1]).toStrictEqual([10, 20]);
});
it('merges timestamps and fills missing values with null for multiple series', () => {
const data = prepareChartData(
createApiResponse([
{
metric: {},
queryName: 'Q1',
values: [
[1000, '1'],
[3000, '3'],
],
} as MetricRangePayloadProps['data']['result'][0],
{
metric: {},
queryName: 'Q2',
values: [
[1000, '10'],
[2000, '20'],
],
} as MetricRangePayloadProps['data']['result'][0],
]),
);
expect(data[0]).toStrictEqual([1000, 2000, 3000]);
expect(data[1]).toStrictEqual([1, null, 3]);
expect(data[2]).toStrictEqual([10, 20, null]);
});
});
describe('prepareBarPanelConfig', () => {
const baseParams = {
widget: createWidget(),
isDarkMode: true,
currentQuery: {} as Query,
onClick: jest.fn(),
onDragSelect: jest.fn(),
apiResponse: createApiResponse(),
timezone: defaultTimezone,
panelMode: PanelMode.DASHBOARD_VIEW,
};
it('adds no series when apiResponse has empty result', () => {
const config = prepareBarPanelConfig(baseParams).getConfig();
expect(config.series).toHaveLength(1);
});
it('adds one series per result item', () => {
const apiResponse = createApiResponse([
{
metric: {},
queryName: 'Q1',
values: [[1000, '1']],
} as MetricRangePayloadProps['data']['result'][0],
{
metric: {},
queryName: 'Q2',
values: [[1000, '2']],
} as MetricRangePayloadProps['data']['result'][0],
]);
const config = prepareBarPanelConfig({
...baseParams,
apiResponse,
}).getConfig();
expect(config.series).toHaveLength(3);
});
it('uses getLegend for label when currentQuery is provided', () => {
const apiResponse = createApiResponse([
{
metric: {},
queryName: 'Q1',
legend: 'L1',
values: [[1000, '1']],
} as MetricRangePayloadProps['data']['result'][0],
]);
const config = prepareBarPanelConfig({
...baseParams,
apiResponse,
currentQuery: {} as Query,
}).getConfig();
expect(getLegendMock).toHaveBeenCalled();
expect(config.series?.[1]).toMatchObject({ label: 'legend-baseLabel' });
});
it('uses getLabelName for label when currentQuery is null', () => {
getLegendMock.mockReset();
const apiResponse = createApiResponse([
{
metric: { __name__: 'requests' },
queryName: 'Q1',
values: [[1000, '1']],
} as MetricRangePayloadProps['data']['result'][0],
]);
prepareBarPanelConfig({
...baseParams,
apiResponse,
currentQuery: null as unknown as Query,
});
expect(getLabelNameMock).toHaveBeenCalled();
expect(getLegendMock).not.toHaveBeenCalled();
});
it('passes result metric to each series for cross-panel sync', () => {
const metric = { host: 'server1', __name__: 'http_requests' };
const apiResponse = createApiResponse([
{
metric,
queryName: 'Q1',
values: [[1000, '1']],
} as MetricRangePayloadProps['data']['result'][0],
]);
const config = prepareBarPanelConfig({
...baseParams,
apiResponse,
}).getConfig();
expect(config.series?.[1]).toMatchObject({ metric });
});
it('uses widget customLegendColors for series stroke', () => {
const widget = createWidget({
customLegendColors: { 'legend-baseLabel': '#ff0000' },
});
const apiResponse = createApiResponse([
{
metric: {},
queryName: 'Q',
values: [[1000, '1']],
} as MetricRangePayloadProps['data']['result'][0],
]);
const config = prepareBarPanelConfig({
...baseParams,
widget,
apiResponse,
}).getConfig();
expect(config.series?.[1]).toMatchObject({ stroke: '#ff0000' });
});
it('calls getInitialStackedBands when widget is stackedBarChart', () => {
const widget = createWidget({ stackedBarChart: true });
const apiResponse = createApiResponse([
{
metric: {},
queryName: 'Q1',
values: [[1000, '1']],
} as MetricRangePayloadProps['data']['result'][0],
{
metric: {},
queryName: 'Q2',
values: [[1000, '2']],
} as MetricRangePayloadProps['data']['result'][0],
]);
prepareBarPanelConfig({ ...baseParams, widget, apiResponse });
// seriesCount = result.length + 1 = 3
expect(getInitialStackedBandsMock).toHaveBeenCalledWith(3);
});
it('does not call getInitialStackedBands for non-stacked chart', () => {
const apiResponse = createApiResponse([
{
metric: {},
queryName: 'Q1',
values: [[1000, '1']],
} as MetricRangePayloadProps['data']['result'][0],
]);
prepareBarPanelConfig({ ...baseParams, apiResponse });
expect(getInitialStackedBandsMock).not.toHaveBeenCalled();
});
});
});

View File

@@ -303,6 +303,27 @@ describe('TimeSeriesPanel utils', () => {
expect(seriesConfig!.stroke).toBe('#ff0000');
});
it('passes result metric to each series for cross-panel sync', () => {
const metric = { host: 'server1', __name__: 'cpu' };
const apiResponse = createApiResponse([
{
metric,
queryName: 'Q',
values: [
[1000, '1'],
[2000, '2'],
],
} as MetricRangePayloadProps['data']['result'][0],
]);
const config = prepareUPlotConfig({
...baseParams,
apiResponse,
}).getConfig();
expect(config.series?.[1]).toMatchObject({ metric });
});
it('adds multiple series when result has multiple items', () => {
const apiResponse = createApiResponse([
{

View File

@@ -0,0 +1,296 @@
import { renderHook } from '@testing-library/react';
import ROUTES from 'constants/routes';
import { Query } from 'types/api/queryBuilder/queryBuilderData';
import { getViewQuery } from '../drilldownUtils';
import { AggregateData } from '../useAggregateDrilldown';
import useBaseDrilldownNavigate, {
buildDrilldownUrl,
getRoute,
} from '../useBaseDrilldownNavigate';
const mockSafeNavigate = jest.fn();
jest.mock('hooks/useSafeNavigate', () => ({
useSafeNavigate: (): { safeNavigate: typeof mockSafeNavigate } => ({
safeNavigate: mockSafeNavigate,
}),
}));
jest.mock('../drilldownUtils', () => ({
...jest.requireActual('../drilldownUtils'),
getViewQuery: jest.fn(),
}));
const mockGetViewQuery = getViewQuery as jest.Mock;
// ─── Fixtures ────────────────────────────────────────────────────────────────
const MOCK_QUERY: Query = {
id: 'q1',
queryType: 'builder' as any,
builder: {
queryData: [
{
queryName: 'A',
dataSource: 'metrics' as any,
groupBy: [],
expression: '',
disabled: false,
functions: [],
legend: '',
having: [],
limit: null,
stepInterval: undefined,
orderBy: [],
},
],
queryFormulas: [],
queryTraceOperator: [],
},
promql: [],
clickhouse_sql: [],
};
const MOCK_VIEW_QUERY: Query = {
...MOCK_QUERY,
builder: {
...MOCK_QUERY.builder,
queryData: [
{
...MOCK_QUERY.builder.queryData[0],
filters: { items: [], op: 'AND' },
},
],
},
};
const MOCK_AGGREGATE_DATA: AggregateData = {
queryName: 'A',
filters: [{ filterKey: 'service_name', filterValue: 'auth', operator: '=' }],
timeRange: { startTime: 1000000, endTime: 2000000 },
};
// ─── getRoute ─────────────────────────────────────────────────────────────────
describe('getRoute', () => {
it.each([
['view_logs', ROUTES.LOGS_EXPLORER],
['view_metrics', ROUTES.METRICS_EXPLORER],
['view_traces', ROUTES.TRACES_EXPLORER],
])('maps %s to the correct explorer route', (key, expected) => {
expect(getRoute(key)).toBe(expected);
});
it('returns empty string for an unknown key', () => {
expect(getRoute('view_dashboard')).toBe('');
});
});
// ─── buildDrilldownUrl ────────────────────────────────────────────────────────
describe('buildDrilldownUrl', () => {
beforeEach(() => {
mockGetViewQuery.mockReturnValue(MOCK_VIEW_QUERY);
});
afterEach(() => {
jest.clearAllMocks();
});
it('returns null for an unknown drilldown key', () => {
const url = buildDrilldownUrl(
MOCK_QUERY,
MOCK_AGGREGATE_DATA,
'view_dashboard',
);
expect(url).toBeNull();
});
it('returns null when getViewQuery returns null', () => {
mockGetViewQuery.mockReturnValue(null);
const url = buildDrilldownUrl(MOCK_QUERY, MOCK_AGGREGATE_DATA, 'view_logs');
expect(url).toBeNull();
});
it('returns a URL starting with the logs explorer route for view_logs', () => {
const url = buildDrilldownUrl(MOCK_QUERY, MOCK_AGGREGATE_DATA, 'view_logs');
expect(url).not.toBeNull();
expect(url).toContain(ROUTES.LOGS_EXPLORER);
});
it('returns a URL starting with the traces explorer route for view_traces', () => {
const url = buildDrilldownUrl(MOCK_QUERY, MOCK_AGGREGATE_DATA, 'view_traces');
expect(url).toContain(ROUTES.TRACES_EXPLORER);
});
it('includes compositeQuery param in the URL', () => {
const url = buildDrilldownUrl(MOCK_QUERY, MOCK_AGGREGATE_DATA, 'view_logs');
expect(url).toContain('compositeQuery=');
});
it('includes startTime and endTime when aggregateData has a timeRange', () => {
const url = buildDrilldownUrl(MOCK_QUERY, MOCK_AGGREGATE_DATA, 'view_logs');
expect(url).toContain('startTime=1000000');
expect(url).toContain('endTime=2000000');
});
it('omits startTime and endTime when aggregateData has no timeRange', () => {
const { timeRange: _, ...withoutTimeRange } = MOCK_AGGREGATE_DATA;
const url = buildDrilldownUrl(MOCK_QUERY, withoutTimeRange, 'view_logs');
expect(url).not.toContain('startTime=');
expect(url).not.toContain('endTime=');
});
it('includes summaryFilters param for view_metrics', () => {
const url = buildDrilldownUrl(
MOCK_QUERY,
MOCK_AGGREGATE_DATA,
'view_metrics',
);
expect(url).toContain(ROUTES.METRICS_EXPLORER);
expect(url).toContain('summaryFilters=');
});
it('does not include summaryFilters param for non-metrics routes', () => {
const url = buildDrilldownUrl(MOCK_QUERY, MOCK_AGGREGATE_DATA, 'view_logs');
expect(url).not.toContain('summaryFilters=');
});
it('handles null aggregateData by passing empty filters and empty queryName', () => {
const url = buildDrilldownUrl(MOCK_QUERY, null, 'view_logs');
expect(url).not.toBeNull();
expect(mockGetViewQuery).toHaveBeenCalledWith(
MOCK_QUERY,
[],
'view_logs',
'',
);
});
it('passes aggregateData filters and queryName to getViewQuery', () => {
buildDrilldownUrl(MOCK_QUERY, MOCK_AGGREGATE_DATA, 'view_logs');
expect(mockGetViewQuery).toHaveBeenCalledWith(
MOCK_QUERY,
MOCK_AGGREGATE_DATA.filters,
'view_logs',
MOCK_AGGREGATE_DATA.queryName,
);
});
});
// ─── useBaseDrilldownNavigate ─────────────────────────────────────────────────
describe('useBaseDrilldownNavigate', () => {
beforeEach(() => {
mockGetViewQuery.mockReturnValue(MOCK_VIEW_QUERY);
});
afterEach(() => {
jest.clearAllMocks();
});
it('calls safeNavigate with the built URL on a valid key', () => {
const { result } = renderHook(() =>
useBaseDrilldownNavigate({
resolvedQuery: MOCK_QUERY,
aggregateData: MOCK_AGGREGATE_DATA,
}),
);
result.current('view_logs');
expect(mockSafeNavigate).toHaveBeenCalledTimes(1);
const [url] = mockSafeNavigate.mock.calls[0];
expect(url).toContain(ROUTES.LOGS_EXPLORER);
expect(url).toContain('compositeQuery=');
});
it('opens the explorer in a new tab', () => {
const { result } = renderHook(() =>
useBaseDrilldownNavigate({
resolvedQuery: MOCK_QUERY,
aggregateData: MOCK_AGGREGATE_DATA,
}),
);
result.current('view_traces');
expect(mockSafeNavigate).toHaveBeenCalledWith(expect.any(String), {
newTab: true,
});
});
it('calls callback after successful navigation', () => {
const callback = jest.fn();
const { result } = renderHook(() =>
useBaseDrilldownNavigate({
resolvedQuery: MOCK_QUERY,
aggregateData: MOCK_AGGREGATE_DATA,
callback,
}),
);
result.current('view_logs');
expect(callback).toHaveBeenCalledTimes(1);
});
it('does not call safeNavigate for an unknown key', () => {
const { result } = renderHook(() =>
useBaseDrilldownNavigate({
resolvedQuery: MOCK_QUERY,
aggregateData: MOCK_AGGREGATE_DATA,
}),
);
result.current('view_dashboard');
expect(mockSafeNavigate).not.toHaveBeenCalled();
});
it('still calls callback when the key is unknown', () => {
const callback = jest.fn();
const { result } = renderHook(() =>
useBaseDrilldownNavigate({
resolvedQuery: MOCK_QUERY,
aggregateData: MOCK_AGGREGATE_DATA,
callback,
}),
);
result.current('view_dashboard');
expect(callback).toHaveBeenCalledTimes(1);
expect(mockSafeNavigate).not.toHaveBeenCalled();
});
it('still calls callback when getViewQuery returns null', () => {
mockGetViewQuery.mockReturnValue(null);
const callback = jest.fn();
const { result } = renderHook(() =>
useBaseDrilldownNavigate({
resolvedQuery: MOCK_QUERY,
aggregateData: MOCK_AGGREGATE_DATA,
callback,
}),
);
result.current('view_logs');
expect(callback).toHaveBeenCalledTimes(1);
expect(mockSafeNavigate).not.toHaveBeenCalled();
});
it('handles null aggregateData without throwing', () => {
const { result } = renderHook(() =>
useBaseDrilldownNavigate({
resolvedQuery: MOCK_QUERY,
aggregateData: null,
}),
);
expect(() => result.current('view_logs')).not.toThrow();
expect(mockSafeNavigate).toHaveBeenCalledTimes(1);
});
});

View File

@@ -168,7 +168,7 @@ export const getAggregateColumnHeader = (
};
};
const getFiltersFromMetric = (metric: any): FilterData[] =>
export const getFiltersFromMetric = (metric: any): FilterData[] =>
Object.keys(metric).map((key) => ({
filterKey: key,
filterValue: metric[key],

View File

@@ -2,14 +2,10 @@ import { useCallback, useEffect, useMemo, useState } from 'react';
import { useLocation } from 'react-router-dom';
import { Link, Loader } from '@signozhq/icons';
import OverlayScrollbar from 'components/OverlayScrollbar/OverlayScrollbar';
import { QueryParams } from 'constants/query';
import { PANEL_TYPES } from 'constants/queryBuilder';
import ROUTES from 'constants/routes';
import useUpdatedQuery from 'container/GridCardLayout/useResolveQuery';
import { processContextLinks } from 'container/NewWidget/RightContainer/ContextLinks/utils';
import useContextVariables from 'hooks/dashboard/useContextVariables';
import { useSafeNavigate } from 'hooks/useSafeNavigate';
import createQueryParams from 'lib/createQueryParams';
import ContextMenu from 'periscope/components/ContextMenu';
import { useDashboardStore } from 'providers/Dashboard/store/useDashboardStore';
import { ContextLinksData } from 'types/api/dashboard/getAll';
@@ -18,9 +14,10 @@ import { openInNewTab } from 'utils/navigation';
import { ContextMenuItem } from './contextConfig';
import { getDataLinks } from './dataLinksUtils';
import { getAggregateColumnHeader, getViewQuery } from './drilldownUtils';
import { getAggregateColumnHeader } from './drilldownUtils';
import { getBaseContextConfig } from './menuOptions';
import { AggregateData } from './useAggregateDrilldown';
import useBaseDrilldownNavigate from './useBaseDrilldownNavigate';
interface UseBaseAggregateOptionsProps {
query: Query;
@@ -38,19 +35,6 @@ interface BaseAggregateOptionsConfig {
items?: ContextMenuItem;
}
const getRoute = (key: string): string => {
switch (key) {
case 'view_logs':
return ROUTES.LOGS_EXPLORER;
case 'view_metrics':
return ROUTES.METRICS_EXPLORER;
case 'view_traces':
return ROUTES.TRACES_EXPLORER;
default:
return '';
}
};
const useBaseAggregateOptions = ({
query,
onClose,
@@ -86,8 +70,6 @@ const useBaseAggregateOptions = ({
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [query, aggregateData, panelType]);
const { safeNavigate } = useSafeNavigate();
// Use the new useContextVariables hook
const { processedVariables } = useContextVariables({
maxValues: 2,
@@ -121,50 +103,16 @@ const useBaseAggregateOptions = ({
{label}
</ContextMenu.Item>
));
} catch (error) {
} catch {
return [];
}
}, [contextLinks, processedVariables, onClose, aggregateData, query]);
const handleBaseDrilldown = useCallback(
(key: string): void => {
const route = getRoute(key);
const timeRange = aggregateData?.timeRange;
const filtersToAdd = aggregateData?.filters || [];
const viewQuery = getViewQuery(
resolvedQuery,
filtersToAdd,
key,
aggregateData?.queryName || '',
);
let queryParams = {
[QueryParams.compositeQuery]: encodeURIComponent(JSON.stringify(viewQuery)),
...(timeRange && {
[QueryParams.startTime]: timeRange?.startTime.toString(),
[QueryParams.endTime]: timeRange?.endTime.toString(),
}),
} as Record<string, string>;
if (route === ROUTES.METRICS_EXPLORER) {
queryParams = {
...queryParams,
[QueryParams.summaryFilters]: JSON.stringify(
viewQuery?.builder.queryData[0].filters,
),
};
}
if (route) {
safeNavigate(`${route}?${createQueryParams(queryParams)}`, {
newTab: true,
});
}
onClose();
},
[resolvedQuery, safeNavigate, onClose, aggregateData],
);
const handleBaseDrilldown = useBaseDrilldownNavigate({
resolvedQuery,
aggregateData,
callback: onClose,
});
const { pathname } = useLocation();

View File

@@ -0,0 +1,117 @@
import { useCallback } from 'react';
import { QueryParams } from 'constants/query';
import ROUTES from 'constants/routes';
import { useSafeNavigate } from 'hooks/useSafeNavigate';
import createQueryParams from 'lib/createQueryParams';
import { Query } from 'types/api/queryBuilder/queryBuilderData';
import { getViewQuery } from './drilldownUtils';
import { AggregateData } from './useAggregateDrilldown';
type DrilldownKey = 'view_logs' | 'view_metrics' | 'view_traces';
const DRILLDOWN_ROUTE_MAP: Record<DrilldownKey, string> = {
view_logs: ROUTES.LOGS_EXPLORER,
view_metrics: ROUTES.METRICS_EXPLORER,
view_traces: ROUTES.TRACES_EXPLORER,
};
const getRoute = (key: string): string =>
DRILLDOWN_ROUTE_MAP[key as DrilldownKey] ?? '';
interface UseBaseDrilldownNavigateProps {
resolvedQuery: Query;
aggregateData: AggregateData | null;
callback?: () => void;
}
const useBaseDrilldownNavigate = ({
resolvedQuery,
aggregateData,
callback,
}: UseBaseDrilldownNavigateProps): ((key: string) => void) => {
const { safeNavigate } = useSafeNavigate();
return useCallback(
(key: string): void => {
const route = getRoute(key);
const viewQuery = getViewQuery(
resolvedQuery,
aggregateData?.filters ?? [],
key,
aggregateData?.queryName ?? '',
);
if (!viewQuery || !route) {
callback?.();
return;
}
const timeRange = aggregateData?.timeRange;
let queryParams: Record<string, string> = {
[QueryParams.compositeQuery]: encodeURIComponent(JSON.stringify(viewQuery)),
...(timeRange && {
[QueryParams.startTime]: timeRange.startTime.toString(),
[QueryParams.endTime]: timeRange.endTime.toString(),
}),
};
if (route === ROUTES.METRICS_EXPLORER) {
queryParams = {
...queryParams,
[QueryParams.summaryFilters]: JSON.stringify(
viewQuery.builder.queryData[0].filters,
),
};
}
safeNavigate(`${route}?${createQueryParams(queryParams)}`, {
newTab: true,
});
callback?.();
},
[resolvedQuery, safeNavigate, callback, aggregateData],
);
};
export function buildDrilldownUrl(
resolvedQuery: Query,
aggregateData: AggregateData | null,
key: string,
): string | null {
const route = getRoute(key);
const viewQuery = getViewQuery(
resolvedQuery,
aggregateData?.filters ?? [],
key,
aggregateData?.queryName ?? '',
);
if (!viewQuery || !route) {
return null;
}
const timeRange = aggregateData?.timeRange;
let queryParams: Record<string, string> = {
[QueryParams.compositeQuery]: encodeURIComponent(JSON.stringify(viewQuery)),
...(timeRange && {
[QueryParams.startTime]: timeRange.startTime.toString(),
[QueryParams.endTime]: timeRange.endTime.toString(),
}),
};
if (route === ROUTES.METRICS_EXPLORER) {
queryParams = {
...queryParams,
[QueryParams.summaryFilters]: JSON.stringify(
viewQuery.builder.queryData[0].filters,
),
};
}
return `${route}?${createQueryParams(queryParams)}`;
}
export { getRoute };
export default useBaseDrilldownNavigate;

View File

@@ -0,0 +1,29 @@
import { syncCursorRegistry } from '../syncCursorRegistry';
describe('syncCursorRegistry', () => {
describe('metadata', () => {
it('returns undefined for unknown key', () => {
expect(syncCursorRegistry.getMetadata('unknown-meta')).toBeUndefined();
});
it('stores and retrieves metadata by syncKey', () => {
const metadata = { yAxisUnit: 'ms', groupBy: [] };
syncCursorRegistry.setMetadata('meta-key', metadata);
expect(syncCursorRegistry.getMetadata('meta-key')).toBe(metadata);
});
});
describe('activeSeriesMetric', () => {
it('returns null (not undefined) for unknown key', () => {
expect(
syncCursorRegistry.getActiveSeriesMetric('unknown-metric'),
).toBeNull();
});
it('stores and retrieves metric by syncKey', () => {
const metric = { host: 'server1', __name__: 'cpu' };
syncCursorRegistry.setActiveSeriesMetric('metric-key', metric);
expect(syncCursorRegistry.getActiveSeriesMetric('metric-key')).toBe(metric);
});
});
});

View File

@@ -0,0 +1,627 @@
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
import uPlot from 'uplot';
import { syncCursorRegistry } from '../syncCursorRegistry';
import { createSyncDisplayHook } from '../syncDisplayHook';
import {
SyncTooltipFilterMode,
type TooltipControllerState,
type TooltipSyncMetadata,
} from '../types';
jest.mock('../syncCursorRegistry', () => ({
syncCursorRegistry: {
setMetadata: jest.fn(),
getMetadata: jest.fn(),
setActiveSeriesMetric: jest.fn(),
getActiveSeriesMetric: jest.fn(),
},
}));
const mockRegistry = syncCursorRegistry as {
setMetadata: jest.Mock;
getMetadata: jest.Mock;
setActiveSeriesMetric: jest.Mock;
getActiveSeriesMetric: jest.Mock;
};
const SYNC_KEY = 'test-sync-key';
// Builds a single-query groupByPerQuery from a list of dimension keys.
const makeGroupByPerQuery = (
...keys: string[]
): Record<string, BaseAutocompleteData[]> => ({
A: keys.map((key) => ({ key, type: 'tag' as const })),
});
function makeUPlotRoot(includeCrosshair = true): HTMLElement {
const root = document.createElement('div');
if (includeCrosshair) {
const el = document.createElement('div');
el.className = 'u-cursor-y';
root.append(el);
}
return root;
}
type FakeSeries = { metric?: Record<string, string>; show?: boolean };
function makeFakeUPlot(opts: {
cursorEvent?: MouseEvent | null;
cursorLeft?: number;
series?: FakeSeries[];
includeCrosshair?: boolean;
}): uPlot {
return {
root: makeUPlotRoot(opts.includeCrosshair ?? true),
cursor: {
event: opts.cursorEvent !== undefined ? opts.cursorEvent : null,
left: opts.cursorLeft ?? 50,
},
series: opts.series ?? [
{},
{ metric: { host: 'server1' } },
{ metric: { host: 'server2' } },
],
setSeries: jest.fn(),
} as unknown as uPlot;
}
function makeController(
focusedSeriesIndex: number | null = null,
): TooltipControllerState {
return {
focusedSeriesIndex,
syncedSeriesIndexes: null,
} as TooltipControllerState;
}
// Convenience cast used throughout assertions.
function mockSetSeries(u: uPlot): jest.Mock {
return (u as unknown as { setSeries: jest.Mock }).setSeries;
}
function getCrosshair(u: uPlot): HTMLElement {
const el = u.root.querySelector<HTMLElement>('.u-cursor-y');
if (!el) {
throw new Error('crosshair element missing');
}
return el;
}
// ─────────────────────────────────────────────────────────────────────────────
describe('createSyncDisplayHook', () => {
beforeEach(() => {
jest.clearAllMocks();
});
// ── guard ────────────────────────────────────────────────────────────────
describe('no crosshair element', () => {
it('returns early without calling registry when .u-cursor-y absent', () => {
const hook = createSyncDisplayHook(SYNC_KEY, undefined, makeController());
const u = makeFakeUPlot({ includeCrosshair: false });
hook(u);
expect(mockRegistry.setMetadata).not.toHaveBeenCalled();
expect(mockRegistry.getMetadata).not.toHaveBeenCalled();
expect(mockSetSeries(u)).not.toHaveBeenCalled();
});
});
// ── source panel ─────────────────────────────────────────────────────────
describe('source behavior (cursor.event != null)', () => {
it('writes syncMetadata to registry', () => {
const syncMetadata: TooltipSyncMetadata = { yAxisUnit: 'ms' };
const hook = createSyncDisplayHook(SYNC_KEY, syncMetadata, makeController());
const u = makeFakeUPlot({ cursorEvent: new MouseEvent('mousemove') });
hook(u);
expect(mockRegistry.setMetadata).toHaveBeenCalledWith(
SYNC_KEY,
syncMetadata,
);
});
it('writes focused series metric when focusedSeriesIndex is set', () => {
const series: FakeSeries[] = [
{},
{ metric: { host: 'server1' } },
{ metric: { host: 'server2' } },
];
const hook = createSyncDisplayHook(SYNC_KEY, undefined, makeController(1));
const u = makeFakeUPlot({
cursorEvent: new MouseEvent('mousemove'),
series,
});
hook(u);
expect(mockRegistry.setActiveSeriesMetric).toHaveBeenCalledWith(SYNC_KEY, {
host: 'server1',
});
});
it('writes null metric when focusedSeriesIndex is null', () => {
const hook = createSyncDisplayHook(
SYNC_KEY,
undefined,
makeController(null),
);
const u = makeFakeUPlot({ cursorEvent: new MouseEvent('mousemove') });
hook(u);
expect(mockRegistry.setActiveSeriesMetric).toHaveBeenCalledWith(
SYNC_KEY,
null,
);
});
it('clears controller.syncedSeriesIndexes', () => {
const controller = makeController();
controller.syncedSeriesIndexes = [1, 2];
const hook = createSyncDisplayHook(SYNC_KEY, undefined, controller);
const u = makeFakeUPlot({ cursorEvent: new MouseEvent('mousemove') });
hook(u);
expect(controller.syncedSeriesIndexes).toBeNull();
});
it('shows crosshair and does not read from registry', () => {
const hook = createSyncDisplayHook(SYNC_KEY, undefined, makeController());
const u = makeFakeUPlot({ cursorEvent: new MouseEvent('mousemove') });
hook(u);
expect(getCrosshair(u).style.display).toBe('');
expect(mockRegistry.getMetadata).not.toHaveBeenCalled();
});
});
// ── receiver panel ───────────────────────────────────────────────────────
describe('receiver behavior (cursor.event is null)', () => {
describe('crosshair visibility', () => {
it('shows crosshair when yAxisUnit matches source', () => {
mockRegistry.getMetadata.mockReturnValue({ yAxisUnit: 'ms' });
mockRegistry.getActiveSeriesMetric.mockReturnValue(null);
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms' },
makeController(),
);
const u = makeFakeUPlot({ cursorEvent: null });
hook(u);
expect(getCrosshair(u).style.display).toBe('');
});
it('hides crosshair when yAxisUnit differs from source', () => {
mockRegistry.getMetadata.mockReturnValue({ yAxisUnit: 'bytes' });
mockRegistry.getActiveSeriesMetric.mockReturnValue(null);
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms' },
makeController(),
);
const u = makeFakeUPlot({ cursorEvent: null });
hook(u);
expect(getCrosshair(u).style.display).toBe('none');
});
});
// ── exact groupBy match ───────────────────────────────────────────────
describe('exact groupBy match', () => {
const groupByPerQuery = makeGroupByPerQuery('host');
const series: FakeSeries[] = [
{},
{ metric: { host: 'server1' } },
{ metric: { host: 'server2' } },
];
it('focuses the matching series and records it on the controller', () => {
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'server2' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null, cursorLeft: 50, series });
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(2, { focus: true });
expect(controller.syncedSeriesIndexes).toStrictEqual([2]);
});
it('unfocuses all and emits empty matches (Filtered) when active metric is null', () => {
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue(null);
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null, cursorLeft: 50, series });
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(null, { focus: false });
expect(controller.syncedSeriesIndexes).toStrictEqual([]);
});
it('unfocuses all when metric matches no series', () => {
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({
host: 'unknown-server',
});
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null, cursorLeft: 50, series });
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(null, { focus: false });
expect(controller.syncedSeriesIndexes).toStrictEqual([]);
});
it('clears syncedSeriesIndexes when cursor is off-plot (left < 0)', () => {
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'server1' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null, cursorLeft: -1, series });
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(null, { focus: false });
expect(controller.syncedSeriesIndexes).toBeNull();
expect(mockRegistry.getActiveSeriesMetric).not.toHaveBeenCalled();
});
it('never focuses series at index 0 (x-axis)', () => {
const sameMetric = { host: 'server1' };
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue(sameMetric);
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery },
controller,
);
const u = makeFakeUPlot({
cursorEvent: null,
cursorLeft: 50,
// Index 0 has the same metric — it must always be skipped.
series: [{ metric: sameMetric }, { metric: { host: 'other' } }],
});
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(null, { focus: false });
expect(controller.syncedSeriesIndexes).toStrictEqual([]);
});
it('skips hidden series (show === false)', () => {
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'server1' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery },
controller,
);
const u = makeFakeUPlot({
cursorEvent: null,
cursorLeft: 50,
series: [
{},
{ metric: { host: 'server1' }, show: false },
{ metric: { host: 'server1' } },
],
});
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(2, { focus: true });
expect(controller.syncedSeriesIndexes).toStrictEqual([2]);
});
});
// ── partial groupBy overlap ───────────────────────────────────────────
describe('partial groupBy overlap', () => {
it('subset — records every receiver series matching on the common key', () => {
// Source groupBy=[host], receiver groupBy=[host, service].
// Hook focuses the first match; the rest are surfaced via controller.syncedSeriesIndexes.
const sourceGroupBy = makeGroupByPerQuery('host');
const receiverGroupBy = makeGroupByPerQuery('host', 'service');
const series: FakeSeries[] = [
{},
{ metric: { host: 'server1', service: 'api' } },
{ metric: { host: 'server1', service: 'frontend' } },
{ metric: { host: 'server2', service: 'api' } },
];
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery: sourceGroupBy,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'server1' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery: receiverGroupBy },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null, cursorLeft: 50, series });
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(1, { focus: true });
expect(controller.syncedSeriesIndexes).toStrictEqual([1, 2]);
});
it('superset — records the one receiver series matching on the common key', () => {
// Source groupBy=[host, service], receiver groupBy=[host].
const sourceGroupBy = makeGroupByPerQuery('host', 'service');
const receiverGroupBy = makeGroupByPerQuery('host');
const series: FakeSeries[] = [
{},
{ metric: { host: 'server1' } },
{ metric: { host: 'server2' } },
];
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery: sourceGroupBy,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({
host: 'server1',
service: 'api',
});
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery: receiverGroupBy },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null, cursorLeft: 50, series });
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(1, { focus: true });
expect(controller.syncedSeriesIndexes).toStrictEqual([1]);
});
it('partial — matches on the intersecting key only', () => {
// Source groupBy=[host, service], receiver groupBy=[service, region].
// Common key is [service]. Both receiver series with service=api match.
const sourceGroupBy = makeGroupByPerQuery('host', 'service');
const receiverGroupBy = makeGroupByPerQuery('service', 'region');
const series: FakeSeries[] = [
{},
{ metric: { service: 'api', region: 'us-east' } },
{ metric: { service: 'api', region: 'eu-west' } },
{ metric: { service: 'frontend', region: 'us-east' } },
];
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery: sourceGroupBy,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({
host: 'server1',
service: 'api',
});
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery: receiverGroupBy },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null, cursorLeft: 50, series });
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(1, { focus: true });
expect(controller.syncedSeriesIndexes).toStrictEqual([1, 2]);
});
});
// ── union across queries in groupByPerQuery ───────────────────────────
describe('union across queries', () => {
it("treats the panel's effective groupBy as the union across its queries", () => {
// Source has query A=[host]; receiver has A=[host], B=[service].
// The shared key is `host` — receiver matches on that.
const sourceGroupBy: Record<string, BaseAutocompleteData[]> = {
A: [{ key: 'host', type: 'tag' }],
};
const receiverGroupBy: Record<string, BaseAutocompleteData[]> = {
A: [{ key: 'host', type: 'tag' }],
B: [{ key: 'service', type: 'tag' }],
};
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery: sourceGroupBy,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'server1' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery: receiverGroupBy },
controller,
);
const u = makeFakeUPlot({
cursorEvent: null,
cursorLeft: 50,
series: [
{},
{ metric: { host: 'server1' } },
{ metric: { host: 'server2' } },
],
});
hook(u);
expect(controller.syncedSeriesIndexes).toStrictEqual([1]);
});
});
// ── no overlap (Filtered mode default) ────────────────────────────────
describe('no overlap → Filtered mode emits []', () => {
it('emits [] when groupBy keys are completely different', () => {
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery: makeGroupByPerQuery('host'),
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'server1' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery: makeGroupByPerQuery('service') },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null });
hook(u);
expect(controller.syncedSeriesIndexes).toStrictEqual([]);
});
it('emits [] when receiver groupBy is empty', () => {
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery: makeGroupByPerQuery('host'),
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'server1' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery: {} },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null });
hook(u);
expect(controller.syncedSeriesIndexes).toStrictEqual([]);
});
it('emits [] when source groupBy is absent', () => {
mockRegistry.getMetadata.mockReturnValue({ yAxisUnit: 'ms' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms', groupByPerQuery: makeGroupByPerQuery('host') },
controller,
);
const u = makeFakeUPlot({ cursorEvent: null });
hook(u);
expect(controller.syncedSeriesIndexes).toStrictEqual([]);
});
});
// ── filterMode: All ──────────────────────────────────────────────────
describe('filterMode All', () => {
it('emits null (no filter) when there is no overlap in groupBy', () => {
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery: makeGroupByPerQuery('host'),
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'server1' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{
yAxisUnit: 'ms',
groupByPerQuery: makeGroupByPerQuery('service'),
filterMode: SyncTooltipFilterMode.All,
},
controller,
);
const u = makeFakeUPlot({ cursorEvent: null });
hook(u);
expect(controller.syncedSeriesIndexes).toBeNull();
});
it('emits null when metric matches no series', () => {
const groupByPerQuery = makeGroupByPerQuery('host');
mockRegistry.getMetadata.mockReturnValue({
yAxisUnit: 'ms',
groupByPerQuery,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'unknown' });
const controller = makeController();
const hook = createSyncDisplayHook(
SYNC_KEY,
{
yAxisUnit: 'ms',
groupByPerQuery,
filterMode: SyncTooltipFilterMode.All,
},
controller,
);
const u = makeFakeUPlot({ cursorEvent: null, cursorLeft: 50 });
hook(u);
expect(controller.syncedSeriesIndexes).toBeNull();
});
});
// ── caching ───────────────────────────────────────────────────────────
describe('caching optimizations', () => {
it('reuses the crosshair element across multiple invocations', () => {
mockRegistry.getMetadata.mockReturnValue({ yAxisUnit: 'ms' });
mockRegistry.getActiveSeriesMetric.mockReturnValue(null);
const hook = createSyncDisplayHook(
SYNC_KEY,
{ yAxisUnit: 'ms' },
makeController(),
);
const u = makeFakeUPlot({ cursorEvent: null });
const spy = jest.spyOn(u.root, 'querySelector');
hook(u);
hook(u);
hook(u);
// querySelector should only be called once regardless of invocation count.
expect(spy).toHaveBeenCalledTimes(1);
});
it('recomputes common keys when source groupByPerQuery reference changes', () => {
const hostGroupBy = makeGroupByPerQuery('host');
const serviceGroupBy = makeGroupByPerQuery('service');
const series: FakeSeries[] = [
{},
{ metric: { host: 'server1', service: 'api' } },
{ metric: { host: 'server2', service: 'frontend' } },
];
const hook = createSyncDisplayHook(
SYNC_KEY,
{ groupByPerQuery: makeGroupByPerQuery('host', 'service') },
makeController(),
);
const u = makeFakeUPlot({ cursorEvent: null, cursorLeft: 50, series });
// First call: source groups by host → matches series 1.
mockRegistry.getMetadata.mockReturnValue({
groupByPerQuery: hostGroupBy,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({ host: 'server1' });
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(1, { focus: true });
jest.clearAllMocks();
// Second call: source now groups by service → matches series 2.
mockRegistry.getMetadata.mockReturnValue({
groupByPerQuery: serviceGroupBy,
});
mockRegistry.getActiveSeriesMetric.mockReturnValue({
service: 'frontend',
});
hook(u);
expect(mockSetSeries(u)).toHaveBeenCalledWith(2, { focus: true });
});
});
});
});

View File

@@ -29,5 +29,42 @@ func (provider *provider) addTraceDetailRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v4/traces/{traceID}/waterfall", handler.New(
provider.authzMiddleware.ViewAccess(provider.traceDetailHandler.GetWaterfallV4),
handler.OpenAPIDef{
ID: "GetWaterfallV4",
Tags: []string{"tracedetail"},
Summary: "Get waterfall view for a trace",
Description: "Returns the waterfall view of spans including all spans if total spans are under a limit, a max count otherwise. Aggregations are dropped compared to v3",
Request: new(spantypes.PostableWaterfall),
RequestContentType: "application/json",
Response: new(spantypes.GettableWaterfallTrace),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v3/traces/{traceID}/flamegraph", handler.New(
provider.authzMiddleware.ViewAccess(provider.traceDetailHandler.GetFlamegraph),
handler.OpenAPIDef{
ID: "GetFlamegraph",
Tags: []string{"tracedetail"},
Summary: "Get flamegraph view for a trace",
Description: "Returns the flamegraph view of spans for a given trace ID.",
Request: new(spantypes.PostableFlamegraph),
RequestContentType: "application/json",
Response: new(spantypes.GettableFlamegraphTrace),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -6,7 +6,16 @@ import (
)
type Config struct {
Waterfall WaterfallConfig `mapstructure:"waterfall"`
Waterfall WaterfallConfig `mapstructure:"waterfall"`
Flamegraph FlamegraphConfig `mapstructure:"flamegraph"`
}
type FlamegraphConfig struct {
MaxSelectedLevels int `mapstructure:"max_selected_levels"`
MaxSpansPerLevel int `mapstructure:"max_spans_per_level"`
SamplingTopLatencySpansCount int `mapstructure:"sampling_top_latency_count"`
SamplingBucketCount int `mapstructure:"sampling_bucket_count"`
SelectAllSpansLimit uint `mapstructure:"select_all_spans_limit"`
}
type WaterfallConfig struct {
@@ -29,6 +38,13 @@ func newConfig() factory.Config {
MaxDepthToAutoExpand: 5,
MaxLimitToSelectAllSpans: 10_000,
},
Flamegraph: FlamegraphConfig{
MaxSelectedLevels: 50,
MaxSpansPerLevel: 100,
SamplingTopLatencySpansCount: 5,
SamplingBucketCount: 50,
SelectAllSpansLimit: 100_000,
},
}
}
@@ -45,5 +61,25 @@ func (c Config) Validate() error {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.waterfall.max_limit_to_select_all_spans must be positive")
}
if c.Flamegraph.MaxSelectedLevels <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.level_limit must be positive, got %d", c.Flamegraph.MaxSelectedLevels)
}
if c.Flamegraph.MaxSpansPerLevel <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.spans_per_level must be positive, got %d", c.Flamegraph.MaxSpansPerLevel)
}
if c.Flamegraph.SamplingTopLatencySpansCount < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.top_latency_count cannot be negative, got %d", c.Flamegraph.SamplingTopLatencySpansCount)
}
if c.Flamegraph.SamplingBucketCount <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.bucket_count must be positive, got %d", c.Flamegraph.SamplingBucketCount)
}
if c.Flamegraph.SelectAllSpansLimit == 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.max_limit_to_select_all_spans must be positive")
}
return nil
}

View File

@@ -38,3 +38,40 @@ func (h *handler) GetWaterfall(rw http.ResponseWriter, r *http.Request) {
render.Success(rw, http.StatusOK, result)
}
func (h *handler) GetWaterfallV4(rw http.ResponseWriter, r *http.Request) {
req := new(spantypes.PostableWaterfall)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
if err := req.Validate(); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.GetWaterfallV4(r.Context(), mux.Vars(r)["traceID"], req.SelectedSpanID, req.UncollapsedSpans, req.Limit)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}
func (h *handler) GetFlamegraph(rw http.ResponseWriter, r *http.Request) {
req := new(spantypes.PostableFlamegraph)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.GetFlamegraph(r.Context(), mux.Vars(r)["traceID"], req)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}

View File

@@ -2,6 +2,7 @@ package impltracedetail
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
@@ -45,7 +46,7 @@ func (m *module) GetWaterfall(ctx context.Context, traceID string, req *spantype
return spantypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, uncollapsedSpans, selectedAllSpans, aggregationResults), nil
}
// getTraceData returns the waterfall cache for the given traceID with fallback on DB.
// getTraceData fetches all spans for a trace and builds the WaterfallTrace.
func (m *module) getTraceData(ctx context.Context, traceID string) (*spantypes.WaterfallTrace, error) {
summary, err := m.store.GetTraceSummary(ctx, traceID)
if err != nil {
@@ -61,6 +62,144 @@ func (m *module) getTraceData(ctx context.Context, traceID string) (*spantypes.W
return nil, spantypes.ErrTraceNotFound
}
traceData := spantypes.NewWaterfallTraceFromSpans(spanItems)
return traceData, nil
nodes := make([]*spantypes.WaterfallSpan, len(spanItems))
for i := range spanItems {
nodes[i] = spanItems[i].ToWaterfallSpan(traceID)
}
return spantypes.NewWaterfallTraceFromSpans(nodes), nil
}
// GetWaterfallV4 is the OOM-safe V4 waterfall.
// For large traces (NumSpans > effectiveLimit) it uses a two-step fetch:
// minimal fields for all spans to build the tree, then full fields for the
// visible window only. Aggregations are not returned.
func (m *module) GetWaterfallV4(ctx context.Context, traceID string, selectedSpanID string, uncollapsedSpans []string, selectAllLimit uint) (*spantypes.GettableWaterfallTrace, error) {
summary, err := m.store.GetTraceSummary(ctx, traceID)
if err != nil {
return nil, err
}
effectiveLimit := min(selectAllLimit, m.config.Waterfall.MaxLimitToSelectAllSpans)
if summary.NumSpans > uint64(effectiveLimit) {
return m.getWindowedWaterfall(ctx, traceID, selectedSpanID, uncollapsedSpans, summary.Start, summary.End)
}
return m.getFullWaterfall(ctx, traceID, summary)
}
func (m *module) getFullWaterfall(ctx context.Context, traceID string, summary *spantypes.TraceSummary) (*spantypes.GettableWaterfallTrace, error) {
spanItems, err := m.store.GetTraceSpans(ctx, traceID, summary)
if err != nil {
return nil, err
}
if len(spanItems) == 0 {
return nil, spantypes.ErrTraceNotFound
}
nodes := make([]*spantypes.WaterfallSpan, len(spanItems))
for i := range spanItems {
nodes[i] = spanItems[i].ToWaterfallSpan(traceID)
}
waterfallTrace := spantypes.NewWaterfallTraceFromSpans(nodes)
selectedSpans := waterfallTrace.GetAllSpans()
return spantypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, nil, true, nil), nil
}
func (m *module) GetFlamegraph(ctx context.Context, traceID string, req *spantypes.PostableFlamegraph) (*spantypes.GettableFlamegraphTrace, error) {
summary, err := m.store.GetTraceSummary(ctx, traceID)
if err != nil {
return nil, err
}
if summary.NumSpans <= uint64(m.config.Flamegraph.SelectAllSpansLimit) {
return m.getFullFlamegraph(ctx, traceID, summary)
}
return m.getWindowedFlamegraph(ctx, traceID, req.SelectedSpanID, summary)
}
func (m *module) getFullFlamegraph(ctx context.Context, traceID string, summary *spantypes.TraceSummary) (*spantypes.GettableFlamegraphTrace, error) {
fullSpans, err := m.store.GetTraceSpans(ctx, traceID, summary)
if err != nil {
return nil, err
}
if len(fullSpans) == 0 {
return nil, spantypes.ErrTraceNotFound
}
flamegraphTrace := spantypes.NewFlamegraphTraceFromStorable(fullSpans)
return spantypes.NewGettableFlamegraphTrace(
flamegraphTrace.GetAllLevels(),
summary.Start.UnixMilli(), summary.End.UnixMilli(), false,
), nil
}
// getWindowedFlamegraph returns a window of a max levels and max sampled spans per level around the selected span
func (m *module) getWindowedFlamegraph(ctx context.Context, traceID, selectedSpanID string, summary *spantypes.TraceSummary) (*spantypes.GettableFlamegraphTrace, error) {
minimalSpans, err := m.store.GetMinimalSpans(ctx, traceID, summary.Start, summary.End)
if err != nil {
return nil, err
}
if len(minimalSpans) == 0 {
return nil, spantypes.ErrTraceNotFound
}
flamegraphTrace := spantypes.NewFlamegraphTraceFromMinimal(minimalSpans)
minimalSpans = nil
cfg := m.config.Flamegraph
selectedSpans := flamegraphTrace.GetSelectedLevels(selectedSpanID,
cfg.MaxSelectedLevels, cfg.MaxSpansPerLevel, cfg.SamplingTopLatencySpansCount, cfg.SamplingBucketCount)
if len(selectedSpans) == 0 {
return nil, spantypes.ErrTraceNotFound
}
fullSpans, err := m.store.GetTraceSpansByIDs(ctx, traceID, summary.Start, summary.End,
spantypes.FlamegraphWindowSpanIDs(selectedSpans))
if err != nil {
return nil, err
}
return spantypes.NewGettableFlamegraphTrace(
flamegraphTrace.EnrichSelectedSpans(selectedSpans, fullSpans),
summary.Start.UnixMilli(), summary.End.UnixMilli(), true,
), nil
}
// getWindowedWaterfall builds the waterfall tree with minimal data and then returns only a window of full spans.
func (m *module) getWindowedWaterfall(ctx context.Context, traceID, selectedSpanID string, uncollapsedSpans []string, start, end time.Time) (*spantypes.GettableWaterfallTrace, error) {
// Step 1: minimal fetch → build full tree → select visible window
minimalSpans, err := m.store.GetMinimalSpans(ctx, traceID, start, end)
if err != nil {
return nil, err
}
if len(minimalSpans) == 0 {
return nil, spantypes.ErrTraceNotFound
}
nodes := make([]*spantypes.WaterfallSpan, len(minimalSpans))
for i := range minimalSpans {
nodes[i] = minimalSpans[i].ToWaterfallSpan(traceID)
}
waterfallTrace := spantypes.NewWaterfallTraceFromSpans(nodes)
selectedSpans, uncollapsedSpans := waterfallTrace.GetSelectedSpans(
uncollapsedSpans,
selectedSpanID,
m.config.Waterfall.SpanPageSize,
m.config.Waterfall.MaxDepthToAutoExpand,
)
// Step 2: full fetch for the selected window only
spanIDs := make([]string, len(selectedSpans))
for i, s := range selectedSpans {
spanIDs[i] = s.SpanID
}
fullSpans, err := m.store.GetTraceSpansByIDs(ctx, traceID, start, end, spanIDs)
if err != nil {
return nil, err
}
spantypes.EnrichSelectedSpans(selectedSpans, fullSpans)
return spantypes.NewGettableWaterfallTrace(
waterfallTrace, selectedSpans, uncollapsedSpans, false, nil,
), nil
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"time"
sqlbuilder "github.com/huandu/go-sqlbuilder"
@@ -12,6 +13,8 @@ import (
"github.com/SigNoz/signoz/pkg/types/spantypes"
)
const colServiceName = `resource_string_service$$$$name` // $ gets escaped so $$$$ converts to $$.
type traceStore struct {
telemetryStore telemetrystore.TelemetryStore
}
@@ -45,8 +48,8 @@ func (s *traceStore) GetTraceSpans(ctx context.Context, traceID string, summary
// DISTINCT ON (span_id) is ClickHouse-specific syntax not supported by sqlbuilder
query := fmt.Sprintf(`
SELECT DISTINCT ON (span_id)
timestamp, duration_nano, span_id, trace_id, has_error, kind,
resource_string_service$$name, name, links as references,
timestamp, duration_nano, span_id, has_error, kind,
resource_string_service$$name, name,
attributes_string, attributes_number, attributes_bool, resources_string,
events, status_message, status_code_string, kind_string, parent_span_id,
flags, is_remote, trace_state, status_code,
@@ -69,3 +72,64 @@ func (s *traceStore) GetTraceSpans(ctx context.Context, traceID string, summary
}
return spanItems, nil
}
func (s *traceStore) GetMinimalSpans(ctx context.Context, traceID string, start, end time.Time) ([]spantypes.MinimalSpan, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select(
"DISTINCT ON (span_id) span_id",
"parent_span_id", "timestamp", "duration_nano", "has_error",
colServiceName,
)
sb.From(fmt.Sprintf("%s.%s", spantypes.TraceDB, spantypes.TraceTable))
sb.Where(
sb.E("trace_id", traceID),
sb.GE("ts_bucket_start", start.Unix()-1800),
sb.LE("ts_bucket_start", end.Unix()),
)
sb.OrderByAsc("timestamp")
sb.OrderByAsc("name")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var spans []spantypes.MinimalSpan
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &spans, query, args...); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying minimal spans")
}
return spans, nil
}
func (s *traceStore) GetTraceSpansByIDs(ctx context.Context, traceID string, start, end time.Time, spanIDs []string) ([]spantypes.StorableSpan, error) {
if len(spanIDs) == 0 {
return []spantypes.StorableSpan{}, nil
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select(
"DISTINCT ON (span_id) timestamp",
"duration_nano", "span_id", "has_error", "kind",
colServiceName, "name",
"attributes_string", "attributes_number", "attributes_bool", "resources_string",
"events", "status_message", "status_code_string", "kind_string", "parent_span_id",
"flags", "is_remote", "trace_state", "status_code",
"db_name", "db_operation", "http_method", "http_url", "http_host",
"external_http_method", "external_http_url", "response_status_code",
)
sb.From(fmt.Sprintf("%s.%s", spantypes.TraceDB, spantypes.TraceTable))
ids := make([]any, len(spanIDs))
for i, id := range spanIDs {
ids[i] = id
}
sb.Where(
sb.E("trace_id", traceID),
sb.In("span_id", ids...),
sb.GE("ts_bucket_start", start.Unix()-1800),
sb.LE("ts_bucket_start", end.Unix()),
)
sb.OrderByAsc("timestamp")
sb.OrderByAsc("name")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var spans []spantypes.StorableSpan
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &spans, query, args...); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying trace spans by IDs")
}
return spans, nil
}

View File

@@ -10,9 +10,13 @@ import (
// Handler exposes HTTP handlers for trace detail APIs.
type Handler interface {
GetWaterfall(http.ResponseWriter, *http.Request)
GetWaterfallV4(http.ResponseWriter, *http.Request)
GetFlamegraph(http.ResponseWriter, *http.Request)
}
// Module defines the business logic for trace detail operations.
type Module interface {
GetWaterfall(ctx context.Context, traceID string, req *spantypes.PostableWaterfall) (*spantypes.GettableWaterfallTrace, error)
GetWaterfallV4(ctx context.Context, traceID string, selectedSpanID string, uncollapsedSpans []string, selectAllLimit uint) (*spantypes.GettableWaterfallTrace, error)
GetFlamegraph(ctx context.Context, traceID string, req *spantypes.PostableFlamegraph) (*spantypes.GettableFlamegraphTrace, error)
}

View File

@@ -19,6 +19,8 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const traceOutsideRangeWarn = "Query %s references a trace_id that exists between %s and %s (UTC) but lies outside the selected time range; adjust the time range to see results"
type builderQuery[T any] struct {
logger *slog.Logger
telemetryStore telemetrystore.TelemetryStore
@@ -199,7 +201,21 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
return q.executeWindowList(ctx)
}
stmt, err := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec, q.variables)
fromMS, toMS := q.fromMS, q.toMS
if q.spec.Signal == telemetrytypes.SignalTraces || q.spec.Signal == telemetrytypes.SignalLogs {
var overlap bool
var warning string
fromMS, toMS, overlap, warning = q.narrowWindowByTraceID(ctx, fromMS, toMS)
if !overlap {
res := emptyResultFor(q.kind, q.spec.Name)
if warning != "" {
res.Warnings = []string{warning}
}
return res, nil
}
}
stmt, err := q.stmtBuilder.Build(ctx, fromMS, toMS, q.kind, q.spec, q.variables)
if err != nil {
return nil, err
}
@@ -215,6 +231,88 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
return result, nil
}
// narrowWindowByTraceID inspects the filter for trace_id predicates and clamps
// [fromMS,toMS] to the time range stored in signoz_traces.distributed_trace_summary.
// Returns the (possibly narrowed) window, overlap=false when the trace lies
// completely outside the query window (callers should short-circuit), and a
// warning string the caller should attach to the empty result when the trace
// exists but is outside the selected window.
//
// When the trace_id is not present in trace_summary the behaviour differs by
// signal:
// - traces: trace_summary is derived from the spans table, so a missing row
// means no spans exist for that trace_id; we short-circuit to empty.
// - logs: logs can carry a trace_id even when traces are not ingested at all
// (e.g. traces disabled). We must not short-circuit; instead leave the
// window untouched and let the query run.
func (q *builderQuery[T]) narrowWindowByTraceID(ctx context.Context, fromMS, toMS uint64) (uint64, uint64, bool, string) {
if q.spec.Filter == nil || q.spec.Filter.Expression == "" {
return fromMS, toMS, true, ""
}
traceIDs, found := telemetrytraces.ExtractTraceIDsFromFilter(q.spec.Filter.Expression)
if !found || len(traceIDs) == 0 {
return fromMS, toMS, true, ""
}
finder := telemetrytraces.NewTraceTimeRangeFinder(q.telemetryStore)
traceStart, traceEnd, exists, err := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
if err != nil {
return fromMS, toMS, true, ""
}
if !exists {
if q.spec.Signal == telemetrytypes.SignalTraces {
q.logger.DebugContext(ctx, "trace_id not found in trace_summary; short-circuiting traces query to empty",
slog.Any("trace_ids", traceIDs))
return fromMS, toMS, false, ""
}
q.logger.DebugContext(ctx, "trace_id not found in trace_summary; leaving time range untouched for logs",
slog.Any("trace_ids", traceIDs))
return fromMS, toMS, true, ""
}
traceStartMS := uint64(traceStart) / 1_000_000
traceEndMS := uint64(traceEnd) / 1_000_000
if traceStartMS == 0 || traceEndMS == 0 {
return fromMS, toMS, true, ""
}
if traceStartMS > toMS || traceEndMS < fromMS {
traceStartUTC := time.UnixMilli(int64(traceStartMS)).UTC().Format(time.RFC3339)
traceEndUTC := time.UnixMilli(int64(traceEndMS)).UTC().Format(time.RFC3339)
return fromMS, toMS, false, fmt.Sprintf(traceOutsideRangeWarn, q.spec.Name, traceStartUTC, traceEndUTC)
}
if traceStartMS > fromMS {
fromMS = traceStartMS
}
if traceEndMS < toMS {
toMS = traceEndMS
}
q.logger.DebugContext(ctx, "optimized time range using trace_id lookup",
slog.String("signal", q.spec.Signal.StringValue()),
slog.Any("trace_ids", traceIDs),
slog.Uint64("start", fromMS),
slog.Uint64("end", toMS))
return fromMS, toMS, true, ""
}
// emptyResultFor returns an empty result payload appropriate for the given kind.
func emptyResultFor(kind qbtypes.RequestType, queryName string) *qbtypes.Result {
var value any
switch kind {
case qbtypes.RequestTypeTimeSeries:
value = &qbtypes.TimeSeriesData{QueryName: queryName}
case qbtypes.RequestTypeScalar:
value = &qbtypes.ScalarData{QueryName: queryName}
default:
value = &qbtypes.RawData{QueryName: queryName}
}
return &qbtypes.Result{
Type: kind,
Value: value,
}
}
// executeWithContext executes the query with query window and step context for partial value detection.
func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string, args []any) (*qbtypes.Result, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
@@ -310,42 +408,27 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
totalBytes := uint64(0)
start := time.Now()
// Check if filter contains trace_id(s) and optimize time range if needed
if q.spec.Signal == telemetrytypes.SignalTraces &&
q.spec.Filter != nil && q.spec.Filter.Expression != "" {
traceIDs, found := telemetrytraces.ExtractTraceIDsFromFilter(q.spec.Filter.Expression)
if found && len(traceIDs) > 0 {
finder := telemetrytraces.NewTraceTimeRangeFinder(q.telemetryStore)
traceStart, traceEnd, ok := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
traceStartMS := uint64(traceStart) / 1_000_000
traceEndMS := uint64(traceEnd) / 1_000_000
if !ok {
q.logger.DebugContext(ctx, "failed to get trace time range", slog.Any("trace_ids", traceIDs))
} else if traceStartMS > 0 && traceEndMS > 0 {
// no overlap — nothing to return
if uint64(traceStartMS) > toMS || uint64(traceEndMS) < fromMS {
return &qbtypes.Result{
Type: qbtypes.RequestTypeRaw,
Value: &qbtypes.RawData{
QueryName: q.spec.Name,
},
Stats: qbtypes.ExecStats{
DurationMS: uint64(time.Since(start).Milliseconds()),
},
}, nil
}
// clamp window to trace time range before bucketing
if uint64(traceStartMS) > fromMS {
fromMS = uint64(traceStartMS)
}
if uint64(traceEndMS) < toMS {
toMS = uint64(traceEndMS)
}
q.logger.DebugContext(ctx, "optimized time range for traces", slog.Any("trace_ids", traceIDs), slog.Uint64("start", fromMS), slog.Uint64("end", toMS))
// Check if filter contains trace_id(s) and optimize time range if needed.
// Applies to both traces (the listing this branch was built for) and logs
// (which carry trace_id and benefit from the same clamp before bucketing).
if q.spec.Signal == telemetrytypes.SignalTraces || q.spec.Signal == telemetrytypes.SignalLogs {
var overlap bool
var warning string
fromMS, toMS, overlap, warning = q.narrowWindowByTraceID(ctx, fromMS, toMS)
if !overlap {
res := &qbtypes.Result{
Type: qbtypes.RequestTypeRaw,
Value: &qbtypes.RawData{
QueryName: q.spec.Name,
},
Stats: qbtypes.ExecStats{
DurationMS: uint64(time.Since(start).Milliseconds()),
},
}
if warning != "" {
res.Warnings = []string{warning}
}
return res, nil
}
}

View File

@@ -10,7 +10,6 @@ import (
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz-otel-collector/utils"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
@@ -77,7 +76,7 @@ func NewFieldMapper(fl flagger.Flagger) qbtypes.FieldMapper {
return &fieldMapper{fl: fl}
}
func (m *fieldMapper) getColumns(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
columns := []*schema.Column{logsV2Columns["resources_string"], logsV2Columns["resource"]}
@@ -246,7 +245,7 @@ func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetr
}
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
columns, err := m.getColumns(ctx, key)
columns, err := m.getColumn(ctx, key)
if err != nil {
return "", err
}
@@ -288,14 +287,12 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
return "", qbtypes.ErrColumnNotFound
}
for _, column := range columns {
expr, err := m.buildFieldForJSON(key, column)
if err != nil {
return "", err
}
exprs = append(exprs, expr)
expr, err := m.buildFieldForJSON(key)
if err != nil {
return "", err
}
exprs = append(exprs, expr)
default:
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
}
@@ -351,7 +348,7 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
}
func (m *fieldMapper) ColumnFor(ctx context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
return m.getColumns(ctx, key)
return m.getColumn(ctx, key)
}
func (m *fieldMapper) ColumnExpressionFor(
@@ -406,13 +403,16 @@ func (m *fieldMapper) ColumnExpressionFor(
}
// buildFieldForJSON builds the field expression for body JSON fields using arrayConcat pattern.
func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey, column *schemamigrator.Column) (string, error) {
node, err := key.PlanBuilder.Build(*column)
if err != nil {
return "", err
func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (string, error) {
plan := key.JSONPlan
if len(plan) == 0 {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
"Could not find any valid paths for: %s", key.Name)
}
if node.IsTerminal {
if plan[0].IsTerminal {
node := plan[0]
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
// TODO(Piyush): Promoted path logic commented out. Materialized now means type hint
// promotion will be extracted from key field evolution
@@ -450,7 +450,7 @@ func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey, c
}
// Build arrayConcat pattern directly from the tree structure
arrayConcatExpr, err := m.buildArrayConcat(node)
arrayConcatExpr, err := m.buildArrayConcat(plan)
if err != nil {
return "", err
}
@@ -459,18 +459,23 @@ func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey, c
}
// buildArrayConcat builds the arrayConcat pattern directly from the tree structure.
func (m *fieldMapper) buildArrayConcat(node *telemetrytypes.JSONAccessNode) (string, error) {
func (m *fieldMapper) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) {
if len(plan) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat")
}
// Build arrayMap expressions for ALL available branches at the root level.
// Iterate branches in deterministic order (JSON then Dynamic)
var arrayMapExpressions []string
for _, branchType := range node.BranchesInOrder() {
expr, err := m.buildArrayMap(node, branchType)
if err != nil {
return "", err
for _, node := range plan {
for _, branchType := range node.BranchesInOrder() {
expr, err := m.buildArrayMap(node, branchType)
if err != nil {
return "", err
}
arrayMapExpressions = append(arrayMapExpressions, expr)
}
arrayMapExpressions = append(arrayMapExpressions, expr)
}
if len(arrayMapExpressions) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
}

View File

@@ -33,42 +33,32 @@ func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType te
}
// BuildCondition builds the full WHERE condition for body_v2 JSON paths.
func (c *jsonConditionBuilder) buildJSONCondition(columns []schemamigrator.Column, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if len(columns) == 1 {
baseCond, err := c.emitPlannedCondition(columns[0], operator, value, sb)
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
baseCond, err := c.emitPlannedCondition(operator, value, sb)
if err != nil {
return "", err
}
// path index
if operator.AddDefaultExistsFilter() {
pathIndex := fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(LogsV2BodyV2Column), c.key.ArrayParentPaths()[0])
return sb.And(baseCond, pathIndex), nil
}
return baseCond, nil
}
func (c *jsonConditionBuilder) emitPlannedCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
// Build traversal + terminal recursively per-hop
conditions := []string{}
for _, node := range c.key.JSONPlan {
condition, err := c.recurseArrayHops(node, operator, value, sb)
if err != nil {
return "", err
}
// path index
if operator.AddDefaultExistsFilter() {
pathIndex := fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(LogsV2BodyV2Column), c.key.ArrayParentPaths()[0])
return sb.And(baseCond, pathIndex), nil
}
return baseCond, nil
conditions = append(conditions, condition)
}
}
func (c *jsonConditionBuilder) emitPlannedCondition(column schemamigrator.Column, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if c.key.PlanBuilder == nil {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "no plan builder for key %s", c.key.Name)
}
var conditions []string
node, err := c.key.PlanBuilder.Build(column)
if err != nil {
return "", err
}
cond, err := c.recurseArrayHops(node, operator, value, sb)
if err != nil {
return "", err
}
conditions = append(conditions, cond)
return sb.Or(conditions...), nil
}

View File

@@ -85,16 +85,27 @@ func (t *telemetryMetaStore) enrichJSONKeys(ctx context.Context, selectors []*te
key.Indexes = indexes[key.Name]
}
// attach a lazy PlanBuilder so field_mapper can build plans on demand
// build JSON access plans using the pre-fetched parent type cache
return t.buildJSONPlans(filteredKeys, parentTypeCache)
}
// buildJSONPlans builds JSON access plans for the given keys
// using the provided parent type cache (pre-fetched in the main UNION query).
func (t *telemetryMetaStore) buildJSONPlans(keys []*telemetrytypes.TelemetryFieldKey, typeCache map[string][]telemetrytypes.FieldDataType) error {
if len(keys) == 0 {
return nil
}
columnMeta := t.jsonColumnMetadata[telemetrytypes.SignalLogs][telemetrytypes.FieldContextBody]
for _, key := range filteredKeys {
key.PlanBuilder = telemetrytypes.NewFieldPlanBuilder(key, columnMeta, parentTypeCache)
for _, key := range keys {
if err := key.SetJSONAccessPlan(columnMeta, typeCache); err != nil {
return err
}
}
return nil
}
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
filteredPaths := []string{}
for _, path := range paths {

View File

@@ -21,19 +21,19 @@ func NewTraceTimeRangeFinder(telemetryStore telemetrystore.TelemetryStore) *Trac
}
}
func (f *TraceTimeRangeFinder) GetTraceTimeRange(ctx context.Context, traceID string) (startNano, endNano int64, ok bool) {
func (f *TraceTimeRangeFinder) GetTraceTimeRange(ctx context.Context, traceID string) (startNano, endNano int64, exists bool, error error) {
traceIDs := []string{traceID}
return f.GetTraceTimeRangeMulti(ctx, traceIDs)
}
func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, traceIDs []string) (startNano, endNano int64, ok bool) {
func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, traceIDs []string) (startNano, endNano int64, exists bool, error error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalTraces.StringValue(),
instrumentationtypes.CodeNamespace: "trace-time-range",
instrumentationtypes.CodeFunctionName: "GetTraceTimeRangeMulti",
})
if len(traceIDs) == 0 {
return 0, 0, false
return 0, 0, false, nil
}
cleanedIDs := make([]string, len(traceIDs))
@@ -49,7 +49,8 @@ func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, trace
}
query := fmt.Sprintf(`
SELECT
SELECT
count(),
toUnixTimestamp64Nano(min(start)),
toUnixTimestamp64Nano(max(end))
FROM %s.%s
@@ -58,9 +59,14 @@ func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, trace
row := f.telemetryStore.ClickhouseDB().QueryRow(ctx, query, args...)
err := row.Scan(&startNano, &endNano)
var rowCount uint64
err := row.Scan(&rowCount, &startNano, &endNano)
if err != nil {
return 0, 0, false
return 0, 0, false, err
}
if rowCount == 0 {
return 0, 0, false, nil
}
if startNano > 1_000_000_000 {
@@ -68,5 +74,5 @@ func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, trace
}
endNano += 1_000_000_000
return startNano, endNano, true
return startNano, endNano, true, nil
}

View File

@@ -43,7 +43,7 @@ func TestGetTraceTimeRangeMulti(t *testing.T) {
finder := &TraceTimeRangeFinder{telemetryStore: nil}
if !tt.expectOK {
_, _, ok := finder.GetTraceTimeRangeMulti(ctx, tt.traceIDs)
_, _, ok, _ := finder.GetTraceTimeRangeMulti(ctx, tt.traceIDs)
assert.False(t, ok)
}
})

View File

@@ -0,0 +1,81 @@
package spantypes
import (
"maps"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
type FlamegraphSpan struct {
SpanID string `json:"spanId"`
ParentSpanID string `json:"parentSpanId"`
Timestamp uint64 `json:"timestamp"`
DurationNano uint64 `json:"durationNano"`
HasError bool `json:"hasError"`
ServiceName string `json:"serviceName"`
Name string `json:"name"`
Level int64 `json:"level"`
Events []Event `json:"event"`
Attributes map[string]any `json:"attributes,omitempty"`
Resource map[string]string `json:"resource,omitempty"`
Children []*FlamegraphSpan `json:"-"` // internal tree use only
}
// FlamegraphLevel groups span IDs at a single level within the selected window.
type FlamegraphLevel struct {
Level int64
SpanIDs []string
}
type PostableFlamegraph struct {
SelectedSpanID string `json:"selectedSpanId"`
SelectFields []telemetrytypes.TelemetryFieldKey `json:"selectFields,omitempty"`
}
// GettableFlamegraphTrace is the response for the v3 flamegraph API.
type GettableFlamegraphTrace struct {
Spans [][]*FlamegraphSpan `json:"spans"`
StartTimestampMillis int64 `json:"startTimestampMillis"`
EndTimestampMillis int64 `json:"endTimestampMillis"`
HasMore bool `json:"hasMore"`
}
func NewGettableFlamegraphTrace(spans [][]*FlamegraphSpan, startMs, endMs int64, hasMore bool) *GettableFlamegraphTrace {
return &GettableFlamegraphTrace{
Spans: spans,
StartTimestampMillis: startMs,
EndTimestampMillis: endMs,
HasMore: hasMore,
}
}
func NewFlamegraphSpanFromStorable(s *StorableSpan, level int64) *FlamegraphSpan {
resources := make(map[string]string, len(s.ResourcesString))
maps.Copy(resources, s.ResourcesString)
return &FlamegraphSpan{
SpanID: s.SpanID,
ParentSpanID: s.ParentSpanID,
Timestamp: uint64(s.StartTime.UnixNano()),
DurationNano: s.DurationNano,
HasError: s.HasError,
ServiceName: s.ServiceName,
Name: s.Name,
Level: level,
Events: s.UnmarshalledEvents(),
Attributes: s.Attributes(),
Resource: resources,
}
}
// FlamegraphWindowSpanIDs collects all span IDs from a level window into a flat slice.
func FlamegraphWindowSpanIDs(window []FlamegraphLevel) []string {
total := 0
for _, lvl := range window {
total += len(lvl.SpanIDs)
}
ids := make([]string, 0, total)
for _, lvl := range window {
ids = append(ids, lvl.SpanIDs...)
}
return ids
}

View File

@@ -0,0 +1,279 @@
package spantypes
import (
"sort"
)
// FlamegraphTrace holds the level wise tree built from minimal spans.
type FlamegraphTrace struct {
roots []*FlamegraphSpan
nodeByID map[string]*FlamegraphSpan
startTime uint64
endTime uint64
}
func NewFlamegraphTraceFromMinimal(spans []MinimalSpan) *FlamegraphTrace {
t := &FlamegraphTrace{
nodeByID: make(map[string]*FlamegraphSpan, len(spans)),
}
for i := range spans {
node := spans[i].ToFlamegraphSpan()
t.updateTimeRange(node.Timestamp, node.DurationNano)
t.nodeByID[node.SpanID] = node
}
t.wireTree()
return t
}
func NewFlamegraphTraceFromStorable(spans []StorableSpan) *FlamegraphTrace {
t := &FlamegraphTrace{
nodeByID: make(map[string]*FlamegraphSpan, len(spans)),
}
for i := range spans {
node := NewFlamegraphSpanFromStorable(&spans[i], 0) // level is set later by BFS
t.updateTimeRange(node.Timestamp, node.DurationNano)
t.nodeByID[node.SpanID] = node
}
t.wireTree()
return t
}
func (t *FlamegraphTrace) GetAllLevels() [][]*FlamegraphSpan {
allLevels := t.buildAllLevels()
for _, node := range t.nodeByID {
node.Children = nil // children not required after building tree
}
return allLevels
}
// GetSelectedLevels returns the level window for selectedSpanID with sampling applied to
// dense levels. It always applies windowing — callers should only invoke this when the
// trace is known to exceed the select-all limit.
// Children are cleared after traversal so the tree can be GC'd.
func (t *FlamegraphTrace) GetSelectedLevels(
selectedSpanID string,
levelLimit, spansPerLevel, topLatencyCount, bucketCount int,
) []FlamegraphLevel {
allLevels := t.buildAllLevels()
for _, node := range t.nodeByID {
node.Children = nil
}
selectedIndex := 0
if selectedSpanID != "" {
outer:
for i, lvl := range allLevels {
for _, span := range lvl {
if span.SpanID == selectedSpanID {
selectedIndex = i
break outer
}
}
}
}
lowerLimit := selectedIndex - int(float64(levelLimit)*0.4)
upperLimit := selectedIndex + int(float64(levelLimit)*0.6)
if lowerLimit < 0 {
upperLimit -= lowerLimit
lowerLimit = 0
}
if upperLimit > len(allLevels) {
lowerLimit -= upperLimit - len(allLevels)
upperLimit = len(allLevels)
}
if lowerLimit < 0 {
lowerLimit = 0
}
result := make([]FlamegraphLevel, 0, upperLimit-lowerLimit)
for i := lowerLimit; i < upperLimit; i++ {
lvl := allLevels[i]
if len(lvl) == 0 {
continue
}
var sampled []*FlamegraphSpan
if len(lvl) > spansPerLevel {
sampled = sampleFlamegraphLevel(lvl, selectedSpanID, i == selectedIndex,
t.startTime, t.endTime, topLatencyCount, bucketCount)
} else {
sampled = lvl
}
if len(sampled) == 0 {
continue
}
spanIDs := make([]string, len(sampled))
for j, s := range sampled {
spanIDs[j] = s.SpanID
}
result = append(result, FlamegraphLevel{
Level: sampled[0].Level,
SpanIDs: spanIDs,
})
}
return result
}
func (t *FlamegraphTrace) EnrichSelectedSpans(selectedSpans []FlamegraphLevel, fullSpans []StorableSpan) [][]*FlamegraphSpan {
fullByID := make(map[string]*StorableSpan, len(fullSpans))
for i := range fullSpans {
fullByID[fullSpans[i].SpanID] = &fullSpans[i]
}
result := make([][]*FlamegraphSpan, len(selectedSpans))
for i, lvl := range selectedSpans {
result[i] = make([]*FlamegraphSpan, 0, len(lvl.SpanIDs))
for _, spanID := range lvl.SpanIDs {
if full, ok := fullByID[spanID]; ok {
result[i] = append(result[i], NewFlamegraphSpanFromStorable(full, lvl.Level))
} else if lean, ok := t.nodeByID[spanID]; ok {
result[i] = append(result[i], lean)
}
}
}
return result
}
func (t *FlamegraphTrace) updateTimeRange(timestamp, durationNano uint64) {
if t.startTime == 0 || timestamp < t.startTime {
t.startTime = timestamp
}
if end := timestamp + durationNano; end > t.endTime {
t.endTime = end
}
}
func (t *FlamegraphTrace) wireTree() {
for _, node := range t.nodeByID {
if node.ParentSpanID != "" {
if parent, ok := t.nodeByID[node.ParentSpanID]; ok {
parent.Children = append(parent.Children, node)
} else {
missing := &FlamegraphSpan{
SpanID: node.ParentSpanID,
Name: "Missing Span",
Timestamp: node.Timestamp,
DurationNano: node.DurationNano,
Children: []*FlamegraphSpan{node},
}
t.nodeByID[missing.SpanID] = missing
t.roots = append(t.roots, missing)
}
} else if flamegraphSpanIndex(t.roots, node.SpanID) == -1 {
t.roots = append(t.roots, node)
}
}
sort.Slice(t.roots, func(i, j int) bool {
if t.roots[i].Timestamp == t.roots[j].Timestamp {
return t.roots[i].SpanID < t.roots[j].SpanID
}
return t.roots[i].Timestamp < t.roots[j].Timestamp
})
}
func (t *FlamegraphTrace) buildAllLevels() [][]*FlamegraphSpan {
var result [][]*FlamegraphSpan
type entry struct {
node *FlamegraphSpan
depth int64
}
for _, root := range t.roots {
levelMap := make(map[int64][]*FlamegraphSpan)
maxDepth := int64(-1)
queue := []entry{{root, 0}}
for len(queue) > 0 {
curr := queue[0]
queue = queue[1:]
curr.node.Level = curr.depth
levelMap[curr.depth] = append(levelMap[curr.depth], curr.node)
if curr.depth > maxDepth {
maxDepth = curr.depth
}
for _, child := range curr.node.Children {
queue = append(queue, entry{child, curr.depth + 1})
}
}
for depth := int64(0); depth <= maxDepth; depth++ {
if spans, ok := levelMap[depth]; ok {
result = append(result, spans)
}
}
}
return result
}
func sampleFlamegraphLevel(
spans []*FlamegraphSpan,
selectedSpanID string,
isSelectedLevel bool,
startTime, endTime uint64,
topLatencyCount, bucketCount int,
) []*FlamegraphSpan {
sorted := make([]*FlamegraphSpan, len(spans))
copy(sorted, spans)
sort.Slice(sorted, func(i, j int) bool {
return sorted[i].DurationNano > sorted[j].DurationNano
})
var sampled []*FlamegraphSpan
topK := topLatencyCount
if topK > len(sorted) {
topK = len(sorted)
}
sampled = append(sampled, sorted[:topK]...)
if isSelectedLevel {
for _, span := range sorted {
if span.SpanID == selectedSpanID {
sampled = append(sampled, span)
break
}
}
}
bucketSize := (endTime - startTime) / uint64(bucketCount)
if bucketSize == 0 {
bucketSize = 1
}
buckets := make([][]*FlamegraphSpan, bucketCount)
for _, span := range sorted {
if span.Timestamp < startTime || span.Timestamp > endTime {
continue
}
idx := int((span.Timestamp - startTime) / bucketSize)
if idx < 0 {
idx = 0
} else if idx >= bucketCount {
idx = bucketCount - 1
}
buckets[idx] = append(buckets[idx], span)
}
for i := range buckets {
if len(buckets[i]) > 2 {
buckets[i] = buckets[i][:2]
}
}
for _, bucket := range buckets {
sampled = append(sampled, bucket...)
}
return sampled
}
func flamegraphSpanIndex(spans []*FlamegraphSpan, spanID string) int {
for i, s := range spans {
if s != nil && s.SpanID == spanID {
return i
}
}
return -1
}

View File

@@ -2,6 +2,7 @@ package spantypes
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -26,4 +27,6 @@ type SpanMapperStore interface {
type TraceStore interface {
GetTraceSummary(ctx context.Context, traceID string) (*TraceSummary, error)
GetTraceSpans(ctx context.Context, traceID string, summary *TraceSummary) ([]StorableSpan, error)
GetMinimalSpans(ctx context.Context, traceID string, start, end time.Time) ([]MinimalSpan, error)
GetTraceSpansByIDs(ctx context.Context, traceID string, start, end time.Time, spanIDs []string) ([]StorableSpan, error)
}

View File

@@ -103,12 +103,10 @@ type StorableSpan struct {
StartTime time.Time `ch:"timestamp"`
DurationNano uint64 `ch:"duration_nano"`
SpanID string `ch:"span_id"`
TraceID string `ch:"trace_id"`
HasError bool `ch:"has_error"`
Kind int8 `ch:"kind"`
ServiceName string `ch:"resource_string_service$$name"`
Name string `ch:"name"`
References string `ch:"references"`
AttributesString map[string]string `ch:"attributes_string"`
AttributesNumber map[string]float64 `ch:"attributes_number"`
AttributesBool map[string]bool `ch:"attributes_bool"`
@@ -132,6 +130,44 @@ type StorableSpan struct {
ResponseStatusCode string `ch:"response_status_code"`
}
// MinimalSpan with only the fields needed to build the parent-child tree.
type MinimalSpan struct {
SpanID string `ch:"span_id"`
ParentSpanID string `ch:"parent_span_id"`
StartTime time.Time `ch:"timestamp"`
DurationNano uint64 `ch:"duration_nano"`
HasError bool `ch:"has_error"`
ServiceName string `ch:"resource_string_service$$name"`
}
func (item *MinimalSpan) ToWaterfallSpan(traceID string) *WaterfallSpan {
return &WaterfallSpan{
SpanID: item.SpanID,
TraceID: traceID,
ParentSpanID: item.ParentSpanID,
TimeUnix: uint64(item.StartTime.UnixNano()),
DurationNano: item.DurationNano,
HasError: item.HasError,
ServiceName: item.ServiceName,
Resource: map[string]string{"service.name": item.ServiceName},
Children: make([]*WaterfallSpan, 0),
Attributes: make(map[string]any),
Events: make([]Event, 0),
}
}
func (item *MinimalSpan) ToFlamegraphSpan() *FlamegraphSpan {
return &FlamegraphSpan{
SpanID: item.SpanID,
ParentSpanID: item.ParentSpanID,
Timestamp: uint64(item.StartTime.UnixNano()),
DurationNano: item.DurationNano,
HasError: item.HasError,
ServiceName: item.ServiceName,
Children: make([]*FlamegraphSpan, 0),
}
}
// NewMissingWaterfallSpan creates a synthetic placeholder span for a parent that has no recorded data.
func NewMissingWaterfallSpan(spanID, traceID string, timeUnixNano, durationNano uint64) *WaterfallSpan {
return &WaterfallSpan{
@@ -261,7 +297,7 @@ func (item *StorableSpan) UnmarshalledEvents() []Event {
return events
}
func (item *StorableSpan) ToWaterfallSpan() *WaterfallSpan {
func (item *StorableSpan) ToWaterfallSpan(traceID string) *WaterfallSpan {
resources := make(map[string]string)
maps.Copy(resources, item.ResourcesString)
@@ -289,7 +325,7 @@ func (item *StorableSpan) ToWaterfallSpan() *WaterfallSpan {
StatusCode: item.StatusCode,
StatusCodeString: item.StatusCodeString,
StatusMessage: item.StatusMessage,
TraceID: item.TraceID,
TraceID: traceID,
TraceState: item.TraceState,
Children: make([]*WaterfallSpan, 0),
TimeUnix: uint64(item.StartTime.UnixNano()),
@@ -297,6 +333,24 @@ func (item *StorableSpan) ToWaterfallSpan() *WaterfallSpan {
}
}
func EnrichSelectedSpans(window []*WaterfallSpan, fullSpans []StorableSpan) {
fullByID := make(map[string]*StorableSpan, len(fullSpans))
for i := range fullSpans {
fullByID[fullSpans[i].SpanID] = &fullSpans[i]
}
for i, ws := range window {
full, ok := fullByID[ws.SpanID]
if !ok {
continue // synthesized MissingSpan — keep empty shell
}
newWS := full.ToWaterfallSpan(ws.TraceID)
newWS.Level = ws.Level
newWS.HasChildren = ws.HasChildren
newWS.SubTreeNodeCount = ws.SubTreeNodeCount
window[i] = newWS
}
}
// getSpanIndex returns the index of matched span and -1 for no match.
func getSpanIndex(spans []*WaterfallSpan, targetSpanID string) int {
for i, s := range spans {

View File

@@ -62,26 +62,24 @@ func NewWaterfallTrace(
}
}
func NewWaterfallTraceFromSpans(spans []StorableSpan) *WaterfallTrace {
// NewWaterfallTraceFromSpans requires WaterfallSpan nodes with only below fields:
// SpanID, ParentSpanID, TimeUnix, DurationNano, HasError, and ServiceName.
func NewWaterfallTraceFromSpans(nodes []*WaterfallSpan) *WaterfallTrace {
var (
startTime, endTime, totalErrorSpans uint64
spanIDToSpanNodeMap = make(map[string]*WaterfallSpan, len(spans))
spanIDToSpanNodeMap = make(map[string]*WaterfallSpan, len(nodes))
traceRoots []*WaterfallSpan
hasMissingSpans bool
)
for _, item := range spans {
span := item.ToWaterfallSpan()
startTimeUnixNano := uint64(item.StartTime.UnixNano())
if startTime == 0 || startTimeUnixNano < startTime {
startTime = startTimeUnixNano
for _, span := range nodes {
if startTime == 0 || span.TimeUnix < startTime {
startTime = span.TimeUnix
}
endTime = max(endTime, startTimeUnixNano+span.DurationNano)
endTime = max(endTime, span.TimeUnix+span.DurationNano)
if span.HasError {
totalErrorSpans++
}
spanIDToSpanNodeMap[span.SpanID] = span
}
@@ -116,7 +114,7 @@ func NewWaterfallTraceFromSpans(spans []StorableSpan) *WaterfallTrace {
return NewWaterfallTrace(
startTime,
endTime,
uint64(len(spans)),
uint64(len(nodes)),
totalErrorSpans,
spanIDToSpanNodeMap,
traceRoots,

View File

@@ -37,7 +37,7 @@ type TelemetryFieldKey struct {
FieldContext FieldContext `json:"fieldContext,omitzero"`
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
PlanBuilder *FieldPlanBuilder `json:"-"`
JSONPlan JSONAccessPlan `json:"-"`
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
@@ -120,7 +120,7 @@ func (f *TelemetryFieldKey) OverrideMetadataFrom(src *TelemetryFieldKey) {
f.FieldDataType = src.FieldDataType
f.Indexes = src.Indexes
f.Materialized = src.Materialized
f.PlanBuilder = src.PlanBuilder
f.JSONPlan = src.JSONPlan
f.Evolutions = src.Evolutions
}

View File

@@ -6,7 +6,6 @@ import (
"slices"
"strings"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -29,6 +28,8 @@ type JSONColumnMetadata struct {
PromotedColumn string
}
type JSONAccessPlan = []*JSONAccessNode
type TerminalConfig struct {
Key *TelemetryFieldKey
ElemType JSONDataType
@@ -98,48 +99,23 @@ func (n *JSONAccessNode) BranchesInOrder() []JSONAccessBranchType {
})
}
// FieldPlanBuilder builds JSON access nodes on demand for a specific field key.
// It holds all necessary metadata (key, column info, type cache) and produces
// the correct root node per column, dispatching between base and promoted plans
// by column name — without any external "is promoted?" flag.
type FieldPlanBuilder struct {
type planBuilder struct {
key *TelemetryFieldKey
columnInfo JSONColumnMetadata
typeCache map[string][]FieldDataType
paths []string // cumulative paths for type cache lookups
segments []string // individual path segments for node names
}
func NewFieldPlanBuilder(key *TelemetryFieldKey, info JSONColumnMetadata, typeCache map[string][]FieldDataType) *FieldPlanBuilder {
return &FieldPlanBuilder{key: key, columnInfo: info, typeCache: typeCache}
}
// Build dispatches by column name — called by JSONConditionBuilder in field_mapper.
func (b *FieldPlanBuilder) Build(column schemamigrator.Column) (*JSONAccessNode, error) {
if column.Name == b.columnInfo.BaseColumn {
return b.build(NewRootJSONAccessNode(b.columnInfo.BaseColumn, 32, 0))
}
return b.build(NewRootJSONAccessNode(b.columnInfo.PromotedColumn, 32, 1024))
}
func (b *FieldPlanBuilder) build(root *JSONAccessNode) (*JSONAccessNode, error) {
if b.key.Name == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
}
b.paths = b.key.ArrayParentPaths()
b.segments = b.key.ArrayPathSegments()
return b.buildPlan(0, root, false)
isPromoted bool
typeCache map[string][]FieldDataType
}
// buildPlan recursively builds the path plan tree.
func (b *FieldPlanBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) {
if index >= len(b.paths) {
func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) {
if index >= len(pb.paths) {
return nil, errors.NewInvalidInputf(CodePlanIndexOutOfBounds, "index is out of bounds")
}
pathSoFar := b.paths[index] // cumulative path for type cache lookup
segmentName := b.segments[index] // segment name for node
isTerminal := index == len(b.paths)-1
pathSoFar := pb.paths[index] // cumulative path for type cache lookup
segmentName := pb.segments[index] // segment name for node
isTerminal := index == len(pb.paths)-1
// Calculate progression parameters based on parent's values
var maxTypes, maxPaths int
@@ -173,18 +149,18 @@ func (b *FieldPlanBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArr
// Configure terminal if this is the last part
if isTerminal {
// fielddatatype must not be unspecified else expression can not be generated
if b.key.FieldDataType == FieldDataTypeUnspecified {
if pb.key.FieldDataType == FieldDataTypeUnspecified {
return nil, errors.NewInternalf(CodePlanFieldDataTypeMissing, "field data type is missing for path %s", pathSoFar)
}
node.TerminalConfig = &TerminalConfig{
Key: b.key,
ElemType: b.key.GetJSONDataType(),
Key: pb.key,
ElemType: pb.key.GetJSONDataType(),
}
} else {
var err error
// Use cached types from the batched metadata query
types, ok := b.typeCache[pathSoFar]
types, ok := pb.typeCache[pathSoFar]
if !ok {
return nil, errors.NewInternalf(errors.CodeInvalidInput, "types missing for path %s", pathSoFar)
}
@@ -196,13 +172,13 @@ func (b *FieldPlanBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArr
}
if hasJSON {
node.Branches[BranchJSON], err = b.buildPlan(index+1, node, false)
node.Branches[BranchJSON], err = pb.buildPlan(index+1, node, false)
if err != nil {
return nil, err
}
}
if hasDynamic {
node.Branches[BranchDynamic], err = b.buildPlan(index+1, node, true)
node.Branches[BranchDynamic], err = pb.buildPlan(index+1, node, true)
if err != nil {
return nil, err
}
@@ -211,3 +187,45 @@ func (b *FieldPlanBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArr
return node, nil
}
// buildJSONAccessPlan builds a tree structure representing the complete JSON path traversal
// that precomputes all possible branches and their types.
func (key *TelemetryFieldKey) SetJSONAccessPlan(columnInfo JSONColumnMetadata, typeCache map[string][]FieldDataType,
) error {
// if path is empty, return nil
if key.Name == "" {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
}
pb := &planBuilder{
key: key,
paths: key.ArrayParentPaths(),
segments: key.ArrayPathSegments(),
isPromoted: key.Materialized,
typeCache: typeCache,
}
node, err := pb.buildPlan(0,
NewRootJSONAccessNode(columnInfo.BaseColumn,
32, 0),
false,
)
if err != nil {
return err
}
key.JSONPlan = append(key.JSONPlan, node)
if pb.isPromoted {
node, err := pb.buildPlan(0,
NewRootJSONAccessNode(columnInfo.PromotedColumn,
32, 1024),
true,
)
if err != nil {
return err
}
key.JSONPlan = append(key.JSONPlan, node)
}
return nil
}

View File

@@ -20,6 +20,7 @@ from fixtures.querier import (
index_series_by_label,
make_query_request,
)
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
def test_logs_list(
@@ -2293,3 +2294,334 @@ def test_logs_formula_orderby_and_limit(
assert len(f3_services) == 3, f"F3: expected 3 rows after limit, got {len(f3_services)}"
assert f3_values == f4_values[:3], f"F3 values {f3_values} do not match F4[:3] values {f4_values[:3]}"
assert set(f3_services) == set(f4_services[:3]), f"F3 services {f3_services} do not match F4[:3] services {f4_services[:3]}"
def test_logs_list_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[list[Logs]], None],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Tests that filtering logs by trace_id uses the trace_summary lookup to
narrow the query window before scanning the logs table:
1. Returns the matching log (narrow window, single bucket).
2. Does not return duplicate logs when the query window should span multiple
exponential buckets (>1 h). But is clamped to the timerange of trace.
3. Returns no results when the query window does not contain the trace.
4. Logs carrying a trace_id whose trace is NOT in trace_summary (e.g.
traces disabled) are still returned — the lookup miss must not
short-circuit logs queries.
"""
target_trace_id = TraceIdGenerator.trace_id()
orphan_trace_id = TraceIdGenerator.trace_id()
target_root_span_id = TraceIdGenerator.span_id()
target_child_span_id = TraceIdGenerator.span_id()
orphan_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "logs-trace-filter-service",
"cloud.provider": "integration",
}
# Populate signoz_traces.distributed_trace_summary by inserting spans for
# the target trace_id. trace_summary records min/max of span timestamps
# (it ignores span duration), so two spans are inserted to give the trace
# a non-trivial recorded window of [now-10s, now-5s].
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_root_span_id,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
Traces(
timestamp=now - timedelta(seconds=5),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_child_span_id,
parent_span_id=target_root_span_id,
name="child-span",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
# Insert logs:
# - one with the target trace_id, at a timestamp within the trace's
# recorded window (now-10s..now-5s, padded ±1s).
# - one with an orphan trace_id whose trace was never ingested — used to
# verify the lookup miss does NOT short-circuit logs queries.
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=7),
resources=common_resources,
attributes={"http.method": "GET"},
body="log inside the target trace window",
severity_text="INFO",
trace_id=target_trace_id,
span_id=target_root_span_id,
),
Logs(
timestamp=now - timedelta(seconds=2),
resources=common_resources,
attributes={"http.method": "PUT"},
body="log with a trace_id absent from trace_summary",
severity_text="INFO",
trace_id=orphan_trace_id,
span_id=orphan_span_id,
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
def _query(start_ms: int, end_ms: int, trace_id: str) -> tuple[list, list[str]]:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="raw",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"disabled": False,
"limit": 100,
"offset": 0,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"order": [
{"key": {"name": "timestamp"}, "direction": "desc"},
{"key": {"name": "id"}, "direction": "desc"},
],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
rows = response.json()["data"]["data"]["results"][0]["rows"] or []
warning = (response.json().get("data") or {}).get("warning") or {}
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
return rows, messages
outside_range_msg = "lies outside the selected time range"
now_ms = int(now.timestamp() * 1000)
# --- Test 1: narrow window (single bucket, <1 h) ---
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
narrow_rows, narrow_warnings = _query(narrow_start_ms, now_ms, target_trace_id)
assert len(narrow_rows) == 1, f"Expected 1 log for trace_id filter (narrow window), got {len(narrow_rows)}"
assert narrow_rows[0]["data"]["trace_id"] == target_trace_id
assert narrow_rows[0]["data"]["span_id"] == target_root_span_id
assert not any(outside_range_msg in m for m in narrow_warnings), f"Did not expect outside-range warning, got {narrow_warnings}"
# --- Test 2: wide window (>1 h, clamp to the timerange from trace_summary) ---
# Should still return exactly one log — no duplicates from multi-bucket scan.
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_rows, wide_warnings = _query(wide_start_ms, now_ms, target_trace_id)
assert len(wide_rows) == 1, f"Expected 1 log for trace_id filter (wide window, multi-bucket), got {len(wide_rows)} — possible duplicate-log regression"
assert wide_rows[0]["data"]["trace_id"] == target_trace_id
assert wide_rows[0]["data"]["span_id"] == target_root_span_id
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
# --- Test 3: window that does not contain the trace returns no results + warning ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_rows, past_warnings = _query(past_start_ms, past_end_ms, target_trace_id)
assert len(past_rows) == 0, f"Expected 0 logs for trace_id filter outside time window, got {len(past_rows)}"
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
# --- Test 4: trace_id not present in trace_summary still returns logs (no warning) ---
orphan_rows, orphan_warnings = _query(narrow_start_ms, now_ms, orphan_trace_id)
assert len(orphan_rows) == 1, f"Expected 1 log for orphan trace_id (no trace_summary entry), got {len(orphan_rows)} — logs query may have been incorrectly short-circuited"
assert orphan_rows[0]["data"]["trace_id"] == orphan_trace_id
assert not any(outside_range_msg in m for m in orphan_warnings), f"Did not expect outside-range warning for orphan trace_id, got {orphan_warnings}"
def test_logs_aggregation_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[list[Logs]], None],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Tests that the trace_id time-range optimization also applies to
non-window-list (time_series / aggregation) logs queries:
1. Wide query window containing the trace returns the correct count.
2. Query window outside the trace's time range short-circuits to an
empty result.
3. A trace_id with no row in trace_summary (e.g. traces disabled) still
returns the matching logs — the lookup miss must not short-circuit
logs aggregation queries.
"""
target_trace_id = TraceIdGenerator.trace_id()
orphan_trace_id = TraceIdGenerator.trace_id()
target_root_span_id = TraceIdGenerator.span_id()
target_child_span_id = TraceIdGenerator.span_id()
orphan_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "logs-trace-agg-service",
"cloud.provider": "integration",
}
# trace_summary records min/max of span timestamps (it ignores duration),
# so insert two spans to give the trace a recorded window wide enough to
# comfortably contain the log timestamps below.
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_root_span_id,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
Traces(
timestamp=now - timedelta(seconds=5),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_child_span_id,
parent_span_id=target_root_span_id,
name="child-span",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
# Two logs for the target trace_id, both inside the recorded trace window.
# One additional log carries an orphan trace_id with no row in
# trace_summary — used to verify that the lookup miss does not
# short-circuit logs aggregations.
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=7),
resources=common_resources,
attributes={},
body="log A inside trace window",
severity_text="INFO",
trace_id=target_trace_id,
span_id=target_root_span_id,
),
Logs(
timestamp=now - timedelta(seconds=6),
resources=common_resources,
attributes={},
body="log B inside trace window",
severity_text="INFO",
trace_id=target_trace_id,
span_id=target_root_span_id,
),
Logs(
timestamp=now - timedelta(seconds=2),
resources=common_resources,
attributes={},
body="log with a trace_id absent from trace_summary",
severity_text="INFO",
trace_id=orphan_trace_id,
span_id=orphan_span_id,
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
def _count(start_ms: int, end_ms: int, trace_id: str) -> tuple[float, list[str]]:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="time_series",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": False,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
warning = (response.json().get("data") or {}).get("warning") or {}
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
aggregations = results[0].get("aggregations") or []
if not aggregations:
return 0, messages
series = aggregations[0].get("series") or []
if not series:
return 0, messages
return sum(v["value"] for v in series[0]["values"]), messages
outside_range_msg = "lies outside the selected time range"
now_ms = int(now.timestamp() * 1000)
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
# --- Test 1: wide window (>1 h) containing the trace returns 2 logs ---
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_count, wide_warnings = _count(wide_start_ms, now_ms, target_trace_id)
assert wide_count == 2, f"Expected count=2 for trace_id aggregation (wide window), got {wide_count}"
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
# --- Test 2: window outside the trace short-circuits to empty + warning ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_count, past_warnings = _count(past_start_ms, past_end_ms, target_trace_id)
assert past_count == 0, f"Expected count=0 for trace_id aggregation outside time window, got {past_count}"
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
# --- Test 3: trace_id not present in trace_summary still returns logs (no warning) ---
orphan_count, orphan_warnings = _count(narrow_start_ms, now_ms, orphan_trace_id)
assert orphan_count == 1, f"Expected count=1 for orphan trace_id aggregation, got {orphan_count} — query may have been incorrectly short-circuited"
assert not any(outside_range_msg in m for m in orphan_warnings), f"Did not expect outside-range warning for orphan trace_id, got {orphan_warnings}"

View File

@@ -2062,7 +2062,7 @@ def test_traces_list_filter_by_trace_id(
trace_filter = f"trace_id = '{target_trace_id}'"
def _query(start_ms: int, end_ms: int) -> list:
def _query(start_ms: int, end_ms: int) -> tuple[list, list[str]]:
response = make_query_request(
signoz,
token,
@@ -2096,30 +2096,157 @@ def test_traces_list_filter_by_trace_id(
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
return response.json()["data"]["data"]["results"][0]["rows"] or []
rows = response.json()["data"]["data"]["results"][0]["rows"] or []
warning = (response.json().get("data") or {}).get("warning") or {}
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
return rows, messages
outside_range_msg = "lies outside the selected time range"
now_ms = int(now.timestamp() * 1000)
# --- Test 1: narrow window (single bucket, <1 h) ---
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
narrow_rows = _query(narrow_start_ms, now_ms)
narrow_rows, narrow_warnings = _query(narrow_start_ms, now_ms)
assert len(narrow_rows) == 1, f"Expected 1 span for trace_id filter (narrow window), got {len(narrow_rows)}"
assert narrow_rows[0]["data"]["span_id"] == span_id_root
assert narrow_rows[0]["data"]["trace_id"] == target_trace_id
assert not any(outside_range_msg in m for m in narrow_warnings), f"Did not expect outside-range warning, got {narrow_warnings}"
# --- Test 2: wide window (>1 h, triggers multiple exponential buckets) ---
# should just return 1 span, not duplicate
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_rows = _query(wide_start_ms, now_ms)
wide_rows, wide_warnings = _query(wide_start_ms, now_ms)
assert len(wide_rows) == 1, f"Expected 1 span for trace_id filter (wide window, multi-bucket), got {len(wide_rows)} — possible duplicate-span regression"
assert wide_rows[0]["data"]["span_id"] == span_id_root
assert wide_rows[0]["data"]["trace_id"] == target_trace_id
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
# --- Test 3: window that does not contain the trace returns no results ---
# --- Test 3: window that does not contain the trace returns no results + warning ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_rows = _query(past_start_ms, past_end_ms)
past_rows, past_warnings = _query(past_start_ms, past_end_ms)
assert len(past_rows) == 0, f"Expected 0 spans for trace_id filter outside time window, got {len(past_rows)}"
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
def test_traces_aggregation_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Tests that the trace_id time-range optimization also applies to
non-window-list (time_series / aggregation) traces queries:
1. Wide query window containing the trace returns the correct count.
2. Query window outside the trace's time range short-circuits to empty.
3. Filter referencing a trace_id with no row in trace_summary
short-circuits to empty (trace_summary is authoritative for traces).
"""
target_trace_id = TraceIdGenerator.trace_id()
target_root_span_id = TraceIdGenerator.span_id()
target_child_span_id = TraceIdGenerator.span_id()
missing_trace_id = TraceIdGenerator.trace_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "traces-agg-filter-service",
"cloud.provider": "integration",
}
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=5),
trace_id=target_trace_id,
span_id=target_root_span_id,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={"http.request.method": "GET"},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_child_span_id,
parent_span_id=target_root_span_id,
name="child-span",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
def _count(start_ms: int, end_ms: int, trace_id: str) -> tuple[float, list[str]]:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="time_series",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": False,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"aggregations": [{"expression": "count()"}],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
warning = (response.json().get("data") or {}).get("warning") or {}
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
aggregations = results[0].get("aggregations") or []
if not aggregations:
return 0, messages
series = aggregations[0].get("series") or []
if not series:
return 0, messages
return sum(v["value"] for v in series[0]["values"]), messages
outside_range_msg = "lies outside the selected time range"
now_ms = int(now.timestamp() * 1000)
# --- Test 1: wide window (>1 h) containing the trace returns both spans ---
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_count, wide_warnings = _count(wide_start_ms, now_ms, target_trace_id)
assert wide_count == 2, f"Expected count=2 for trace_id aggregation (wide window), got {wide_count}"
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
# --- Test 2: window outside the trace short-circuits to empty + warning ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_count, past_warnings = _count(past_start_ms, past_end_ms, target_trace_id)
assert past_count == 0, f"Expected count=0 for trace_id aggregation outside time window, got {past_count}"
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
# --- Test 3: trace_id with no entry in trace_summary short-circuits (no warning) ---
missing_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
missing_count, missing_warnings = _count(missing_start_ms, now_ms, missing_trace_id)
assert missing_count == 0, f"Expected count=0 for trace_id absent from trace_summary, got {missing_count}"
assert not any(outside_range_msg in m for m in missing_warnings), f"Did not expect outside-range warning for missing trace_id, got {missing_warnings}"