mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-17 18:32:11 +00:00
Compare commits
31 Commits
feat/trace
...
nv/6204
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
28e0f2f7ad | ||
|
|
cd458f0205 | ||
|
|
ee2916e6c6 | ||
|
|
1c1d069263 | ||
|
|
7dc46db2e3 | ||
|
|
bfcd423a45 | ||
|
|
323b1163e5 | ||
|
|
673379a46c | ||
|
|
37f490c705 | ||
|
|
324e34092e | ||
|
|
2a2c365950 | ||
|
|
14065d39a6 | ||
|
|
61df12d126 | ||
|
|
b846faa1fa | ||
|
|
557451ed81 | ||
|
|
25c513ec2f | ||
|
|
ae71f2608a | ||
|
|
bf2133f1ab | ||
|
|
d7d907f687 | ||
|
|
76b4549504 | ||
|
|
968a5089ff | ||
|
|
c082bc3d76 | ||
|
|
59e0dcc865 | ||
|
|
89840189ef | ||
|
|
b64a07db02 | ||
|
|
38d971b3c9 | ||
|
|
f8b266ce05 | ||
|
|
20f7562cbc | ||
|
|
29713964ce | ||
|
|
afb252b4f9 | ||
|
|
c808b4d759 |
@@ -1,4 +1,22 @@
|
||||
services:
|
||||
init-clickhouse:
|
||||
image: clickhouse/clickhouse-server:25.5.6
|
||||
container_name: init-clickhouse
|
||||
command:
|
||||
- bash
|
||||
- -c
|
||||
- |
|
||||
version="v0.0.1"
|
||||
node_os=$$(uname -s | tr '[:upper:]' '[:lower:]')
|
||||
node_arch=$$(uname -m | sed s/aarch64/arm64/ | sed s/x86_64/amd64/)
|
||||
echo "Fetching histogram-binary for $${node_os}/$${node_arch}"
|
||||
cd /tmp
|
||||
wget -O histogram-quantile.tar.gz "https://github.com/SigNoz/signoz/releases/download/histogram-quantile%2F$${version}/histogram-quantile_$${node_os}_$${node_arch}.tar.gz"
|
||||
tar -xvzf histogram-quantile.tar.gz
|
||||
mv histogram-quantile /var/lib/clickhouse/user_scripts/histogramQuantile
|
||||
restart: on-failure
|
||||
volumes:
|
||||
- ${PWD}/fs/tmp/var/lib/clickhouse/user_scripts/:/var/lib/clickhouse/user_scripts/
|
||||
clickhouse:
|
||||
image: clickhouse/clickhouse-server:25.5.6
|
||||
container_name: clickhouse
|
||||
@@ -7,6 +25,7 @@ services:
|
||||
- ${PWD}/fs/etc/clickhouse-server/users.d/users.xml:/etc/clickhouse-server/users.d/users.xml
|
||||
- ${PWD}/fs/tmp/var/lib/clickhouse/:/var/lib/clickhouse/
|
||||
- ${PWD}/fs/tmp/var/lib/clickhouse/user_scripts/:/var/lib/clickhouse/user_scripts/
|
||||
- ${PWD}/../../../deploy/common/clickhouse/custom-function.xml:/etc/clickhouse-server/custom-function.xml
|
||||
ports:
|
||||
- '127.0.0.1:8123:8123'
|
||||
- '127.0.0.1:9000:9000'
|
||||
@@ -22,7 +41,10 @@ services:
|
||||
timeout: 5s
|
||||
retries: 3
|
||||
depends_on:
|
||||
- zookeeper
|
||||
init-clickhouse:
|
||||
condition: service_completed_successfully
|
||||
zookeeper:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
- CLICKHOUSE_SKIP_USER_SETUP=1
|
||||
zookeeper:
|
||||
|
||||
@@ -44,4 +44,6 @@
|
||||
<shard>01</shard>
|
||||
<replica>01</replica>
|
||||
</macros>
|
||||
<user_defined_executable_functions_config>*function.xml</user_defined_executable_functions_config>
|
||||
<user_scripts_path>/var/lib/clickhouse/user_scripts/</user_scripts_path>
|
||||
</clickhouse>
|
||||
@@ -190,7 +190,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:v0.114.1
|
||||
image: signoz/signoz:v0.115.0
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
# - "6060:6060" # pprof port
|
||||
|
||||
@@ -117,7 +117,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:v0.114.1
|
||||
image: signoz/signoz:v0.115.0
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
volumes:
|
||||
|
||||
@@ -181,7 +181,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:${VERSION:-v0.114.1}
|
||||
image: signoz/signoz:${VERSION:-v0.115.0}
|
||||
container_name: signoz
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
|
||||
@@ -109,7 +109,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:${VERSION:-v0.114.1}
|
||||
image: signoz/signoz:${VERSION:-v0.115.0}
|
||||
container_name: signoz
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { ReactChild, useCallback, useEffect, useMemo, useState } from 'react';
|
||||
import { useQuery } from 'react-query';
|
||||
import { matchPath, useLocation } from 'react-router-dom';
|
||||
import { matchPath, Redirect, useLocation } from 'react-router-dom';
|
||||
import getLocalStorageApi from 'api/browser/localstorage/get';
|
||||
import setLocalStorageApi from 'api/browser/localstorage/set';
|
||||
import getAll from 'api/v1/user/get';
|
||||
@@ -236,13 +236,7 @@ function PrivateRoute({ children }: PrivateRouteProps): JSX.Element {
|
||||
useEffect(() => {
|
||||
// if it is an old route navigate to the new route
|
||||
if (isOldRoute) {
|
||||
const redirectUrl = oldNewRoutesMapping[pathname];
|
||||
|
||||
const newLocation = {
|
||||
...location,
|
||||
pathname: redirectUrl,
|
||||
};
|
||||
history.replace(newLocation);
|
||||
// this will be handled by the redirect component below
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -296,6 +290,19 @@ function PrivateRoute({ children }: PrivateRouteProps): JSX.Element {
|
||||
}
|
||||
}, [isLoggedInState, pathname, user, isOldRoute, currentRoute, location]);
|
||||
|
||||
if (isOldRoute) {
|
||||
const redirectUrl = oldNewRoutesMapping[pathname];
|
||||
return (
|
||||
<Redirect
|
||||
to={{
|
||||
pathname: redirectUrl,
|
||||
search: location.search,
|
||||
hash: location.hash,
|
||||
}}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
// NOTE: disabling this rule as there is no need to have div
|
||||
return <>{children}</>;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,31 @@
|
||||
.custom-time-picker {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
flex-direction: row;
|
||||
align-items: center;
|
||||
gap: 4px;
|
||||
|
||||
.zoom-out-btn {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
flex-shrink: 0;
|
||||
color: var(--foreground);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: 2px;
|
||||
box-shadow: none;
|
||||
padding: 10px;
|
||||
height: 33px;
|
||||
|
||||
&:hover:not(:disabled) {
|
||||
color: var(--bg-vanilla-100);
|
||||
background: var(--primary);
|
||||
}
|
||||
|
||||
&:disabled {
|
||||
opacity: 0.5;
|
||||
cursor: not-allowed;
|
||||
}
|
||||
}
|
||||
|
||||
.timeSelection-input {
|
||||
&:hover {
|
||||
|
||||
@@ -16,6 +16,15 @@ jest.mock('react-router-dom', () => {
|
||||
};
|
||||
});
|
||||
|
||||
jest.mock('react-redux', () => ({
|
||||
...jest.requireActual('react-redux'),
|
||||
useDispatch: jest.fn(() => jest.fn()),
|
||||
useSelector: jest.fn(() => ({
|
||||
minTime: 0,
|
||||
maxTime: Date.now(),
|
||||
})),
|
||||
}));
|
||||
|
||||
jest.mock('providers/Timezone', () => {
|
||||
const actual = jest.requireActual('providers/Timezone');
|
||||
|
||||
|
||||
@@ -7,9 +7,11 @@ import {
|
||||
useState,
|
||||
} from 'react';
|
||||
import { useLocation } from 'react-router-dom';
|
||||
import { Button } from '@signozhq/button';
|
||||
import { Input, InputRef, Popover, Tooltip } from 'antd';
|
||||
import cx from 'classnames';
|
||||
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
|
||||
import { QueryParams } from 'constants/query';
|
||||
import { DateTimeRangeType } from 'container/TopNav/CustomDateTimeModal';
|
||||
import {
|
||||
FixedDurationSuggestionOptions,
|
||||
@@ -17,9 +19,11 @@ import {
|
||||
RelativeDurationSuggestionOptions,
|
||||
} from 'container/TopNav/DateTimeSelectionV2/constants';
|
||||
import dayjs from 'dayjs';
|
||||
import { useZoomOut } from 'hooks/useZoomOut';
|
||||
import { isValidShortHandDateTimeFormat } from 'lib/getMinMax';
|
||||
import { isZoomOutDisabled } from 'lib/zoomOutUtils';
|
||||
import { defaultTo, isFunction, noop } from 'lodash-es';
|
||||
import { ChevronDown, ChevronUp } from 'lucide-react';
|
||||
import { ChevronDown, ChevronUp, ZoomOut } from 'lucide-react';
|
||||
import { useTimezone } from 'providers/Timezone';
|
||||
import { getTimeDifference, validateEpochRange } from 'utils/epochUtils';
|
||||
import { popupContainer } from 'utils/selectPopupContainer';
|
||||
@@ -66,6 +70,8 @@ interface CustomTimePickerProps {
|
||||
showRecentlyUsed?: boolean;
|
||||
minTime: number;
|
||||
maxTime: number;
|
||||
/** When true, zoom-out button is hidden (e.g. in drawer/modal time selection) */
|
||||
isModalTimeSelection?: boolean;
|
||||
}
|
||||
|
||||
function CustomTimePicker({
|
||||
@@ -88,6 +94,7 @@ function CustomTimePicker({
|
||||
showRecentlyUsed = true,
|
||||
minTime,
|
||||
maxTime,
|
||||
isModalTimeSelection = false,
|
||||
}: CustomTimePickerProps): JSX.Element {
|
||||
const [
|
||||
selectedTimePlaceholderValue,
|
||||
@@ -116,6 +123,14 @@ function CustomTimePicker({
|
||||
|
||||
const [isOpenedFromFooter, setIsOpenedFromFooter] = useState(false);
|
||||
|
||||
const durationMs = (maxTime - minTime) / 1e6;
|
||||
const zoomOutDisabled = showLiveLogs || isZoomOutDisabled(durationMs);
|
||||
|
||||
const handleZoomOut = useZoomOut({
|
||||
isDisabled: zoomOutDisabled,
|
||||
urlParamsToDelete: [QueryParams.activeLogId],
|
||||
});
|
||||
|
||||
// function to get selected time in Last 1m, Last 2h, Last 3d, Last 4w format
|
||||
// 1m, 2h, 3d, 4w -> Last 1 minute, Last 2 hours, Last 3 days, Last 4 weeks
|
||||
const getSelectedTimeRangeLabelInRelativeFormat = (
|
||||
@@ -631,6 +646,23 @@ function CustomTimePicker({
|
||||
/>
|
||||
</Popover>
|
||||
</Tooltip>
|
||||
{!showLiveLogs && !isModalTimeSelection && (
|
||||
<Tooltip
|
||||
title={
|
||||
zoomOutDisabled ? 'Zoom out time range is limited to 1 month' : 'Zoom out'
|
||||
}
|
||||
>
|
||||
<span>
|
||||
<Button
|
||||
className="zoom-out-btn"
|
||||
onClick={handleZoomOut}
|
||||
disabled={zoomOutDisabled}
|
||||
data-testid="zoom-out-btn"
|
||||
prefixIcon={<ZoomOut size={14} />}
|
||||
/>
|
||||
</span>
|
||||
</Tooltip>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,169 @@
|
||||
import { render, screen } from '@testing-library/react';
|
||||
import userEvent from '@testing-library/user-event';
|
||||
import { QueryParams } from 'constants/query';
|
||||
import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import CustomTimePicker from '../CustomTimePicker';
|
||||
|
||||
const MS_PER_MIN = 60 * 1000;
|
||||
const NOW_MS = 1705312800000;
|
||||
|
||||
const mockDispatch = jest.fn();
|
||||
const mockSafeNavigate = jest.fn();
|
||||
const mockUrlQueryDelete = jest.fn();
|
||||
const mockUrlQuerySet = jest.fn();
|
||||
|
||||
interface MockAppState {
|
||||
globalTime: Pick<GlobalReducer, 'minTime' | 'maxTime'>;
|
||||
}
|
||||
|
||||
jest.mock('react-redux', () => ({
|
||||
useDispatch: (): jest.Mock => mockDispatch,
|
||||
useSelector: (selector: (state: MockAppState) => unknown): unknown => {
|
||||
const mockState: MockAppState = {
|
||||
globalTime: {
|
||||
minTime: (NOW_MS - 15 * MS_PER_MIN) * 1e6,
|
||||
maxTime: NOW_MS * 1e6,
|
||||
},
|
||||
};
|
||||
return selector(mockState);
|
||||
},
|
||||
}));
|
||||
|
||||
jest.mock('hooks/useSafeNavigate', () => ({
|
||||
useSafeNavigate: (): { safeNavigate: jest.Mock } => ({
|
||||
safeNavigate: mockSafeNavigate,
|
||||
}),
|
||||
}));
|
||||
|
||||
interface MockUrlQuery {
|
||||
delete: typeof mockUrlQueryDelete;
|
||||
set: typeof mockUrlQuerySet;
|
||||
get: () => null;
|
||||
toString: () => string;
|
||||
}
|
||||
|
||||
jest.mock('hooks/useUrlQuery', () => ({
|
||||
__esModule: true,
|
||||
default: (): MockUrlQuery => ({
|
||||
delete: mockUrlQueryDelete,
|
||||
set: mockUrlQuerySet,
|
||||
get: (): null => null,
|
||||
toString: (): string => 'relativeTime=45m',
|
||||
}),
|
||||
}));
|
||||
|
||||
jest.mock('providers/Timezone', () => ({
|
||||
useTimezone: (): { timezone: { value: string; offset: string } } => ({
|
||||
timezone: { value: 'UTC', offset: 'UTC' },
|
||||
}),
|
||||
}));
|
||||
|
||||
jest.mock('react-router-dom', () => ({
|
||||
useLocation: (): { pathname: string } => ({ pathname: '/logs-explorer' }),
|
||||
}));
|
||||
|
||||
const MS_PER_DAY = 24 * 60 * 60 * 1000;
|
||||
const now = Date.now();
|
||||
const defaultProps = {
|
||||
onSelect: jest.fn(),
|
||||
onError: jest.fn(),
|
||||
selectedValue: '15m',
|
||||
selectedTime: '15m',
|
||||
onValidCustomDateChange: jest.fn(),
|
||||
open: false,
|
||||
setOpen: jest.fn(),
|
||||
items: [
|
||||
{ value: '15m', label: 'Last 15 minutes' },
|
||||
{ value: '1h', label: 'Last 1 hour' },
|
||||
],
|
||||
minTime: (now - 15 * 60 * 1000) * 1e6,
|
||||
maxTime: now * 1e6,
|
||||
};
|
||||
|
||||
describe('CustomTimePicker - zoom out button', () => {
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
jest.spyOn(Date, 'now').mockReturnValue(NOW_MS);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
it('should render zoom out button when showLiveLogs is false', () => {
|
||||
render(<CustomTimePicker {...defaultProps} showLiveLogs={false} />);
|
||||
|
||||
expect(screen.getByTestId('zoom-out-btn')).toBeInTheDocument();
|
||||
});
|
||||
|
||||
it('should not render zoom out button when showLiveLogs is true', () => {
|
||||
render(<CustomTimePicker {...defaultProps} showLiveLogs={true} />);
|
||||
|
||||
expect(screen.queryByTestId('zoom-out-btn')).not.toBeInTheDocument();
|
||||
});
|
||||
|
||||
it('should not render zoom out button when isModalTimeSelection is true', () => {
|
||||
render(
|
||||
<CustomTimePicker
|
||||
{...defaultProps}
|
||||
showLiveLogs={false}
|
||||
isModalTimeSelection={true}
|
||||
/>,
|
||||
);
|
||||
|
||||
expect(screen.queryByTestId('zoom-out-btn')).not.toBeInTheDocument();
|
||||
});
|
||||
|
||||
it('should call handleZoomOut when zoom out button is clicked', async () => {
|
||||
render(<CustomTimePicker {...defaultProps} showLiveLogs={false} />);
|
||||
|
||||
const zoomOutBtn = screen.getByTestId('zoom-out-btn');
|
||||
await userEvent.click(zoomOutBtn);
|
||||
|
||||
expect(mockDispatch).toHaveBeenCalled();
|
||||
expect(mockUrlQuerySet).toHaveBeenCalledWith(QueryParams.relativeTime, '45m');
|
||||
expect(mockSafeNavigate).toHaveBeenCalledWith(
|
||||
expect.stringMatching(/\/logs-explorer\?relativeTime=45m/),
|
||||
);
|
||||
});
|
||||
|
||||
it('should use real ladder logic: 15m range zooms to 45m preset and updates URL', async () => {
|
||||
render(<CustomTimePicker {...defaultProps} showLiveLogs={false} />);
|
||||
|
||||
const zoomOutBtn = screen.getByTestId('zoom-out-btn');
|
||||
await userEvent.click(zoomOutBtn);
|
||||
|
||||
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.startTime);
|
||||
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.endTime);
|
||||
expect(mockUrlQuerySet).toHaveBeenCalledWith(QueryParams.relativeTime, '45m');
|
||||
expect(mockSafeNavigate).toHaveBeenCalledWith(
|
||||
expect.stringMatching(/\/logs-explorer\?relativeTime=45m/),
|
||||
);
|
||||
expect(mockDispatch).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should delete activeLogId when zoom out is clicked', async () => {
|
||||
render(<CustomTimePicker {...defaultProps} showLiveLogs={false} />);
|
||||
|
||||
const zoomOutBtn = screen.getByTestId('zoom-out-btn');
|
||||
await userEvent.click(zoomOutBtn);
|
||||
|
||||
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.activeLogId);
|
||||
});
|
||||
|
||||
it('should disable zoom button when time range is >= 1 month', () => {
|
||||
const now = Date.now();
|
||||
render(
|
||||
<CustomTimePicker
|
||||
{...defaultProps}
|
||||
minTime={(now - 31 * MS_PER_DAY) * 1e6}
|
||||
maxTime={now * 1e6}
|
||||
showLiveLogs={false}
|
||||
/>,
|
||||
);
|
||||
|
||||
const zoomOutBtn = screen.getByTestId('zoom-out-btn');
|
||||
expect(zoomOutBtn).toBeDisabled();
|
||||
});
|
||||
});
|
||||
@@ -4,8 +4,8 @@ import { getColorsForSeverityLabels, isRedLike } from '../utils';
|
||||
|
||||
describe('getColorsForSeverityLabels', () => {
|
||||
it('should return slate for blank labels', () => {
|
||||
expect(getColorsForSeverityLabels('', 0)).toBe(Color.BG_SLATE_300);
|
||||
expect(getColorsForSeverityLabels(' ', 0)).toBe(Color.BG_SLATE_300);
|
||||
expect(getColorsForSeverityLabels('', 0)).toBe(Color.BG_VANILLA_400);
|
||||
expect(getColorsForSeverityLabels(' ', 0)).toBe(Color.BG_VANILLA_400);
|
||||
});
|
||||
|
||||
it('should return correct colors for known severity variants', () => {
|
||||
|
||||
@@ -79,7 +79,7 @@ export function getColorsForSeverityLabels(
|
||||
const trimmed = label.trim();
|
||||
|
||||
if (!trimmed) {
|
||||
return Color.BG_SLATE_300;
|
||||
return Color.BG_VANILLA_400; // Default color for empty labels
|
||||
}
|
||||
|
||||
const variantColor = SEVERITY_VARIANT_COLORS[trimmed];
|
||||
@@ -119,6 +119,6 @@ export function getColorsForSeverityLabels(
|
||||
|
||||
return (
|
||||
SAFE_FALLBACK_COLORS[index % SAFE_FALLBACK_COLORS.length] ||
|
||||
Color.BG_SLATE_400
|
||||
Color.BG_VANILLA_400
|
||||
);
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ import { AppState } from 'store/reducers';
|
||||
import AppActions from 'types/actions';
|
||||
import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
import { addCustomTimeRange } from 'utils/customTimeRangeUtils';
|
||||
import { persistTimeDurationForRoute } from 'utils/metricsTimeStorageUtils';
|
||||
import { normalizeTimeToMs } from 'utils/timeUtils';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
@@ -234,20 +235,7 @@ function DateTimeSelection({
|
||||
|
||||
const updateLocalStorageForRoutes = useCallback(
|
||||
(value: Time | string): void => {
|
||||
const preRoutes = getLocalStorageKey(LOCALSTORAGE.METRICS_TIME_IN_DURATION);
|
||||
if (preRoutes !== null) {
|
||||
const preRoutesObject = JSON.parse(preRoutes);
|
||||
|
||||
const preRoute = {
|
||||
...preRoutesObject,
|
||||
};
|
||||
preRoute[location.pathname] = value;
|
||||
|
||||
setLocalStorageKey(
|
||||
LOCALSTORAGE.METRICS_TIME_IN_DURATION,
|
||||
JSON.stringify(preRoute),
|
||||
);
|
||||
}
|
||||
persistTimeDurationForRoute(location.pathname, String(value));
|
||||
},
|
||||
[location.pathname],
|
||||
);
|
||||
@@ -738,6 +726,7 @@ function DateTimeSelection({
|
||||
showRecentlyUsed={showRecentlyUsed}
|
||||
minTime={minTimeForDateTimePicker}
|
||||
maxTime={maxTimeForDateTimePicker}
|
||||
isModalTimeSelection={isModalTimeSelection}
|
||||
/>
|
||||
|
||||
{showAutoRefresh && selectedTime !== 'custom' && (
|
||||
|
||||
160
frontend/src/hooks/__tests__/useZoomOut.test.ts
Normal file
160
frontend/src/hooks/__tests__/useZoomOut.test.ts
Normal file
@@ -0,0 +1,160 @@
|
||||
import { act, renderHook } from '@testing-library/react';
|
||||
import { QueryParams } from 'constants/query';
|
||||
import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
import { useZoomOut } from '../useZoomOut';
|
||||
|
||||
const mockDispatch = jest.fn();
|
||||
const mockSafeNavigate = jest.fn();
|
||||
const mockUrlQueryDelete = jest.fn();
|
||||
const mockUrlQuerySet = jest.fn();
|
||||
const mockUrlQueryToString = jest.fn(() => '');
|
||||
|
||||
interface MockAppState {
|
||||
globalTime: Pick<GlobalReducer, 'minTime' | 'maxTime'>;
|
||||
}
|
||||
|
||||
jest.mock('react-redux', () => ({
|
||||
useDispatch: (): jest.Mock => mockDispatch,
|
||||
useSelector: <T>(selector: (state: MockAppState) => T): T => {
|
||||
const mockState: MockAppState = {
|
||||
globalTime: {
|
||||
minTime: 15 * 60 * 1000 * 1e6, // 15 min in nanoseconds
|
||||
maxTime: 30 * 60 * 1000 * 1e6, // 30 min in nanoseconds (mock for getNextZoomOutRange)
|
||||
},
|
||||
};
|
||||
return selector(mockState);
|
||||
},
|
||||
}));
|
||||
|
||||
jest.mock('react-router-dom', () => ({
|
||||
useLocation: (): { pathname: string } => ({ pathname: '/logs-explorer' }),
|
||||
}));
|
||||
|
||||
jest.mock('hooks/useSafeNavigate', () => ({
|
||||
useSafeNavigate: (): { safeNavigate: typeof mockSafeNavigate } => ({
|
||||
safeNavigate: mockSafeNavigate,
|
||||
}),
|
||||
}));
|
||||
|
||||
interface MockUrlQuery {
|
||||
delete: typeof mockUrlQueryDelete;
|
||||
set: typeof mockUrlQuerySet;
|
||||
get: () => null;
|
||||
toString: typeof mockUrlQueryToString;
|
||||
}
|
||||
|
||||
jest.mock('hooks/useUrlQuery', () => ({
|
||||
__esModule: true,
|
||||
default: (): MockUrlQuery => ({
|
||||
delete: mockUrlQueryDelete,
|
||||
set: mockUrlQuerySet,
|
||||
get: (): null => null,
|
||||
toString: mockUrlQueryToString,
|
||||
}),
|
||||
}));
|
||||
|
||||
const mockGetNextZoomOutRange = jest.fn();
|
||||
jest.mock('lib/zoomOutUtils', () => ({
|
||||
getNextZoomOutRange: (
|
||||
...args: unknown[]
|
||||
): ReturnType<typeof mockGetNextZoomOutRange> =>
|
||||
mockGetNextZoomOutRange(...args),
|
||||
}));
|
||||
|
||||
describe('useZoomOut', () => {
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
mockUrlQueryToString.mockReturnValue('relativeTime=45m');
|
||||
});
|
||||
|
||||
it('should do nothing when isDisabled is true', () => {
|
||||
const { result } = renderHook(() => useZoomOut({ isDisabled: true }));
|
||||
|
||||
act(() => {
|
||||
result.current();
|
||||
});
|
||||
|
||||
expect(mockGetNextZoomOutRange).not.toHaveBeenCalled();
|
||||
expect(mockDispatch).not.toHaveBeenCalled();
|
||||
expect(mockSafeNavigate).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should do nothing when getNextZoomOutRange returns null', () => {
|
||||
mockGetNextZoomOutRange.mockReturnValue(null);
|
||||
|
||||
const { result } = renderHook(() => useZoomOut());
|
||||
|
||||
act(() => {
|
||||
result.current();
|
||||
});
|
||||
|
||||
expect(mockGetNextZoomOutRange).toHaveBeenCalled();
|
||||
expect(mockDispatch).not.toHaveBeenCalled();
|
||||
expect(mockSafeNavigate).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should dispatch preset and update URL when result has preset', () => {
|
||||
mockGetNextZoomOutRange.mockReturnValue({
|
||||
range: [1000, 2000],
|
||||
preset: '45m',
|
||||
});
|
||||
|
||||
const { result } = renderHook(() => useZoomOut());
|
||||
|
||||
act(() => {
|
||||
result.current();
|
||||
});
|
||||
|
||||
expect(mockDispatch).toHaveBeenCalledWith(expect.any(Function));
|
||||
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.startTime);
|
||||
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.endTime);
|
||||
expect(mockUrlQuerySet).toHaveBeenCalledWith(QueryParams.relativeTime, '45m');
|
||||
expect(mockSafeNavigate).toHaveBeenCalledWith(
|
||||
expect.stringContaining('/logs-explorer'),
|
||||
);
|
||||
});
|
||||
|
||||
it('should dispatch custom range and update URL when result has no preset', () => {
|
||||
mockGetNextZoomOutRange.mockReturnValue({
|
||||
range: [1000000, 2000000],
|
||||
preset: null,
|
||||
});
|
||||
|
||||
const { result } = renderHook(() => useZoomOut());
|
||||
|
||||
act(() => {
|
||||
result.current();
|
||||
});
|
||||
|
||||
expect(mockDispatch).toHaveBeenCalledWith(expect.any(Function));
|
||||
expect(mockUrlQuerySet).toHaveBeenCalledWith(
|
||||
QueryParams.startTime,
|
||||
'1000000',
|
||||
);
|
||||
expect(mockUrlQuerySet).toHaveBeenCalledWith(QueryParams.endTime, '2000000');
|
||||
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.relativeTime);
|
||||
expect(mockSafeNavigate).toHaveBeenCalledWith(
|
||||
expect.stringContaining('/logs-explorer'),
|
||||
);
|
||||
});
|
||||
|
||||
it('should delete urlParamsToDelete when provided', () => {
|
||||
mockGetNextZoomOutRange.mockReturnValue({
|
||||
range: [1000, 2000],
|
||||
preset: '45m',
|
||||
});
|
||||
|
||||
const { result } = renderHook(() =>
|
||||
useZoomOut({
|
||||
urlParamsToDelete: [QueryParams.activeLogId],
|
||||
}),
|
||||
);
|
||||
|
||||
act(() => {
|
||||
result.current();
|
||||
});
|
||||
|
||||
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.activeLogId);
|
||||
});
|
||||
});
|
||||
79
frontend/src/hooks/useZoomOut.ts
Normal file
79
frontend/src/hooks/useZoomOut.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { useCallback, useRef } from 'react';
|
||||
// eslint-disable-next-line no-restricted-imports
|
||||
import { useDispatch, useSelector } from 'react-redux';
|
||||
import { useLocation } from 'react-router-dom';
|
||||
import { QueryParams } from 'constants/query';
|
||||
import { useSafeNavigate } from 'hooks/useSafeNavigate';
|
||||
import useUrlQuery from 'hooks/useUrlQuery';
|
||||
import { getNextZoomOutRange } from 'lib/zoomOutUtils';
|
||||
import { UpdateTimeInterval } from 'store/actions';
|
||||
import { AppState } from 'store/reducers';
|
||||
import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
import { persistTimeDurationForRoute } from 'utils/metricsTimeStorageUtils';
|
||||
|
||||
export interface UseZoomOutOptions {
|
||||
/** When true, the zoom out handler does nothing (e.g. when live logs are enabled) */
|
||||
isDisabled?: boolean;
|
||||
/** URL params to delete when zooming out (e.g. [QueryParams.activeLogId] for logs) */
|
||||
urlParamsToDelete?: string[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Reusable hook for zoom-out functionality in explorers (logs, traces, etc.).
|
||||
* Computes the next time range using the zoom-out ladder, updates Redux global time,
|
||||
* and navigates with the new URL params.
|
||||
*/
|
||||
const EMPTY_PARAMS: string[] = [];
|
||||
|
||||
export function useZoomOut(options: UseZoomOutOptions = {}): () => void {
|
||||
const { isDisabled = false, urlParamsToDelete = EMPTY_PARAMS } = options;
|
||||
const urlParamsToDeleteRef = useRef(urlParamsToDelete);
|
||||
urlParamsToDeleteRef.current = urlParamsToDelete;
|
||||
|
||||
const dispatch = useDispatch();
|
||||
const { minTime, maxTime } = useSelector<AppState, GlobalReducer>(
|
||||
(state) => state.globalTime,
|
||||
);
|
||||
const urlQuery = useUrlQuery();
|
||||
const location = useLocation();
|
||||
const { safeNavigate } = useSafeNavigate();
|
||||
|
||||
return useCallback((): void => {
|
||||
if (isDisabled) {
|
||||
return;
|
||||
}
|
||||
const minMs = Math.floor((minTime ?? 0) / 1e6);
|
||||
const maxMs = Math.floor((maxTime ?? 0) / 1e6);
|
||||
const result = getNextZoomOutRange(minMs, maxMs);
|
||||
if (!result) {
|
||||
return;
|
||||
}
|
||||
const [newStartMs, newEndMs] = result.range;
|
||||
const { preset } = result;
|
||||
|
||||
if (preset) {
|
||||
dispatch(UpdateTimeInterval(preset));
|
||||
urlQuery.delete(QueryParams.startTime);
|
||||
urlQuery.delete(QueryParams.endTime);
|
||||
urlQuery.set(QueryParams.relativeTime, preset);
|
||||
persistTimeDurationForRoute(location.pathname, preset);
|
||||
} else {
|
||||
dispatch(UpdateTimeInterval('custom', [newStartMs, newEndMs]));
|
||||
urlQuery.set(QueryParams.startTime, String(newStartMs));
|
||||
urlQuery.set(QueryParams.endTime, String(newEndMs));
|
||||
urlQuery.delete(QueryParams.relativeTime);
|
||||
}
|
||||
for (const param of urlParamsToDeleteRef.current) {
|
||||
urlQuery.delete(param);
|
||||
}
|
||||
safeNavigate(`${location.pathname}?${urlQuery.toString()}`);
|
||||
}, [
|
||||
dispatch,
|
||||
isDisabled,
|
||||
location.pathname,
|
||||
maxTime,
|
||||
minTime,
|
||||
safeNavigate,
|
||||
urlQuery,
|
||||
]);
|
||||
}
|
||||
147
frontend/src/lib/__tests__/zoomOutUtils.test.ts
Normal file
147
frontend/src/lib/__tests__/zoomOutUtils.test.ts
Normal file
@@ -0,0 +1,147 @@
|
||||
import {
|
||||
getNextDurationInLadder,
|
||||
getNextZoomOutRange,
|
||||
isZoomOutDisabled,
|
||||
ZoomOutResult,
|
||||
} from '../zoomOutUtils';
|
||||
|
||||
const MS_PER_MIN = 60 * 1000;
|
||||
const MS_PER_HOUR = 60 * MS_PER_MIN;
|
||||
const MS_PER_DAY = 24 * MS_PER_HOUR;
|
||||
const MS_PER_WEEK = 7 * MS_PER_DAY;
|
||||
|
||||
// Fixed "now" for deterministic tests: 2024-01-15 12:00:00 UTC
|
||||
const NOW_MS = 1705312800000;
|
||||
|
||||
describe('zoomOutUtils', () => {
|
||||
beforeEach(() => {
|
||||
jest.spyOn(Date, 'now').mockReturnValue(NOW_MS);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe('getNextDurationInLadder', () => {
|
||||
it('should use 3x zoom out below 15m until reaching 15m', () => {
|
||||
expect(getNextDurationInLadder(1 * MS_PER_MIN)).toBe(3 * MS_PER_MIN);
|
||||
expect(getNextDurationInLadder(2 * MS_PER_MIN)).toBe(6 * MS_PER_MIN);
|
||||
expect(getNextDurationInLadder(3 * MS_PER_MIN)).toBe(9 * MS_PER_MIN);
|
||||
expect(getNextDurationInLadder(4 * MS_PER_MIN)).toBe(12 * MS_PER_MIN);
|
||||
expect(getNextDurationInLadder(5 * MS_PER_MIN)).toBe(15 * MS_PER_MIN); // cap at 15m
|
||||
expect(getNextDurationInLadder(6 * MS_PER_MIN)).toBe(15 * MS_PER_MIN); // 18m capped
|
||||
});
|
||||
|
||||
it('should return next step for each ladder rung from 15m onward', () => {
|
||||
expect(getNextDurationInLadder(10 * MS_PER_MIN)).toBe(15 * MS_PER_MIN);
|
||||
expect(getNextDurationInLadder(15 * MS_PER_MIN)).toBe(45 * MS_PER_MIN);
|
||||
expect(getNextDurationInLadder(45 * MS_PER_MIN)).toBe(2 * MS_PER_HOUR);
|
||||
expect(getNextDurationInLadder(2 * MS_PER_HOUR)).toBe(7 * MS_PER_HOUR);
|
||||
expect(getNextDurationInLadder(7 * MS_PER_HOUR)).toBe(21 * MS_PER_HOUR);
|
||||
expect(getNextDurationInLadder(21 * MS_PER_HOUR)).toBe(1 * MS_PER_DAY);
|
||||
expect(getNextDurationInLadder(1 * MS_PER_DAY)).toBe(2 * MS_PER_DAY);
|
||||
expect(getNextDurationInLadder(2 * MS_PER_DAY)).toBe(3 * MS_PER_DAY);
|
||||
expect(getNextDurationInLadder(3 * MS_PER_DAY)).toBe(1 * MS_PER_WEEK);
|
||||
expect(getNextDurationInLadder(1 * MS_PER_WEEK)).toBe(2 * MS_PER_WEEK);
|
||||
expect(getNextDurationInLadder(2 * MS_PER_WEEK)).toBe(30 * MS_PER_DAY);
|
||||
});
|
||||
|
||||
it('should return MAX when at or past 1 month (no wrap)', () => {
|
||||
expect(getNextDurationInLadder(30 * MS_PER_DAY)).toBe(30 * MS_PER_DAY);
|
||||
expect(getNextDurationInLadder(31 * MS_PER_DAY)).toBe(30 * MS_PER_DAY);
|
||||
});
|
||||
|
||||
it('should return next step for duration between ladder rungs', () => {
|
||||
expect(getNextDurationInLadder(1 * MS_PER_HOUR)).toBe(2 * MS_PER_HOUR);
|
||||
expect(getNextDurationInLadder(5 * MS_PER_HOUR)).toBe(7 * MS_PER_HOUR);
|
||||
expect(getNextDurationInLadder(12 * MS_PER_HOUR)).toBe(21 * MS_PER_HOUR);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getNextZoomOutRange', () => {
|
||||
it('should return null when duration is zero or negative', () => {
|
||||
expect(getNextZoomOutRange(NOW_MS, NOW_MS)).toBeNull();
|
||||
expect(getNextZoomOutRange(NOW_MS, NOW_MS - 1000)).toBeNull();
|
||||
});
|
||||
|
||||
it('should return center-anchored range and preset=null when new end does not exceed now (Phase 1)', () => {
|
||||
// 15m range centered well before now so zoom to 45m keeps end <= now
|
||||
// Center at now-30m: end = center + 22.5m = now - 7.5m <= now
|
||||
const centerMs = NOW_MS - 30 * MS_PER_MIN;
|
||||
const start15m = centerMs - 7.5 * MS_PER_MIN;
|
||||
const end15m = centerMs + 7.5 * MS_PER_MIN;
|
||||
const result = getNextZoomOutRange(start15m, end15m) as ZoomOutResult;
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
expect(result.preset).toBeNull(); // Phase 1: preserve center-anchored range, avoid GetMinMax "last X from now"
|
||||
const [newStart, newEnd] = result.range;
|
||||
expect(newEnd - newStart).toBe(45 * MS_PER_MIN);
|
||||
const newCenter = (newStart + newEnd) / 2;
|
||||
expect(Math.abs(newCenter - centerMs)).toBeLessThan(2000);
|
||||
expect(newEnd).toBeLessThanOrEqual(NOW_MS + 1000);
|
||||
});
|
||||
|
||||
it('should return end-anchored range when new end would exceed now (Phase 2)', () => {
|
||||
// 22hr range ending at now - zoom to 1d (24hr) would push end past now
|
||||
// Next ladder step from 22hr is 1d
|
||||
const start22h = NOW_MS - 22 * MS_PER_HOUR;
|
||||
const end22h = NOW_MS;
|
||||
const result = getNextZoomOutRange(start22h, end22h) as ZoomOutResult;
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
expect(result.preset).toBe('1d');
|
||||
const [newStart, newEnd] = result.range;
|
||||
expect(newEnd).toBe(NOW_MS); // End anchored at now
|
||||
expect(newStart).toBe(NOW_MS - 1 * MS_PER_DAY);
|
||||
});
|
||||
|
||||
it('should return correct preset for each ladder step', () => {
|
||||
const presets: [number, number, string][] = [
|
||||
[15 * MS_PER_MIN, 0, '45m'],
|
||||
[45 * MS_PER_MIN, 0, '2h'],
|
||||
[2 * MS_PER_HOUR, 0, '7h'],
|
||||
[7 * MS_PER_HOUR, 0, '21h'],
|
||||
[21 * MS_PER_HOUR, 0, '1d'],
|
||||
[1 * MS_PER_DAY, 0, '2d'],
|
||||
[2 * MS_PER_DAY, 0, '3d'],
|
||||
[3 * MS_PER_DAY, 0, '1w'],
|
||||
[1 * MS_PER_WEEK, 0, '2w'],
|
||||
[2 * MS_PER_WEEK, 0, '1month'],
|
||||
];
|
||||
|
||||
presets.forEach(([durationMs, offset, expectedPreset]) => {
|
||||
const end = NOW_MS - offset;
|
||||
const start = end - durationMs;
|
||||
const result = getNextZoomOutRange(start, end);
|
||||
expect(result?.preset).toBe(expectedPreset);
|
||||
});
|
||||
});
|
||||
|
||||
it('isZoomOutDisabled returns true when duration >= 1 month', () => {
|
||||
expect(isZoomOutDisabled(30 * MS_PER_DAY)).toBe(true);
|
||||
expect(isZoomOutDisabled(31 * MS_PER_DAY)).toBe(true);
|
||||
expect(isZoomOutDisabled(29 * MS_PER_DAY)).toBe(false);
|
||||
expect(isZoomOutDisabled(15 * MS_PER_MIN)).toBe(false);
|
||||
});
|
||||
|
||||
it('should return null when at 1 month (no zoom out beyond max)', () => {
|
||||
const start1m = NOW_MS - 30 * MS_PER_DAY;
|
||||
const end1m = NOW_MS;
|
||||
const result = getNextZoomOutRange(start1m, end1m);
|
||||
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it('should zoom out 3x from 5m range to 15m then continue with ladder', () => {
|
||||
// 5m range ending at now → 3x = 15m
|
||||
const start5m = NOW_MS - 5 * MS_PER_MIN;
|
||||
const end5m = NOW_MS;
|
||||
const result = getNextZoomOutRange(start5m, end5m) as ZoomOutResult;
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
expect(result.preset).toBe('15m');
|
||||
const [newStart, newEnd] = result.range;
|
||||
expect(newEnd - newStart).toBe(15 * MS_PER_MIN);
|
||||
});
|
||||
});
|
||||
});
|
||||
139
frontend/src/lib/zoomOutUtils.ts
Normal file
139
frontend/src/lib/zoomOutUtils.ts
Normal file
@@ -0,0 +1,139 @@
|
||||
/**
|
||||
* Custom Time Picker zoom-out ladder:
|
||||
* - Until 1 day: 15m → 45m → 2hr → 7hr → 21hr
|
||||
* - Then fixed: 1d → 2d → 3d → 1w → 2w → 1m
|
||||
* - At 1 month: zoom out is disabled (max range)
|
||||
*/
|
||||
|
||||
import type {
|
||||
CustomTimeType,
|
||||
Time,
|
||||
} from 'container/TopNav/DateTimeSelectionV2/types';
|
||||
|
||||
const MS_PER_MIN = 60 * 1000;
|
||||
const MS_PER_HOUR = 60 * MS_PER_MIN;
|
||||
const MS_PER_DAY = 24 * MS_PER_HOUR;
|
||||
const MS_PER_WEEK = 7 * MS_PER_DAY;
|
||||
|
||||
const ZOOM_OUT_LADDER_MS: number[] = [
|
||||
15 * MS_PER_MIN, // 15m
|
||||
45 * MS_PER_MIN, // 45m
|
||||
2 * MS_PER_HOUR, // 2hr
|
||||
7 * MS_PER_HOUR, // 7hr
|
||||
21 * MS_PER_HOUR, // 21hr
|
||||
1 * MS_PER_DAY, // 1d
|
||||
2 * MS_PER_DAY, // 2d
|
||||
3 * MS_PER_DAY, // 3d
|
||||
1 * MS_PER_WEEK, // 1w
|
||||
2 * MS_PER_WEEK, // 2w
|
||||
30 * MS_PER_DAY, // 1m
|
||||
];
|
||||
|
||||
const LADDER_LAST_INDEX = ZOOM_OUT_LADDER_MS.length - 1;
|
||||
const MAX_DURATION = ZOOM_OUT_LADDER_MS[LADDER_LAST_INDEX];
|
||||
const MIN_LADDER_DURATION_MS = ZOOM_OUT_LADDER_MS[0]; // 15m - below this we use 3x
|
||||
|
||||
export const MAX_ZOOM_OUT_DURATION_MS = MAX_DURATION;
|
||||
|
||||
/** Returns true when zoom out should be disabled (range at or beyond 1 month) */
|
||||
export function isZoomOutDisabled(durationMs: number): boolean {
|
||||
return durationMs >= MAX_ZOOM_OUT_DURATION_MS;
|
||||
}
|
||||
|
||||
/** Preset labels for ladder steps supported by GetMinMax (shows "Last 15 minutes" etc. instead of "Custom") */
|
||||
const PRESET_FOR_DURATION_MS: Record<number, Time | CustomTimeType> = {
|
||||
[15 * MS_PER_MIN]: '15m',
|
||||
[45 * MS_PER_MIN]: '45m',
|
||||
[2 * MS_PER_HOUR]: '2h',
|
||||
[7 * MS_PER_HOUR]: '7h',
|
||||
[21 * MS_PER_HOUR]: '21h',
|
||||
[1 * MS_PER_DAY]: '1d',
|
||||
[2 * MS_PER_DAY]: '2d',
|
||||
[3 * MS_PER_DAY]: '3d',
|
||||
[1 * MS_PER_WEEK]: '1w',
|
||||
[2 * MS_PER_WEEK]: '2w',
|
||||
[30 * MS_PER_DAY]: '1month',
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns the next duration in the zoom-out ladder for the given current duration.
|
||||
* Below 15m: zoom out 3x until we reach 15m, then continue with the ladder.
|
||||
* If at or past 1 month, returns MAX_DURATION (no zoom out - button is disabled).
|
||||
*/
|
||||
export function getNextDurationInLadder(durationMs: number): number {
|
||||
if (durationMs >= MAX_DURATION) {
|
||||
return MAX_DURATION; // No zoom out beyond 1 month
|
||||
}
|
||||
|
||||
// Below 15m: zoom out 3x until we reach 15m
|
||||
if (durationMs < MIN_LADDER_DURATION_MS) {
|
||||
const next = durationMs * 3;
|
||||
return Math.min(next, MIN_LADDER_DURATION_MS);
|
||||
}
|
||||
|
||||
// At or above 15m: use the fixed ladder
|
||||
for (let i = 0; i < ZOOM_OUT_LADDER_MS.length; i++) {
|
||||
if (ZOOM_OUT_LADDER_MS[i] > durationMs) {
|
||||
return ZOOM_OUT_LADDER_MS[i];
|
||||
}
|
||||
}
|
||||
|
||||
return MAX_DURATION;
|
||||
}
|
||||
|
||||
export interface ZoomOutResult {
|
||||
range: [number, number];
|
||||
/** Preset key (e.g. '15m') when range matches a preset - use for display instead of "Custom Date Range" */
|
||||
preset: Time | CustomTimeType | null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the next zoomed-out time range.
|
||||
* Phase 1 (center-anchored): While new end <= now, expand from center.
|
||||
* Phase 2 (end-anchored at now): When new end would exceed now, anchor end at now and move start backward.
|
||||
*
|
||||
* @returns ZoomOutResult with range and preset (or null if no change)
|
||||
*/
|
||||
export function getNextZoomOutRange(
|
||||
startMs: number,
|
||||
endMs: number,
|
||||
): ZoomOutResult | null {
|
||||
const nowMs = Date.now();
|
||||
const durationMs = endMs - startMs;
|
||||
|
||||
if (durationMs <= 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const newDurationMs = getNextDurationInLadder(durationMs);
|
||||
|
||||
// No zoom out when already at max (1 month)
|
||||
if (newDurationMs <= durationMs) {
|
||||
return null;
|
||||
}
|
||||
const centerMs = startMs + durationMs / 2;
|
||||
const computedEndMs = centerMs + newDurationMs / 2;
|
||||
|
||||
let newStartMs: number;
|
||||
let newEndMs: number;
|
||||
|
||||
const isPhase1 = computedEndMs <= nowMs;
|
||||
if (isPhase1) {
|
||||
// Phase 1: center-anchored (historical range not ending at now)
|
||||
newStartMs = centerMs - newDurationMs / 2;
|
||||
newEndMs = computedEndMs;
|
||||
} else {
|
||||
// Phase 2: end-anchored at now
|
||||
newStartMs = nowMs - newDurationMs;
|
||||
newEndMs = nowMs;
|
||||
}
|
||||
|
||||
// Phase 2 only: use preset so GetMinMax produces "last X from now".
|
||||
// Phase 1: preset=null so the center-anchored range is preserved (GetMinMax would discard it).
|
||||
const preset = isPhase1 ? null : PRESET_FOR_DURATION_MS[newDurationMs] ?? null;
|
||||
|
||||
return {
|
||||
range: [Math.round(newStartMs), Math.round(newEndMs)],
|
||||
preset,
|
||||
};
|
||||
}
|
||||
28
frontend/src/utils/metricsTimeStorageUtils.ts
Normal file
28
frontend/src/utils/metricsTimeStorageUtils.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
import getLocalStorageKey from 'api/browser/localstorage/get';
|
||||
import setLocalStorageKey from 'api/browser/localstorage/set';
|
||||
import { LOCALSTORAGE } from 'constants/localStorage';
|
||||
|
||||
/**
|
||||
* Updates the stored time duration for a route in localStorage.
|
||||
* Used by both DateTimeSelectionV2 (manual time pick) and useZoomOut (zoom out button).
|
||||
*
|
||||
* @param pathname - The route path (e.g. /infrastructure-monitoring/hosts)
|
||||
* @param value - The time value to store (preset string like '1w' or JSON string for custom range)
|
||||
*/
|
||||
export function persistTimeDurationForRoute(
|
||||
pathname: string,
|
||||
value: string,
|
||||
): void {
|
||||
const preRoutes = getLocalStorageKey(LOCALSTORAGE.METRICS_TIME_IN_DURATION);
|
||||
let preRoutesObject: Record<string, string> = {};
|
||||
try {
|
||||
preRoutesObject = preRoutes ? JSON.parse(preRoutes) : {};
|
||||
} catch {
|
||||
preRoutesObject = {};
|
||||
}
|
||||
const preRoute = { ...preRoutesObject, [pathname]: value };
|
||||
setLocalStorageKey(
|
||||
LOCALSTORAGE.METRICS_TIME_IN_DURATION,
|
||||
JSON.stringify(preRoute),
|
||||
);
|
||||
}
|
||||
@@ -69,6 +69,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
||||
key string // deterministic join of label values
|
||||
}
|
||||
seriesMap := map[sKey]*qbtypes.TimeSeries{}
|
||||
var keyOrder []sKey // preserves ClickHouse row-arrival order
|
||||
|
||||
stepMs := uint64(step.Duration.Milliseconds())
|
||||
|
||||
@@ -219,6 +220,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
||||
if !ok {
|
||||
series = &qbtypes.TimeSeries{Labels: lblObjs}
|
||||
seriesMap[key] = series
|
||||
keyOrder = append(keyOrder, key)
|
||||
}
|
||||
series.Values = append(series.Values, &qbtypes.TimeSeriesValue{
|
||||
Timestamp: ts,
|
||||
@@ -250,8 +252,8 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
||||
Alias: "__result_" + strconv.Itoa(i),
|
||||
}
|
||||
}
|
||||
for k, s := range seriesMap {
|
||||
buckets[k.agg].Series = append(buckets[k.agg].Series, s)
|
||||
for _, k := range keyOrder {
|
||||
buckets[k.agg].Series = append(buckets[k.agg].Series, seriesMap[k])
|
||||
}
|
||||
|
||||
var nonEmpty []*qbtypes.AggregationBucket
|
||||
|
||||
@@ -185,22 +185,6 @@ func postProcessMetricQuery(
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
req *qbtypes.QueryRangeRequest,
|
||||
) *qbtypes.Result {
|
||||
|
||||
config := query.Aggregations[0]
|
||||
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
|
||||
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
|
||||
timeSpaceAggOrderBy := fmt.Sprintf("%s(%s(%s))", config.SpaceAggregation.StringValue(), config.TimeAggregation.StringValue(), config.MetricName)
|
||||
|
||||
for idx := range query.Order {
|
||||
if query.Order[idx].Key.Name == spaceAggOrderBy ||
|
||||
query.Order[idx].Key.Name == timeAggOrderBy ||
|
||||
query.Order[idx].Key.Name == timeSpaceAggOrderBy {
|
||||
query.Order[idx].Key.Name = qbtypes.DefaultOrderByKey
|
||||
}
|
||||
}
|
||||
|
||||
result = q.applySeriesLimit(result, query.Limit, query.Order)
|
||||
|
||||
if len(query.Functions) > 0 {
|
||||
step := query.StepInterval.Duration.Milliseconds()
|
||||
functions := q.prepareFillZeroArgsWithStep(query.Functions, req, step)
|
||||
|
||||
@@ -132,6 +132,14 @@ func GroupByKeys(keys []qbtypes.GroupByKey) []string {
|
||||
return k
|
||||
}
|
||||
|
||||
func OrderByKeys(keys []qbtypes.OrderBy) []string {
|
||||
k := []string{}
|
||||
for _, key := range keys {
|
||||
k = append(k, "`"+key.Key.Name+"`")
|
||||
}
|
||||
return k
|
||||
}
|
||||
|
||||
func FormatValueForContains(value any) string {
|
||||
if value == nil {
|
||||
return ""
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
Args: []any{"signoz_calls_total", uint64(1747785600000), uint64(1747983420000), "cartservice", "cumulative", 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -84,7 +84,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta"},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -117,7 +117,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta", 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -150,7 +150,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name`, ts",
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
|
||||
Args: []any{"system.memory.usage", uint64(1747872000000), uint64(1747983420000), "big-data-node-1", "unspecified", 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
@@ -546,6 +547,16 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
|
||||
) (*qbtypes.Statement, error) {
|
||||
metricType := query.Aggregations[0].Type
|
||||
spaceAgg := query.Aggregations[0].SpaceAggregation
|
||||
finalCTE := "__spatial_aggregation_cte"
|
||||
if metricType == metrictypes.HistogramType {
|
||||
histogramCTE, histogramCTEArgs, err := b.buildHistogramCTE(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cteFragments = append(cteFragments, histogramCTE)
|
||||
cteArgs = append(cteArgs, histogramCTEArgs)
|
||||
finalCTE = "__histogram_cte"
|
||||
}
|
||||
|
||||
combined := querybuilder.CombineCTEs(cteFragments)
|
||||
|
||||
@@ -555,60 +566,97 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("*")
|
||||
sb.From(finalCTE)
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
||||
sb.Where(rewrittenExpr)
|
||||
}
|
||||
|
||||
if metricType == metrictypes.HistogramType && spaceAgg.IsPercentile() {
|
||||
quantile := query.Aggregations[0].SpaceAggregation.Percentile()
|
||||
sb.Select("ts")
|
||||
for _, g := range query.GroupBy {
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
groupByKeys := querybuilder.GroupByKeys(query.GroupBy)
|
||||
hasOrder := len(query.Order) > 0
|
||||
hasLimit := query.Limit > 0
|
||||
hasGroupBy := len(groupByKeys) > 0
|
||||
|
||||
if !hasGroupBy {
|
||||
// do nothing, limits and orders don't mean anything
|
||||
} else if hasOrder && hasLimit {
|
||||
labelSelectorSubQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
labelSelectorSubQueryBuilder.Select(groupByKeys...)
|
||||
labelSelectorSubQueryBuilder.From(finalCTE)
|
||||
labelSelectorOrderClauses := []string{}
|
||||
orderedKeys := map[string]struct{}{} // this will be used to add the remaining keys as tie breakers in the end
|
||||
for _, o := range query.Order {
|
||||
key := o.Key.Name
|
||||
var clause string
|
||||
if strings.Contains(key, query.Aggregations[0].MetricName) {
|
||||
clause = fmt.Sprintf("avg(value) %s", o.Direction.StringValue())
|
||||
} else {
|
||||
clause = fmt.Sprintf("`%s` %s", key, o.Direction.StringValue())
|
||||
orderedKeys[fmt.Sprintf("`%s`", key)] = struct{}{}
|
||||
}
|
||||
labelSelectorOrderClauses = append(labelSelectorOrderClauses, clause)
|
||||
}
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
|
||||
quantile,
|
||||
))
|
||||
sb.From("__spatial_aggregation_cte")
|
||||
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
sb.GroupBy("ts")
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
||||
sb.Having(rewrittenExpr)
|
||||
for _, gk := range groupByKeys { // keys that haven't been added via order by keys will be added at the end as tie breakers
|
||||
if _, ok := orderedKeys[gk]; !ok {
|
||||
labelSelectorOrderClauses = append(labelSelectorOrderClauses, fmt.Sprintf("%s ASC", gk))
|
||||
}
|
||||
}
|
||||
} else if metricType == metrictypes.HistogramType && spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam != nil {
|
||||
sb.Select("ts")
|
||||
labelSelectorSubQueryBuilder.GroupBy(groupByKeys...)
|
||||
labelSelectorSubQueryBuilder.OrderBy(labelSelectorOrderClauses...)
|
||||
labelSelectorSubQuery, _ := labelSelectorSubQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
labelSelectorSubQuery = fmt.Sprintf("%s LIMIT %d", labelSelectorSubQuery, query.Limit)
|
||||
|
||||
for _, g := range query.GroupBy {
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
sb.Where(fmt.Sprintf("(%s) IN (%s)", strings.Join(groupByKeys, ", "), labelSelectorSubQuery))
|
||||
for _, o := range query.Order {
|
||||
key := o.Key.Name
|
||||
var clause string
|
||||
if strings.Contains(key, query.Aggregations[0].MetricName) {
|
||||
clause = fmt.Sprintf("avg(value) OVER (PARTITION BY %s) %s", strings.Join(groupByKeys, ", "), o.Direction.StringValue())
|
||||
} else {
|
||||
clause = fmt.Sprintf("`%s` %s", key, o.Direction.StringValue())
|
||||
}
|
||||
sb.OrderBy(clause)
|
||||
}
|
||||
|
||||
aggQuery, err := AggregationQueryForHistogramCountWithParams(query.Aggregations[0].ComparisonSpaceAggregationParam)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if hasOrder {
|
||||
// order by without limit: apply order by clauses directly
|
||||
for _, o := range query.Order {
|
||||
key := o.Key.Name
|
||||
if strings.Contains(key, query.Aggregations[0].MetricName) {
|
||||
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) %s", strings.Join(groupByKeys, ", "), o.Direction.StringValue()))
|
||||
continue
|
||||
}
|
||||
sb.OrderBy(fmt.Sprintf("`%s` %s", o.Key.Name, o.Direction.StringValue()))
|
||||
}
|
||||
sb.SelectMore(aggQuery)
|
||||
} else if hasLimit {
|
||||
labelSelectorSubQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
labelSelectorSubQueryBuilder.Select(groupByKeys...)
|
||||
labelSelectorSubQueryBuilder.From(finalCTE)
|
||||
labelSelectorSubQueryBuilder.GroupBy(groupByKeys...)
|
||||
labelSelectorSubQueryBuilder.OrderBy("avg(value) DESC")
|
||||
labelSelectorSubQuery, _ := labelSelectorSubQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
labelSelectorSubQuery = fmt.Sprintf("%s LIMIT %d", labelSelectorSubQuery, query.Limit)
|
||||
|
||||
sb.From("__spatial_aggregation_cte")
|
||||
|
||||
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
sb.GroupBy("ts")
|
||||
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
||||
sb.Having(rewrittenExpr)
|
||||
}
|
||||
sb.Where(fmt.Sprintf("(%s) IN (%s)", strings.Join(groupByKeys, ", "), labelSelectorSubQuery))
|
||||
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) DESC", strings.Join(groupByKeys, ", ")))
|
||||
} else {
|
||||
// for count aggregation on histograms with no params, the exact result of spatial aggregation can be sent forward
|
||||
sb.Select("*")
|
||||
sb.From("__spatial_aggregation_cte")
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
||||
sb.Where(rewrittenExpr)
|
||||
// grouping without order by or limit: sort by avg(value) DESC with labels as tiebreakers
|
||||
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) DESC", strings.Join(groupByKeys, ", ")))
|
||||
}
|
||||
|
||||
// add any group-by keys not already in the order-by as tiebreakers
|
||||
orderKeySet := make(map[string]struct{})
|
||||
for _, o := range query.Order {
|
||||
orderKeySet[fmt.Sprintf("`%s`", o.Key.Name)] = struct{}{}
|
||||
}
|
||||
for _, g := range groupByKeys {
|
||||
if _, exists := orderKeySet[g]; !exists {
|
||||
sb.OrderBy(g)
|
||||
}
|
||||
}
|
||||
sb.OrderBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
sb.OrderBy("ts")
|
||||
|
||||
sb.OrderBy("ts ASC")
|
||||
if metricType == metrictypes.HistogramType && spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam == nil {
|
||||
sb.OrderBy("toFloat64(le)")
|
||||
}
|
||||
@@ -616,3 +664,45 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
|
||||
q, a := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return &qbtypes.Statement{Query: combined + q, Args: append(args, a...)}, nil
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildHistogramCTE(
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
) (string, []any, error) {
|
||||
spaceAgg := query.Aggregations[0].SpaceAggregation
|
||||
histogramCTEQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
if spaceAgg.IsPercentile() {
|
||||
histogramCTEQueryBuilder.Select("ts")
|
||||
for _, g := range query.GroupBy {
|
||||
histogramCTEQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
quantile := spaceAgg.Percentile()
|
||||
histogramCTEQueryBuilder.SelectMore(fmt.Sprintf(
|
||||
"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
|
||||
quantile,
|
||||
))
|
||||
histogramCTEQueryBuilder.From("__spatial_aggregation_cte")
|
||||
histogramCTEQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
histogramCTEQueryBuilder.GroupBy("ts")
|
||||
} else if spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam != nil {
|
||||
histogramCTEQueryBuilder.Select("ts")
|
||||
for _, g := range query.GroupBy {
|
||||
histogramCTEQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
aggQuery, err := AggregationQueryForHistogramCountWithParams(query.Aggregations[0].ComparisonSpaceAggregationParam)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
histogramCTEQueryBuilder.SelectMore(aggQuery)
|
||||
histogramCTEQueryBuilder.From("__spatial_aggregation_cte")
|
||||
histogramCTEQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
histogramCTEQueryBuilder.GroupBy("ts")
|
||||
} else {
|
||||
// for count aggregation on histograms with no params, the exact result of spatial aggregation can be sent forward
|
||||
histogramCTEQueryBuilder.Select("*")
|
||||
histogramCTEQueryBuilder.From("__spatial_aggregation_cte")
|
||||
}
|
||||
histogramQueryCTE, histogramQueryCTEArgs := histogramCTEQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
histogramCTE := fmt.Sprintf("__histogram_cte AS (%s)", histogramQueryCTE)
|
||||
|
||||
return histogramCTE, histogramQueryCTEArgs, nil
|
||||
}
|
||||
|
||||
@@ -15,16 +15,17 @@ import (
|
||||
)
|
||||
|
||||
func TestStatementBuilder(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
|
||||
expected qbtypes.Statement
|
||||
expectedErr error
|
||||
}{
|
||||
type baseQuery struct {
|
||||
name string
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
|
||||
orderKey string
|
||||
args []any
|
||||
cte string
|
||||
}
|
||||
|
||||
bases := []baseQuery{
|
||||
{
|
||||
name: "test_cumulative_rate_sum",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "cumulative_rate_sum",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -40,24 +41,16 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "service.name = 'cartservice'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "service.name",
|
||||
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
|
||||
},
|
||||
{
|
||||
name: "test_cumulative_rate_sum_with_mat_column",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "cumulative_rate_sum_with_mat_column",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -73,24 +66,16 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "materialized.key.name REGEXP 'cartservice' OR service.name = 'cartservice'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "service.name",
|
||||
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
|
||||
},
|
||||
{
|
||||
name: "test_delta_rate_sum",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "delta_rate_sum",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -106,24 +91,16 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "service.name = 'cartservice'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "service.name",
|
||||
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
|
||||
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`)",
|
||||
},
|
||||
{
|
||||
name: "test_histogram_percentile1",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "histogram_percentile1",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -139,24 +116,38 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "service.name = 'cartservice'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
|
||||
Args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "service.name",
|
||||
args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
|
||||
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`), __histogram_cte AS (SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts)",
|
||||
},
|
||||
{
|
||||
name: "test_gauge_avg_sum",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "histogram_percentile2",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "http_server_duration_bucket",
|
||||
Type: metrictypes.HistogramType,
|
||||
Temporality: metrictypes.Cumulative,
|
||||
TimeAggregation: metrictypes.TimeAggregationRate,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
orderKey: "service.name",
|
||||
args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`), __histogram_cte AS (SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts)",
|
||||
},
|
||||
{
|
||||
name: "gauge_avg_sum",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -172,53 +163,83 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "host.name = 'big-data-node-1'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "host.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "host.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name`, ts",
|
||||
Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "test_histogram_percentile2",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "http_server_duration_bucket",
|
||||
Type: metrictypes.HistogramType,
|
||||
Temporality: metrictypes.Cumulative,
|
||||
TimeAggregation: metrictypes.TimeAggregationRate,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
|
||||
},
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
|
||||
Args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "host.name",
|
||||
args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
|
||||
cte: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`)",
|
||||
},
|
||||
}
|
||||
|
||||
type variant struct {
|
||||
name string
|
||||
limit int
|
||||
hasOrder bool
|
||||
}
|
||||
|
||||
variants := []variant{
|
||||
{"with_limits", 10, false},
|
||||
{"without_limits", 0, false},
|
||||
{"with_order_by", 0, true},
|
||||
{"with_order_by_and_limits", 10, true},
|
||||
}
|
||||
|
||||
sumMetricsFinalSelects := map[string]string{
|
||||
"with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
"without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
"with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
|
||||
"with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
|
||||
}
|
||||
|
||||
histogramMetricsFinalSelects := map[string]string{
|
||||
"with_limits": " SELECT * FROM __histogram_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __histogram_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
"without_limits": " SELECT * FROM __histogram_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
"with_order_by": " SELECT * FROM __histogram_cte ORDER BY `service.name` asc, ts ASC",
|
||||
"with_order_by_and_limits": " SELECT * FROM __histogram_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __histogram_cte GROUP BY `service.name` ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
|
||||
}
|
||||
|
||||
// expectedFinalSelects maps "base/variant" to the final SELECT portion after the CTE.
|
||||
// The full expected query is: base.cte + expectedFinalSelects[name]
|
||||
expectedFinalSelects := map[string]string{
|
||||
// cumulative_rate_sum
|
||||
"cumulative_rate_sum/with_limits": sumMetricsFinalSelects["with_limits"],
|
||||
"cumulative_rate_sum/without_limits": sumMetricsFinalSelects["without_limits"],
|
||||
"cumulative_rate_sum/with_order_by": sumMetricsFinalSelects["with_order_by"],
|
||||
"cumulative_rate_sum/with_order_by_and_limits": sumMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// cumulative_rate_sum_with_mat_column
|
||||
"cumulative_rate_sum_with_mat_column/with_limits": sumMetricsFinalSelects["with_limits"],
|
||||
"cumulative_rate_sum_with_mat_column/without_limits": sumMetricsFinalSelects["without_limits"],
|
||||
"cumulative_rate_sum_with_mat_column/with_order_by": sumMetricsFinalSelects["with_order_by"],
|
||||
"cumulative_rate_sum_with_mat_column/with_order_by_and_limits": sumMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// delta_rate_sum
|
||||
"delta_rate_sum/with_limits": sumMetricsFinalSelects["with_limits"],
|
||||
"delta_rate_sum/without_limits": sumMetricsFinalSelects["without_limits"],
|
||||
"delta_rate_sum/with_order_by": sumMetricsFinalSelects["with_order_by"],
|
||||
"delta_rate_sum/with_order_by_and_limits": sumMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// histogram_percentile1
|
||||
"histogram_percentile1/with_limits": histogramMetricsFinalSelects["with_limits"],
|
||||
"histogram_percentile1/without_limits": histogramMetricsFinalSelects["without_limits"],
|
||||
"histogram_percentile1/with_order_by": histogramMetricsFinalSelects["with_order_by"],
|
||||
"histogram_percentile1/with_order_by_and_limits": histogramMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// histogram_percentile2
|
||||
"histogram_percentile2/with_limits": histogramMetricsFinalSelects["with_limits"],
|
||||
"histogram_percentile2/without_limits": histogramMetricsFinalSelects["without_limits"],
|
||||
"histogram_percentile2/with_order_by": histogramMetricsFinalSelects["with_order_by"],
|
||||
"histogram_percentile2/with_order_by_and_limits": histogramMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// gauge_avg_sum
|
||||
"gauge_avg_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
|
||||
"gauge_avg_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
|
||||
"gauge_avg_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name` asc, ts ASC",
|
||||
"gauge_avg_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY `host.name` asc LIMIT 10) ORDER BY `host.name` asc, ts ASC",
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
@@ -227,15 +248,13 @@ func TestStatementBuilder(t *testing.T) {
|
||||
t.Fatalf("failed to load field keys: %v", err)
|
||||
}
|
||||
mockMetadataStore.KeysMap = keys
|
||||
// NOTE: LoadFieldKeysFromJSON doesn't set Materialized field
|
||||
// for keys, so we have to set it manually here for testing
|
||||
if _, ok := mockMetadataStore.KeysMap["materialized.key.name"]; ok {
|
||||
if len(mockMetadataStore.KeysMap["materialized.key.name"]) > 0 {
|
||||
mockMetadataStore.KeysMap["materialized.key.name"][0].Materialized = true
|
||||
}
|
||||
}
|
||||
|
||||
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
|
||||
fl, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create flagger: %v", err)
|
||||
}
|
||||
@@ -245,23 +264,30 @@ func TestStatementBuilder(t *testing.T) {
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
flagger,
|
||||
fl,
|
||||
)
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
for _, b := range bases {
|
||||
for _, v := range variants {
|
||||
name := b.name + "/" + v.name
|
||||
t.Run(name, func(t *testing.T) {
|
||||
q := b.query
|
||||
q.Limit = v.limit
|
||||
if v.hasOrder {
|
||||
q.Order = []qbtypes.OrderBy{
|
||||
{
|
||||
Key: qbtypes.OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: b.orderKey}},
|
||||
Direction: qbtypes.OrderDirectionAsc,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
result, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, qbtypes.RequestTypeTimeSeries, q, nil)
|
||||
|
||||
if c.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), c.expectedErr.Error())
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expected.Query, q.Query)
|
||||
require.Equal(t, c.expected.Args, q.Args)
|
||||
require.Equal(t, c.expected.Warnings, q.Warnings)
|
||||
}
|
||||
})
|
||||
require.Equal(t, b.cte+expectedFinalSelects[name], result.Query)
|
||||
require.Equal(t, b.args, result.Args)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,6 +75,9 @@ def clickhouse(
|
||||
</cluster>
|
||||
</remote_servers>
|
||||
|
||||
<user_defined_executable_functions_config>*function.xml</user_defined_executable_functions_config>
|
||||
<user_scripts_path>/var/lib/clickhouse/user_scripts/</user_scripts_path>
|
||||
|
||||
<distributed_ddl>
|
||||
<path>/clickhouse/task_queue/ddl</path>
|
||||
<profile>default</profile>
|
||||
@@ -117,17 +120,73 @@ def clickhouse(
|
||||
</clickhouse>
|
||||
"""
|
||||
|
||||
custom_function_config = """
|
||||
<functions>
|
||||
<function>
|
||||
<type>executable</type>
|
||||
<name>histogramQuantile</name>
|
||||
<return_type>Float64</return_type>
|
||||
<argument>
|
||||
<type>Array(Float64)</type>
|
||||
<name>buckets</name>
|
||||
</argument>
|
||||
<argument>
|
||||
<type>Array(Float64)</type>
|
||||
<name>counts</name>
|
||||
</argument>
|
||||
<argument>
|
||||
<type>Float64</type>
|
||||
<name>quantile</name>
|
||||
</argument>
|
||||
<format>CSV</format>
|
||||
<command>./histogramQuantile</command>
|
||||
</function>
|
||||
</functions>
|
||||
"""
|
||||
|
||||
tmp_dir = tmpfs("clickhouse")
|
||||
cluster_config_file_path = os.path.join(tmp_dir, "cluster.xml")
|
||||
with open(cluster_config_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(cluster_config)
|
||||
|
||||
custom_function_file_path = os.path.join(tmp_dir, "custom-function.xml")
|
||||
with open(custom_function_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(custom_function_config)
|
||||
|
||||
container.with_volume_mapping(
|
||||
cluster_config_file_path, "/etc/clickhouse-server/config.d/cluster.xml"
|
||||
)
|
||||
container.with_volume_mapping(
|
||||
custom_function_file_path,
|
||||
"/etc/clickhouse-server/custom-function.xml",
|
||||
)
|
||||
container.with_network(network)
|
||||
container.start()
|
||||
|
||||
# Download and install the histogramQuantile binary
|
||||
wrapped = container.get_wrapped_container()
|
||||
exit_code, output = wrapped.exec_run(
|
||||
[
|
||||
"bash",
|
||||
"-c",
|
||||
(
|
||||
'version="v0.0.1" && '
|
||||
'node_os=$(uname -s | tr "[:upper:]" "[:lower:]") && '
|
||||
'node_arch=$(uname -m | sed s/aarch64/arm64/ | sed s/x86_64/amd64/) && '
|
||||
"cd /tmp && "
|
||||
'wget -O histogram-quantile.tar.gz "https://github.com/SigNoz/signoz/releases/download/histogram-quantile%2F${version}/histogram-quantile_${node_os}_${node_arch}.tar.gz" && '
|
||||
"tar -xzf histogram-quantile.tar.gz && "
|
||||
"mkdir -p /var/lib/clickhouse/user_scripts && "
|
||||
"mv histogram-quantile /var/lib/clickhouse/user_scripts/histogramQuantile && "
|
||||
"chmod +x /var/lib/clickhouse/user_scripts/histogramQuantile"
|
||||
),
|
||||
],
|
||||
)
|
||||
if exit_code != 0:
|
||||
raise RuntimeError(
|
||||
f"Failed to install histogramQuantile binary: {output.decode()}"
|
||||
)
|
||||
|
||||
connection = clickhouse_connect.get_client(
|
||||
user=container.username,
|
||||
password=container.password,
|
||||
|
||||
@@ -58,6 +58,8 @@ def build_builder_query(
|
||||
step_interval: int = DEFAULT_STEP_INTERVAL,
|
||||
group_by: Optional[List[str]] = None,
|
||||
filter_expression: Optional[str] = None,
|
||||
order_by: Optional[List[Dict]] = None,
|
||||
limit: Optional[int] = None,
|
||||
functions: Optional[List[Dict]] = None,
|
||||
disabled: bool = False,
|
||||
) -> Dict:
|
||||
@@ -93,6 +95,12 @@ def build_builder_query(
|
||||
if filter_expression:
|
||||
spec["filter"] = {"expression": filter_expression}
|
||||
|
||||
if order_by:
|
||||
spec["order"] = order_by
|
||||
|
||||
if limit is not None:
|
||||
spec["limit"] = limit
|
||||
|
||||
if functions:
|
||||
spec["functions"] = functions
|
||||
|
||||
|
||||
@@ -2,16 +2,20 @@
|
||||
Look at the cumulative_counters_1h.jsonl file for the relevant data
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Callable, List
|
||||
from typing import Any, Callable, List, Optional, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.metrics import Metrics
|
||||
from fixtures.querier import (
|
||||
build_builder_query,
|
||||
build_order_by,
|
||||
get_all_series,
|
||||
get_series_values,
|
||||
make_query_request,
|
||||
@@ -71,16 +75,200 @@ def test_rate_with_steady_values_and_reset(
|
||||
assert v["value"] >= 0, f"Rate should not be negative: {v['value']}"
|
||||
|
||||
|
||||
def _assert_endpoint_rate_values(endpoint_values: dict) -> None:
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
# rate = 10/60 = 0.167
|
||||
if "/health" in endpoint_values:
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) >= 58
|
||||
), f"Expected >= 58 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
|
||||
assert (
|
||||
count_steady_health >= 57
|
||||
), f"Expected >= 57 steady rate values (0.167) for /health, got {count_steady_health}"
|
||||
# all /health rates should be 0.167 except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
# rate = 20/60 = 0.333, gap causes lower averaged rate at boundary
|
||||
if "/products" in endpoint_values:
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) >= 49
|
||||
), f"Expected >= 49 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
|
||||
# most values should be 0.333, some boundary values differ due to 10-min gap
|
||||
assert (
|
||||
count_steady_products >= 46
|
||||
), f"Expected >= 46 steady rate values (0.333) for /products, got {count_steady_products}"
|
||||
# check that non-0.333 values are due to gap averaging (should be lower)
|
||||
gap_boundary_values = [v["value"] for v in products_values if v["value"] != 0.333]
|
||||
for val in gap_boundary_values:
|
||||
assert (
|
||||
0 < val < 0.333
|
||||
), f"Gap boundary values should be between 0 and 0.333, got {val}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
# normal rate = 1/60 = 0.0167, spike rate = 50/60 = 0.833
|
||||
if "/checkout" in endpoint_values:
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) >= 59
|
||||
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
|
||||
assert (
|
||||
count_steady_checkout >= 53
|
||||
), f"Expected >= 53 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
|
||||
assert (
|
||||
count_spike_checkout >= 4
|
||||
), f"Expected >= 4 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
|
||||
]
|
||||
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# rate = 5/60 = 0.0833
|
||||
# reset at t31 causes: rate at t30 includes gap (lower), t31 has high rate after reset
|
||||
if "/orders" in endpoint_values:
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) >= 58
|
||||
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
|
||||
assert (
|
||||
count_steady_orders >= 55
|
||||
), f"Expected >= 55 steady rate values (0.0833) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
|
||||
assert (
|
||||
len(non_standard_orders) >= 2
|
||||
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
|
||||
assert (
|
||||
len(high_rate_orders) >= 1
|
||||
), f"Expected at least one high rate value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
|
||||
# Rate = 1/60 = 0.0167 during increment, 0 during flat periods
|
||||
if "/users" in endpoint_values:
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) >= 54
|
||||
), f"Expected >= 54 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users >= 40
|
||||
), f"Expected >= 40 zero rate values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
|
||||
assert (
|
||||
count_increment_rate >= 8
|
||||
), f"Expected >= 8 increment rate values (0.0167) for /users, got {count_increment_rate}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"metric_suffix,order_by,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None, # this is equivalent to sum(metric_name)
|
||||
None,
|
||||
5,
|
||||
["/products", "/health", "/checkout", "/orders", "/users"],
|
||||
),
|
||||
(
|
||||
"only_limit",
|
||||
None, # this is equivalent to sum(metric_name)
|
||||
3,
|
||||
3,
|
||||
["/products", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
5,
|
||||
["/checkout", "/health", "/orders", "/products", "/users"],
|
||||
),
|
||||
(
|
||||
"asc_lim3",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
3,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/users", "/products", "/orders", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim3",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/users", "/products", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name",
|
||||
[build_order_by("sum(test_rate_groupby_asc_metric_name)", "asc")],
|
||||
None,
|
||||
5,
|
||||
["/users", "/orders", "/checkout", "/health", "/products"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim3",
|
||||
[build_order_by("sum(test_rate_groupby_asc_metric_name_lim3)", "asc")],
|
||||
3,
|
||||
3,
|
||||
["/users", "/orders", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name",
|
||||
[build_order_by("sum(test_rate_groupby_desc_metric_name)", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/products", "/health", "/checkout", "/orders", "/users"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim3",
|
||||
[build_order_by("sum(test_rate_groupby_desc_metric_name_lim3)", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/products", "/health", "/checkout"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_rate_group_by_endpoint(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
metric_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = "test_rate_groupby"
|
||||
metric_name = f"test_rate_groupby_{metric_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
CUMULATIVE_COUNTERS_FILE,
|
||||
@@ -97,6 +285,8 @@ def test_rate_group_by_endpoint(
|
||||
"sum",
|
||||
temporality="cumulative",
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
@@ -105,10 +295,23 @@ def test_rate_group_by_endpoint(
|
||||
data = response.json()
|
||||
all_series = get_all_series(data, "A")
|
||||
|
||||
# Should have 5 different endpoints
|
||||
assert (
|
||||
len(all_series) == 5
|
||||
), f"Expected 5 series for 5 endpoints, got {len(all_series)}"
|
||||
len(all_series) == expected_count
|
||||
), f"Expected {expected_count} series, got {len(all_series)}"
|
||||
|
||||
endpoint_labels = [
|
||||
series.get("labels", [{}])[0].get("value", "unknown")
|
||||
for series in all_series
|
||||
]
|
||||
|
||||
if isinstance(expected_endpoints, set):
|
||||
assert (
|
||||
set(endpoint_labels) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_labels)}"
|
||||
else:
|
||||
assert endpoint_labels == expected_endpoints, (
|
||||
f"Expected endpoints {expected_endpoints}, got {endpoint_labels}"
|
||||
)
|
||||
|
||||
# endpoint -> values
|
||||
endpoint_values = {}
|
||||
@@ -117,11 +320,6 @@ def test_rate_group_by_endpoint(
|
||||
values = sorted(series.get("values", []), key=lambda x: x["timestamp"])
|
||||
endpoint_values[endpoint] = values
|
||||
|
||||
expected_endpoints = {"/products", "/health", "/checkout", "/orders", "/users"}
|
||||
assert (
|
||||
set(endpoint_values.keys()) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_values.keys())}"
|
||||
|
||||
# at no point rate should be negative
|
||||
for endpoint, values in endpoint_values.items():
|
||||
for v in values:
|
||||
@@ -129,103 +327,4 @@ def test_rate_group_by_endpoint(
|
||||
v["value"] >= 0
|
||||
), f"Rate for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
# rate = 10/60 = 0.167
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) >= 58
|
||||
), f"Expected >= 58 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
|
||||
assert (
|
||||
count_steady_health >= 57
|
||||
), f"Expected >= 57 steady rate values (0.167) for /health, got {count_steady_health}"
|
||||
# all /health rates should be 0.167 except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
# rate = 20/60 = 0.333, gap causes lower averaged rate at boundary
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) >= 49
|
||||
), f"Expected >= 49 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
|
||||
|
||||
# most values should be 0.333, some boundary values differ due to 10-min gap
|
||||
assert (
|
||||
count_steady_products >= 46
|
||||
), f"Expected >= 46 steady rate values (0.333) for /products, got {count_steady_products}"
|
||||
|
||||
# check that non-0.333 values are due to gap averaging (should be lower)
|
||||
gap_boundary_values = [v["value"] for v in products_values if v["value"] != 0.333]
|
||||
for val in gap_boundary_values:
|
||||
assert (
|
||||
0 < val < 0.333
|
||||
), f"Gap boundary values should be between 0 and 0.333, got {val}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
# normal rate = 1/60 = 0.0167, spike rate = 50/60 = 0.833
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) >= 59
|
||||
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
|
||||
assert (
|
||||
count_steady_checkout >= 53
|
||||
), f"Expected >= 53 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
|
||||
assert (
|
||||
count_spike_checkout >= 4
|
||||
), f"Expected >= 4 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
|
||||
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
|
||||
]
|
||||
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
|
||||
# consecutiveness
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# rate = 5/60 = 0.0833
|
||||
# reset at t31 causes: rate at t30 includes gap (lower), t31 has high rate after reset
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) >= 58
|
||||
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
|
||||
assert (
|
||||
count_steady_orders >= 55
|
||||
), f"Expected >= 55 steady rate values (0.0833) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
|
||||
assert (
|
||||
len(non_standard_orders) >= 2
|
||||
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
|
||||
assert (
|
||||
len(high_rate_orders) >= 1
|
||||
), f"Expected at least one high rate value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
|
||||
# Rate = 1/60 = 0.0167 during increment, 0 during flat periods
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) >= 54
|
||||
), f"Expected >= 54 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users >= 40
|
||||
), f"Expected >= 40 zero rate values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
|
||||
assert (
|
||||
count_increment_rate >= 8
|
||||
), f"Expected >= 8 increment rate values (0.0167) for /users, got {count_increment_rate}"
|
||||
_assert_endpoint_rate_values(endpoint_values)
|
||||
|
||||
@@ -5,7 +5,7 @@ Look at the multi_temporality_counters_1h.jsonl file for the relevant data
|
||||
import random
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable, List
|
||||
from typing import Callable, List, Optional, Union
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -14,6 +14,7 @@ from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.metrics import Metrics
|
||||
from fixtures.querier import (
|
||||
build_builder_query,
|
||||
build_order_by,
|
||||
get_all_series,
|
||||
get_series_values,
|
||||
make_query_request,
|
||||
@@ -91,6 +92,198 @@ def test_with_steady_values_and_reset(
|
||||
), f"{time_aggregation} should not be negative: {v['value']}"
|
||||
|
||||
|
||||
def _assert_endpoint_group_values( # pylint: disable=too-many-arguments
|
||||
endpoint_values: dict,
|
||||
stable_health_value: float,
|
||||
stable_products_value: float,
|
||||
stable_checkout_value: float,
|
||||
spike_checkout_value: float,
|
||||
stable_orders_value: float,
|
||||
spike_users_value: float,
|
||||
time_aggregation: str,
|
||||
) -> None:
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
if "/health" in endpoint_values:
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) >= 58
|
||||
), f"Expected >= 58 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(
|
||||
1 for v in health_values if v["value"] == stable_health_value
|
||||
)
|
||||
assert (
|
||||
count_steady_health >= 57
|
||||
), f"Expected >= 57 steady rate values ({stable_health_value}) for /health, got {count_steady_health}"
|
||||
# all /health rates should be stable except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert (
|
||||
v["value"] == stable_health_value
|
||||
), f"Expected /health rate {stable_health_value}, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
if "/products" in endpoint_values:
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) >= 49
|
||||
), f"Expected >= 49 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(
|
||||
1 for v in products_values if v["value"] == stable_products_value
|
||||
)
|
||||
# most values should be stable, some boundary values differ due to 10-min gap
|
||||
assert (
|
||||
count_steady_products >= 46
|
||||
), f"Expected >= 46 steady rate values ({stable_products_value}) for /products, got {count_steady_products}"
|
||||
# check that non-stable values are due to gap averaging (should be lower)
|
||||
gap_boundary_values = [
|
||||
v["value"] for v in products_values if v["value"] != stable_products_value
|
||||
]
|
||||
for val in gap_boundary_values:
|
||||
assert (
|
||||
0 < val < stable_products_value
|
||||
), f"Gap boundary values should be between 0 and {stable_products_value}, got {val}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
if "/checkout" in endpoint_values:
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) >= 59
|
||||
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(
|
||||
1 for v in checkout_values if v["value"] == stable_checkout_value
|
||||
)
|
||||
assert (
|
||||
count_steady_checkout >= 53
|
||||
), f"Expected >= 53 steady {time_aggregation} values ({stable_checkout_value}) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(
|
||||
1 for v in checkout_values if v["value"] == spike_checkout_value
|
||||
)
|
||||
assert (
|
||||
count_spike_checkout >= 4
|
||||
), f"Expected >= 4 spike {time_aggregation} values ({spike_checkout_value}) for /checkout, got {count_spike_checkout}"
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate(checkout_values) if v["value"] == spike_checkout_value
|
||||
]
|
||||
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# reset at t31 causes: rate/increase at t30 includes gap (lower), t31 has high rate after reset
|
||||
if "/orders" in endpoint_values:
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) >= 58
|
||||
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(
|
||||
1 for v in orders_values if v["value"] == stable_orders_value
|
||||
)
|
||||
assert (
|
||||
count_steady_orders >= 55
|
||||
), f"Expected >= 55 steady {time_aggregation} values ({stable_orders_value}) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [
|
||||
v["value"] for v in orders_values if v["value"] != stable_orders_value
|
||||
]
|
||||
assert (
|
||||
len(non_standard_orders) >= 2
|
||||
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > stable_orders_value]
|
||||
assert (
|
||||
len(high_rate_orders) >= 1
|
||||
), f"Expected at least one high {time_aggregation} value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
|
||||
if "/users" in endpoint_values:
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) >= 54
|
||||
), f"Expected >= 54 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users >= 40
|
||||
), f"Expected >= 40 zero {time_aggregation} values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be stable increment rate
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == spike_users_value)
|
||||
assert (
|
||||
count_increment_rate >= 8
|
||||
), f"Expected >= 8 increment {time_aggregation} values ({spike_users_value}) for /users, got {count_increment_rate}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by_spec,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
5,
|
||||
["/products", "/health", "/checkout", "/orders", "/users"],
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
("endpoint", "asc"),
|
||||
None,
|
||||
5,
|
||||
["/checkout", "/health", "/orders", "/products", "/users"],
|
||||
),
|
||||
(
|
||||
"asc_lim3",
|
||||
("endpoint", "asc"),
|
||||
3,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
("endpoint", "desc"),
|
||||
None,
|
||||
5,
|
||||
["/users", "/products", "/orders", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim3",
|
||||
("endpoint", "desc"),
|
||||
3,
|
||||
3,
|
||||
["/users", "/products", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name",
|
||||
("sum_metric", "asc"),
|
||||
None,
|
||||
5,
|
||||
["/users", "/orders", "/checkout", "/health", "/products"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim3",
|
||||
("sum_metric", "asc"),
|
||||
3,
|
||||
3,
|
||||
["/users", "/orders", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name",
|
||||
("sum_metric", "desc"),
|
||||
None,
|
||||
5,
|
||||
["/products", "/health", "/checkout", "/orders", "/users"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim3",
|
||||
("sum_metric", "desc"),
|
||||
3,
|
||||
3,
|
||||
["/products", "/health", "/checkout"],
|
||||
),
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"time_aggregation, stable_health_value, stable_products_value, stable_checkout_value, spike_checkout_value, stable_orders_value, spike_users_value",
|
||||
[
|
||||
@@ -110,11 +303,24 @@ def test_group_by_endpoint(
|
||||
spike_checkout_value: float,
|
||||
stable_orders_value: float,
|
||||
spike_users_value: float,
|
||||
order_suffix: str,
|
||||
order_by_spec: Optional[tuple],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_{time_aggregation}_groupby"
|
||||
metric_name = f"test_{time_aggregation}_groupby_{order_suffix}"
|
||||
|
||||
# Build order_by at runtime so metric name reflects actual time_aggregation
|
||||
order_by = None
|
||||
if order_by_spec is not None:
|
||||
key, direction = order_by_spec
|
||||
if key == "sum_metric":
|
||||
key = f"sum({metric_name})"
|
||||
order_by = [build_order_by(key, direction)]
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
MULTI_TEMPORALITY_FILE,
|
||||
@@ -130,6 +336,8 @@ def test_group_by_endpoint(
|
||||
time_aggregation,
|
||||
"sum",
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
@@ -137,10 +345,23 @@ def test_group_by_endpoint(
|
||||
|
||||
data = response.json()
|
||||
all_series = get_all_series(data, "A")
|
||||
# Should have 5 different endpoints
|
||||
assert (
|
||||
len(all_series) == 5
|
||||
), f"Expected 5 series for 5 endpoints, got {len(all_series)}"
|
||||
len(all_series) == expected_count
|
||||
), f"Expected {expected_count} series, got {len(all_series)}"
|
||||
|
||||
endpoint_labels = [
|
||||
series.get("labels", [{}])[0].get("value", "unknown")
|
||||
for series in all_series
|
||||
]
|
||||
|
||||
if isinstance(expected_endpoints, set):
|
||||
assert (
|
||||
set(endpoint_labels) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_labels)}"
|
||||
else:
|
||||
assert endpoint_labels == expected_endpoints, (
|
||||
f"Expected endpoints {expected_endpoints}, got {endpoint_labels}"
|
||||
)
|
||||
|
||||
# endpoint -> values
|
||||
endpoint_values = {}
|
||||
@@ -149,11 +370,6 @@ def test_group_by_endpoint(
|
||||
values = sorted(series.get("values", []), key=lambda x: x["timestamp"])
|
||||
endpoint_values[endpoint] = values
|
||||
|
||||
expected_endpoints = {"/products", "/health", "/checkout", "/orders", "/users"}
|
||||
assert (
|
||||
set(endpoint_values.keys()) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_values.keys())}"
|
||||
|
||||
# at no point rate should be negative
|
||||
for endpoint, values in endpoint_values.items():
|
||||
for v in values:
|
||||
@@ -161,117 +377,16 @@ def test_group_by_endpoint(
|
||||
v["value"] >= 0
|
||||
), f"Rate for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) >= 58
|
||||
), f"Expected >= 58 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(
|
||||
1 for v in health_values if v["value"] == stable_health_value
|
||||
_assert_endpoint_group_values(
|
||||
endpoint_values,
|
||||
stable_health_value,
|
||||
stable_products_value,
|
||||
stable_checkout_value,
|
||||
spike_checkout_value,
|
||||
stable_orders_value,
|
||||
spike_users_value,
|
||||
time_aggregation,
|
||||
)
|
||||
assert (
|
||||
count_steady_health >= 57
|
||||
), f"Expected >= 57 steady rate values ({stable_health_value}) for /health, got {count_steady_health}"
|
||||
# all /health rates should be state except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert (
|
||||
v["value"] == stable_health_value
|
||||
), f"Expected /health rate {stable_health_value}, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) >= 49
|
||||
), f"Expected >= 49 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(
|
||||
1 for v in products_values if v["value"] == stable_products_value
|
||||
)
|
||||
|
||||
# most values should be stable, some boundary values differ due to 10-min gap
|
||||
assert (
|
||||
count_steady_products >= 46
|
||||
), f"Expected >= 46 steady rate values ({stable_products_value}) for /products, got {count_steady_products}"
|
||||
|
||||
# check that non-stable values are due to gap averaging (should be lower)
|
||||
gap_boundary_values = [
|
||||
v["value"] for v in products_values if v["value"] != stable_products_value
|
||||
]
|
||||
for val in gap_boundary_values:
|
||||
assert (
|
||||
0 < val < stable_products_value
|
||||
), f"Gap boundary values should be between 0 and {stable_products_value}, got {val}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) >= 59
|
||||
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(
|
||||
1 for v in checkout_values if v["value"] == stable_checkout_value
|
||||
)
|
||||
assert (
|
||||
count_steady_checkout >= 53
|
||||
), f"Expected >= 53 steady {time_aggregation} values ({stable_checkout_value}) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(
|
||||
1 for v in checkout_values if v["value"] == spike_checkout_value
|
||||
)
|
||||
assert (
|
||||
count_spike_checkout >= 4
|
||||
), f"Expected >= 4 spike {time_aggregation} values ({spike_checkout_value}) for /checkout, got {count_spike_checkout}"
|
||||
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate(checkout_values) if v["value"] == spike_checkout_value
|
||||
]
|
||||
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
|
||||
# consecutiveness
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# reset at t31 causes: rate/increase at t30 includes gap (lower), t31 has high rate after reset
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) >= 58
|
||||
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(
|
||||
1 for v in orders_values if v["value"] == stable_orders_value
|
||||
)
|
||||
assert (
|
||||
count_steady_orders >= 55
|
||||
), f"Expected >= 55 steady {time_aggregation} values ({stable_orders_value}) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [
|
||||
v["value"] for v in orders_values if v["value"] != stable_orders_value
|
||||
]
|
||||
assert (
|
||||
len(non_standard_orders) >= 2
|
||||
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > stable_orders_value]
|
||||
assert (
|
||||
len(high_rate_orders) >= 1
|
||||
), f"Expected at least one high {time_aggregation} value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) >= 54
|
||||
), f"Expected >= 54 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users >= 40
|
||||
), f"Expected >= 40 zero {time_aggregation} values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == spike_users_value)
|
||||
assert (
|
||||
count_increment_rate >= 8
|
||||
), f"Expected >= 8 increment {time_aggregation} values ({spike_users_value}) for /users, got {count_increment_rate}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
@@ -2,9 +2,12 @@
|
||||
Look at the histogram_data_1h.jsonl file for the relevant data
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable, List
|
||||
from typing import Callable, List, Optional, Union
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -13,6 +16,7 @@ from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.metrics import Metrics
|
||||
from fixtures.querier import (
|
||||
build_builder_query,
|
||||
build_order_by,
|
||||
get_all_series,
|
||||
get_series_values,
|
||||
make_query_request,
|
||||
@@ -20,6 +24,7 @@ from fixtures.querier import (
|
||||
from fixtures.utils import get_testdata_file_path
|
||||
|
||||
FILE = get_testdata_file_path("histogram_data_1h.jsonl")
|
||||
FILE_WITH_MANY_GROUPS = get_testdata_file_path("histogram_data_1h_many_groups.jsonl")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -372,3 +377,717 @@ def test_histogram_count_no_param(
|
||||
values[1]["value"] == first_values[le]
|
||||
) ## to keep parallel to the cumulative test cases, first_value refers to the value at 10:02
|
||||
assert values[-1]["value"] == last_values[le]
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"space_agg, zeroth_value, first_value, last_value",
|
||||
[
|
||||
("p50", 500, 818.182, 550.725),
|
||||
("p75", 750, 3000, 826.087),
|
||||
("p90", 900, 6400, 991.304),
|
||||
("p95", 950, 8000, 4200),
|
||||
("p99", 990, 8000, 8000),
|
||||
],
|
||||
)
|
||||
def test_histogram_percentile_for_all_services(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
space_agg: str,
|
||||
zeroth_value: float,
|
||||
first_value: float,
|
||||
last_value: float,
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_{space_agg}_bucket"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"doesnotreallymatter",
|
||||
space_agg,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
result_values = sorted(get_series_values(data, "A"), key=lambda x: x["timestamp"])
|
||||
assert len(result_values) == 60
|
||||
assert result_values[0]["value"] == zeroth_value
|
||||
assert result_values[1]["value"] == first_value
|
||||
assert result_values[-1]["value"] == last_value
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"space_agg, first_value, last_value",
|
||||
[
|
||||
("p50", 818.182, 550.725),
|
||||
("p75", 3000, 826.087),
|
||||
("p90", 6400, 991.304),
|
||||
("p95", 8000, 4200),
|
||||
("p99", 8000, 8000),
|
||||
],
|
||||
)
|
||||
def test_histogram_percentile_for_cumulative_service(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
space_agg: str,
|
||||
first_value: float,
|
||||
last_value: float,
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_{space_agg}_cumulative_bucket"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"doesnotreallymatter",
|
||||
space_agg,
|
||||
filter_expression='service = "api"',
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
result_values = sorted(get_series_values(data, "A"), key=lambda x: x["timestamp"])
|
||||
assert len(result_values) == 59
|
||||
assert result_values[0]["value"] == first_value
|
||||
assert result_values[-1]["value"] == last_value
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"space_agg, zeroth_value, first_value, last_value",
|
||||
[
|
||||
("p50", 500, 818.182, 550.725),
|
||||
("p75", 750, 3000, 826.087),
|
||||
("p90", 900, 6400, 991.304),
|
||||
("p95", 950, 8000, 4200),
|
||||
("p99", 990, 8000, 8000),
|
||||
],
|
||||
)
|
||||
def test_histogram_percentile_for_delta_service(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
space_agg: str,
|
||||
zeroth_value: float,
|
||||
first_value: float,
|
||||
last_value: float,
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_{space_agg}_bucket"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"doesnotreallymatter",
|
||||
space_agg,
|
||||
filter_expression='service = "web"',
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
result_values = sorted(get_series_values(data, "A"), key=lambda x: x["timestamp"])
|
||||
assert len(result_values) == 60
|
||||
assert result_values[0]["value"] == zeroth_value
|
||||
assert result_values[1]["value"] == first_value
|
||||
assert result_values[-1]["value"] == last_value
|
||||
|
||||
|
||||
def _assert_series_endpoint_labels(
|
||||
series: list,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
prefix: str,
|
||||
) -> None:
|
||||
labels = [s.get("labels", [{}])[0].get("value", "unknown") for s in series]
|
||||
if isinstance(expected_endpoints, set):
|
||||
assert (
|
||||
set(labels) == expected_endpoints
|
||||
), f"Expected {prefix} endpoints {expected_endpoints}, got {set(labels)}"
|
||||
else:
|
||||
assert labels == expected_endpoints, (
|
||||
f"Expected {prefix} endpoints in order {expected_endpoints}, got {labels}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_lim2",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
2,
|
||||
2,
|
||||
["/checkout", "/health"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
3,
|
||||
["/orders", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim2",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
2,
|
||||
2,
|
||||
["/orders", "/health"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name",
|
||||
[build_order_by("count(test_histogram_count_groupby_asc_metric_name)", "asc")],
|
||||
None,
|
||||
3,
|
||||
["/health", "/orders", "/checkout"], ## health and orders have the same size so they are then sorted endpoint as a tiebreaker
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim2",
|
||||
[build_order_by("count(test_histogram_count_groupby_asc_metric_name_lim2)", "asc")],
|
||||
2,
|
||||
2,
|
||||
["/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name",
|
||||
[build_order_by("count(test_histogram_count_groupby_desc_metric_name)", "desc")],
|
||||
None,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"], ## health and orders have the same size so they are then sorted endpoint as a tiebreaker
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim2",
|
||||
[build_order_by("count(test_histogram_count_groupby_desc_metric_name_lim2)", "desc")],
|
||||
2,
|
||||
2,
|
||||
["/checkout", "/health"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_histogram_count_group_by_endpoint(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
order_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_histogram_count_groupby_{order_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query_count = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"increase",
|
||||
"count",
|
||||
comparisonSpaceAggregationParam={"threshold": 1000, "operator": "<="},
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query_count])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
count_all_series = get_all_series(data, "A")
|
||||
|
||||
assert (
|
||||
len(count_all_series) == expected_count
|
||||
), f"Expected {expected_count} series, got {len(count_all_series)}"
|
||||
|
||||
_assert_series_endpoint_labels(count_all_series, expected_endpoints, "count")
|
||||
|
||||
count_values = {}
|
||||
for series in count_all_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
count_values[endpoint] = sorted(
|
||||
series.get("values", []), key=lambda x: x["timestamp"]
|
||||
)
|
||||
|
||||
for endpoint, values in count_values.items():
|
||||
for v in values:
|
||||
assert v["value"] >= 0, f"Count for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health (cumulative, service=api): 59 points, increase starts at 11/min → 69/min
|
||||
if "/health" in count_values:
|
||||
vals = count_values["/health"]
|
||||
assert vals[0]["value"] == 11, f"Expected /health count first=11, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 69, f"Expected /health count last=69, got {vals[-1]['value']}"
|
||||
|
||||
# /orders (cumulative, service=api): same distribution as /health
|
||||
if "/orders" in count_values:
|
||||
vals = count_values["/orders"]
|
||||
assert vals[0]["value"] == 11, f"Expected /orders count first=11, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 69, f"Expected /orders count last=69, got {vals[-1]['value']}"
|
||||
|
||||
# /checkout (delta, service=web): 60 points, zeroth=12345 (raw delta), then 11/min → 69/min
|
||||
if "/checkout" in count_values:
|
||||
vals = count_values["/checkout"]
|
||||
assert vals[0]["value"] == 12345, f"Expected /checkout count zeroth=12345, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 11, f"Expected /checkout count first=11, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 69, f"Expected /checkout count last=69, got {vals[-1]['value']}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
4,
|
||||
[ "/checkout", "/health", "/orders", "/coupon"],
|
||||
),
|
||||
(
|
||||
"only_limit",
|
||||
None,
|
||||
1,
|
||||
1,
|
||||
[ "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
4,
|
||||
[ "/checkout", "/coupon", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_lim2",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
2,
|
||||
2,
|
||||
["/checkout", "/coupon"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
4,
|
||||
["/orders", "/health", "/coupon", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim2",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
2,
|
||||
2,
|
||||
["/orders", "/health"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_asc_metric_name)", "asc")],
|
||||
None,
|
||||
4,
|
||||
["/coupon", "/orders", "/checkout", "/health"], ## health and checkout have the same size so they are then sorted endpoint as a tiebreaker
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim2",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_asc_metric_name_lim2)", "asc")],
|
||||
2,
|
||||
2,
|
||||
["/coupon", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim3",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_asc_metric_name_lim3)", "asc")],
|
||||
3,
|
||||
3,
|
||||
["/coupon", "/orders", "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
|
||||
),
|
||||
(
|
||||
"desc_metric_name",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_desc_metric_name)", "desc")],
|
||||
None,
|
||||
4,
|
||||
[ "/checkout", "/health", "/orders", "/coupon"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim2",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_desc_metric_name_lim2)", "desc")],
|
||||
2,
|
||||
2,
|
||||
[ "/checkout", "/health"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim2",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_desc_metric_name_lim2)", "desc")],
|
||||
1,
|
||||
1,
|
||||
[ "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_histogram_percentile_group_by_endpoint(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
order_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_histogram_p75_groupby_{order_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE_WITH_MANY_GROUPS,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query_p75 = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"doesnotreallymatter",
|
||||
"p75",
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query_p75])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
p75_series = get_all_series(data, "A")
|
||||
|
||||
for series in p75_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
values = series.get("values", [])
|
||||
|
||||
assert (
|
||||
len(p75_series) == expected_count
|
||||
), f"Expected {expected_count} p75 series, got {len(p75_series)}"
|
||||
|
||||
_assert_series_endpoint_labels(p75_series, expected_endpoints, "p75")
|
||||
|
||||
p75_values = {}
|
||||
for series in p75_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
p75_values[endpoint] = sorted(
|
||||
series.get("values", []), key=lambda x: x["timestamp"]
|
||||
)
|
||||
|
||||
for endpoint, values in p75_values.items():
|
||||
for v in values:
|
||||
assert v["value"] >= 0, f"p75 for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health (cumulative, service=api)
|
||||
if "/health" in p75_values:
|
||||
vals = p75_values["/health"]
|
||||
assert vals[0]["value"] == 6000, f"Expected /health p75 first=8000, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 6000, f"Expected /health p75 last=991.304, got {vals[-1]['value']}"
|
||||
|
||||
# /orders (cumulative, service=api): same distribution as /health
|
||||
if "/orders" in p75_values:
|
||||
vals = p75_values["/orders"]
|
||||
assert vals[0]["value"] == 4500, f"Expected /orders p75 first=6400, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 4500, f"Expected /orders p75 last=991.304, got {vals[-1]['value']}"
|
||||
|
||||
# /checkout (delta, service=web): 60 points
|
||||
if "/checkout" in p75_values:
|
||||
vals = p75_values["/checkout"]
|
||||
assert vals[0]["value"] == 6000, f"Expected /checkout p75 zeroth=900, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 6000, f"Expected /checkout p75 first=6400, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 6000, f"Expected /checkout p75 last=991.304, got {vals[-1]['value']}"
|
||||
|
||||
# /coupon (delta, service=web): 60 points
|
||||
if "/coupon" in p75_values:
|
||||
vals = p75_values["/coupon"]
|
||||
assert vals[0]["value"] == 1125, f"Expected /coupon p75 zeroth=900, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 1125, f"Expected /coupon p75 first=6400, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 1125, f"Expected /coupon p75 last=991.304, got {vals[-1]['value']}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by,limit,expected_count,expected_endpoints, expected_status_codes",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
5,
|
||||
[ "/checkout", "/health", "/orders", "/coupon", "/coupon"], ## coupon has 200 and 500 status codes so it will appear twice
|
||||
[ "200", "200", "200", "200", "500"],
|
||||
),
|
||||
(
|
||||
"only_limit",
|
||||
None,
|
||||
1,
|
||||
1,
|
||||
[ "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
|
||||
[ "200"]
|
||||
),
|
||||
(
|
||||
"asc_endpoint",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
5,
|
||||
[ "/checkout", "/coupon", "/coupon", "/health", "/orders"],
|
||||
[ "200", "200", "500", "200", "200"],
|
||||
),
|
||||
(
|
||||
"asc_endpoint_status_code",
|
||||
[build_order_by("endpoint", "asc"), build_order_by("status_code", "asc")],
|
||||
None,
|
||||
5,
|
||||
[ "/checkout", "/coupon", "/coupon", "/health", "/orders"],
|
||||
[ "200", "200", "500", "200", "200"],
|
||||
),
|
||||
(
|
||||
"asc_status_code_endpoint",
|
||||
[build_order_by("status_code", "asc"), build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
5,
|
||||
[ "/checkout", "/coupon", "/health", "/orders", "/coupon"],
|
||||
[ "200", "200", "200", "200", "500"],
|
||||
),
|
||||
(
|
||||
"asc_endpoint_limit_2",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
2,
|
||||
2,
|
||||
[ "/checkout", "/coupon"],
|
||||
[ "200", "200"],
|
||||
),
|
||||
(
|
||||
"asc_endpoint_status_code_limit_2",
|
||||
[build_order_by("endpoint", "asc"), build_order_by("status_code", "asc")],
|
||||
2,
|
||||
2,
|
||||
[ "/checkout", "/coupon"],
|
||||
[ "200", "200"],
|
||||
),
|
||||
(
|
||||
"asc_status_code_endpoint_limit_4",
|
||||
[build_order_by("status_code", "asc"), build_order_by("endpoint", "asc")],
|
||||
4,
|
||||
4,
|
||||
[ "/checkout", "/coupon", "/health", "/orders"],
|
||||
[ "200", "200", "200", "200"],
|
||||
),
|
||||
(
|
||||
"desc_endpoint",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/orders", "/health", "/coupon", "/coupon", "/checkout"],
|
||||
[ "200", "200", "200", "500", "200"],
|
||||
),
|
||||
(
|
||||
"desc_endpoint_status_code",
|
||||
[build_order_by("endpoint", "desc"), build_order_by("status_code", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/orders", "/health", "/coupon", "/coupon", "/checkout"],
|
||||
[ "200", "200", "500", "200", "200"],
|
||||
),
|
||||
(
|
||||
"desc_status_code_endpoint",
|
||||
[build_order_by("status_code", "desc"), build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/coupon", "/orders", "/health", "/coupon", "/checkout"],
|
||||
[ "500", "200", "200", "200", "200"],
|
||||
),
|
||||
(
|
||||
"desc_endpoint_limit2",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/orders", "/health", "/coupon"],
|
||||
[ "200", "200", "200"],
|
||||
),
|
||||
(
|
||||
"desc_endpoint_status_code_limit3",
|
||||
[build_order_by("endpoint", "desc"), build_order_by("status_code", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/orders", "/health", "/coupon"],
|
||||
[ "200", "200", "500"],
|
||||
),
|
||||
(
|
||||
"desc_status_code_endpoint_limit2",
|
||||
[build_order_by("status_code", "desc"), build_order_by("endpoint", "desc")],
|
||||
2,
|
||||
2,
|
||||
["/coupon", "/orders"],
|
||||
[ "500", "200"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_histogram_percentile_group_by_endpoint_and_status_code(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
order_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
expected_status_codes: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_histogram_p75_groupby_{order_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE_WITH_MANY_GROUPS,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query_p75 = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"doesnotreallymatter",
|
||||
"p75",
|
||||
group_by=["endpoint", "status_code"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query_p75])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
p75_series = get_all_series(data, "A")
|
||||
|
||||
for series in p75_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
status_code = series.get("labels", [{}])[1].get("value", "unknown")
|
||||
values = series.get("values", [])
|
||||
avg = sum(v["value"] for v in values) / len(values) if values else 0
|
||||
logger.warning("endpoint=%s status_code=%s average_p75=%.3f", endpoint, status_code, avg)
|
||||
|
||||
assert (
|
||||
len(p75_series) == expected_count
|
||||
), f"Expected {expected_count} p75 series, got {len(p75_series)}"
|
||||
|
||||
_assert_series_endpoint_labels(p75_series, expected_endpoints, "p75")
|
||||
|
||||
endpoints = [s.get("labels", [{}])[0].get("value", "unknown") for s in p75_series]
|
||||
assert endpoints == expected_endpoints, (
|
||||
f"Expected p75 endpoints in order {expected_endpoints}, got {endpoints}"
|
||||
)
|
||||
status_codes = [s.get("labels", [{}])[1].get("value", "unknown") for s in p75_series]
|
||||
assert status_codes == expected_status_codes, (
|
||||
f"Expected p75 endpoints in order {expected_status_codes}, got {status_codes}"
|
||||
)
|
||||
|
||||
p75_values = {}
|
||||
for series in p75_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
status_code = series.get("labels", [{}])[1].get("value", "unknown")
|
||||
p75_values[endpoint+status_code] = sorted(
|
||||
series.get("values", []), key=lambda x: x["timestamp"]
|
||||
)
|
||||
|
||||
for endpoint, values in p75_values.items():
|
||||
for v in values:
|
||||
assert v["value"] >= 0, f"p75 for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health (cumulative, service=api)
|
||||
if "/health200" in p75_values:
|
||||
vals = p75_values["/health200"]
|
||||
assert vals[0]["value"] == 6000, f"Expected /health p75 first=8000, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 6000, f"Expected /health p75 last=991.304, got {vals[-1]['value']}"
|
||||
|
||||
# /orders (cumulative, service=api): same distribution as /health
|
||||
if "/orders200" in p75_values:
|
||||
vals = p75_values["/orders200"]
|
||||
assert vals[0]["value"] == 4500, f"Expected /orders p75 first=6400, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 4500, f"Expected /orders p75 last=991.304, got {vals[-1]['value']}"
|
||||
|
||||
# /checkout (delta, service=web): 60 points
|
||||
if "/checkout200" in p75_values:
|
||||
vals = p75_values["/checkout200"]
|
||||
assert vals[0]["value"] == 6000, f"Expected /checkout p75 zeroth=900, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 6000, f"Expected /checkout p75 first=6400, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 6000, f"Expected /checkout p75 last=991.304, got {vals[-1]['value']}"
|
||||
|
||||
# /coupon (delta, service=web): 60 points
|
||||
if "/coupon200" in p75_values:
|
||||
vals = p75_values["/coupon200"]
|
||||
assert vals[0]["value"] == 1250, f"Expected /coupon200 p75 zeroth=900, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 1250, f"Expected /coupon200 p75 first=6400, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 1250, f"Expected /coupon200 p75 last=991.304, got {vals[-1]['value']}"
|
||||
|
||||
if "/coupon500" in p75_values:
|
||||
vals = p75_values["/coupon500"]
|
||||
assert vals[0]["value"] == 750, f"Expected /coupon500 p75 zeroth=900, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 750, f"Expected /coupon500 p75 first=6400, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 750, f"Expected /coupon500 p75 last=991.304, got {vals[-1]['value']}"
|
||||
|
||||
@@ -5,13 +5,16 @@ Look at the delta_counters_1h.jsonl file for the relevant data
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Callable, List
|
||||
from typing import Any, Callable, List, Optional, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.metrics import Metrics
|
||||
from fixtures.querier import (
|
||||
build_builder_query,
|
||||
build_order_by,
|
||||
get_all_series,
|
||||
get_series_values,
|
||||
make_query_request,
|
||||
@@ -69,16 +72,61 @@ def test_rate_with_steady_values_and_reset(
|
||||
assert v["value"] >= 0, f"Rate should not be negative: {v['value']}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
5,
|
||||
{"/products", "/health", "/checkout", "/orders", "/users"},
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
5,
|
||||
["/checkout", "/health", "/orders", "/products", "/users"],
|
||||
),
|
||||
(
|
||||
"asc_lim3",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
3,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/users", "/products", "/orders", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim3",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/users", "/products", "/orders"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_rate_group_by_endpoint(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
order_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = "test_rate_groupby"
|
||||
metric_name = f"test_rate_groupby_{order_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
DELTA_COUNTERS_FILE,
|
||||
@@ -94,6 +142,8 @@ def test_rate_group_by_endpoint(
|
||||
"rate",
|
||||
"sum",
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
@@ -102,10 +152,23 @@ def test_rate_group_by_endpoint(
|
||||
data = response.json()
|
||||
all_series = get_all_series(data, "A")
|
||||
|
||||
# Should have 5 different endpoints
|
||||
assert (
|
||||
len(all_series) == 5
|
||||
), f"Expected 5 series for 5 endpoints, got {len(all_series)}"
|
||||
len(all_series) == expected_count
|
||||
), f"Expected {expected_count} series, got {len(all_series)}"
|
||||
|
||||
endpoint_labels = [
|
||||
series.get("labels", [{}])[0].get("value", "unknown")
|
||||
for series in all_series
|
||||
]
|
||||
|
||||
if isinstance(expected_endpoints, set):
|
||||
assert (
|
||||
set(endpoint_labels) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_labels)}"
|
||||
else:
|
||||
assert endpoint_labels == expected_endpoints, (
|
||||
f"Expected endpoints {expected_endpoints}, got {endpoint_labels}"
|
||||
)
|
||||
|
||||
# endpoint -> values
|
||||
endpoint_values = {}
|
||||
@@ -114,11 +177,6 @@ def test_rate_group_by_endpoint(
|
||||
values = sorted(series.get("values", []), key=lambda x: x["timestamp"])
|
||||
endpoint_values[endpoint] = values
|
||||
|
||||
expected_endpoints = {"/products", "/health", "/checkout", "/orders", "/users"}
|
||||
assert (
|
||||
set(endpoint_values.keys()) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_values.keys())}"
|
||||
|
||||
# at no point rate should be negative
|
||||
for endpoint, values in endpoint_values.items():
|
||||
for v in values:
|
||||
@@ -128,93 +186,95 @@ def test_rate_group_by_endpoint(
|
||||
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
# rate = 10/60 = 0.167
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) == 60
|
||||
), f"Expected 60 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
|
||||
assert (
|
||||
count_steady_health == 60
|
||||
), f"Expected == 60 steady rate values (0.167) for /health, got {count_steady_health}"
|
||||
# all /health rates should be 0.167 except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
|
||||
if "/health" in endpoint_values:
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) == 60
|
||||
), f"Expected 60 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
|
||||
assert (
|
||||
count_steady_health == 60
|
||||
), f"Expected == 60 steady rate values (0.167) for /health, got {count_steady_health}"
|
||||
# all /health rates should be 0.167 except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
# rate = 20/60 = 0.333, gap causes lower averaged rate at boundary
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) == 51
|
||||
), f"Expected 51 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
|
||||
|
||||
assert (
|
||||
count_steady_products == 51
|
||||
), f"Expected 51 steady rate values (0.333) for /products, got {count_steady_products}"
|
||||
if "/products" in endpoint_values:
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) == 51
|
||||
), f"Expected 51 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
|
||||
assert (
|
||||
count_steady_products == 51
|
||||
), f"Expected 51 steady rate values (0.333) for /products, got {count_steady_products}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
# normal rate = 1/60 = 0.0167, spike rate = 50/60 = 0.833
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) == 61
|
||||
), f"Expected 61 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
|
||||
assert (
|
||||
count_steady_checkout == 56
|
||||
), f"Expected 56 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
|
||||
assert (
|
||||
count_spike_checkout == 5
|
||||
), f"Expected 5 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
|
||||
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
|
||||
]
|
||||
assert len(spike_indices) == 5, f"Expected 5 spike indices, got {spike_indices}"
|
||||
# consecutiveness
|
||||
for i in range(1, len(spike_indices)):
|
||||
if "/checkout" in endpoint_values:
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
len(checkout_values) == 61
|
||||
), f"Expected 61 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
|
||||
assert (
|
||||
count_steady_checkout == 56
|
||||
), f"Expected 56 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
|
||||
assert (
|
||||
count_spike_checkout == 5
|
||||
), f"Expected 5 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
|
||||
]
|
||||
assert len(spike_indices) == 5, f"Expected 5 spike indices, got {spike_indices}"
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# rate = 5/60 = 0.0833
|
||||
# reset at t31 causes: rate at t30 includes gap (lower), t31 has high rate after reset
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) == 60
|
||||
), f"Expected 59 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
|
||||
assert (
|
||||
count_steady_orders == 58
|
||||
), f"Expected 58 steady rate values (0.0833) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
|
||||
assert (
|
||||
len(non_standard_orders) == 2
|
||||
), f"Expected 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
|
||||
assert (
|
||||
len(high_rate_orders) == 1
|
||||
), f"Expected one high rate value after counter reset, got {non_standard_orders}"
|
||||
if "/orders" in endpoint_values:
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) == 60
|
||||
), f"Expected 59 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
|
||||
assert (
|
||||
count_steady_orders == 58
|
||||
), f"Expected 58 steady rate values (0.0833) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
|
||||
assert (
|
||||
len(non_standard_orders) == 2
|
||||
), f"Expected 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
|
||||
assert (
|
||||
len(high_rate_orders) == 1
|
||||
), f"Expected one high rate value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes (12 of them)
|
||||
# Rate = 1/60 = 0.0167 during increment, 0 during flat periods
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) == 56
|
||||
), f"Expected 56 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users == 44
|
||||
), f"Expected 44 zero rate values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
|
||||
assert (
|
||||
count_increment_rate == 12
|
||||
), f"Expected 12 increment rate values (0.0167) for /users, got {count_increment_rate}"
|
||||
if "/users" in endpoint_values:
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) == 56
|
||||
), f"Expected 56 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users == 44
|
||||
), f"Expected 44 zero rate values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
|
||||
assert (
|
||||
count_increment_rate == 12
|
||||
), f"Expected 12 increment rate values (0.0167) for /users, got {count_increment_rate}"
|
||||
|
||||
2400
tests/integration/testdata/histogram_data_1h_many_groups.jsonl
vendored
Normal file
2400
tests/integration/testdata/histogram_data_1h_many_groups.jsonl
vendored
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user