Compare commits

..

26 Commits

Author SHA1 Message Date
nikhilmantri0902
1948cdfaa4 chore: added for comparison across group by and merges debugging quantile exact 2025-08-19 01:16:24 +05:30
nikhilmantri0902
2fb7fe49ef chore: added for comparison across group by and merges debugging quantile exact 2025-08-19 00:56:43 +05:30
nikhilmantri0902
c4762045a6 chore: added debug functionality 2025-08-18 15:34:20 +05:30
nikhilmantri0902
1541734542 feat: match query building semantics for tags of 0 length 2025-08-18 15:26:12 +05:30
nikhilmantri0902
46e5b407f7 fix: added necessary 0 numCalls handling 2025-08-18 13:53:59 +05:30
nikhilmantri0902
f2c3946101 fix: added necessary 0 numCalls handling 2025-08-18 13:52:55 +05:30
nikhilmantri0902
4dca46de40 chore: added debugging funcs 2025-08-18 13:24:28 +05:30
nikhilmantri0902
6f420abe27 chore: removed comparison block 2025-08-18 13:09:12 +05:30
nikhilmantri0902
1d9b457af6 chore: removed comparison block 2025-08-18 12:40:51 +05:30
nikhilmantri0902
d437998750 chore: tuple issue fixed 2025-08-18 12:13:08 +05:30
nikhilmantri0902
e02d0cdd98 chore: tuple issue fixed 2025-08-18 11:44:45 +05:30
nikhilmantri0902
1ad4a6699a chore: added debug logs 2025-08-18 11:24:29 +05:30
nikhilmantri0902
00ae45022b fix: added filtering based on both name and serviceName pairs 2025-08-18 11:20:08 +05:30
nikhilmantri0902
6f4a965c6d fix: added filtering based on both name and serviceName pairs 2025-08-18 11:19:01 +05:30
nikhilmantri0902
4c29b03577 chore: added logs for debugging 2025-08-15 14:15:20 +05:30
nikhilmantri0902
ea1409bc4f fix: added query optimization 2025-08-14 20:12:48 +05:30
Abhi kumar
0e3ac2a179 fix: added loading indicators in traces pages when running query (#8782) 2025-08-14 13:53:39 +05:30
Amlan Kumar Nandy
249f8be845 fix: resolve infinite loading issue in metric view in messaging queues (#8779) 2025-08-14 04:16:39 +00:00
primus-bot[bot]
9c952942ad chore(release): bump to v0.92.1 (#8780)
Co-authored-by: primus-bot[bot] <171087277+primus-bot[bot]@users.noreply.github.com>
2025-08-13 15:10:08 +05:30
Nityananda Gohain
dac46d82ff fix: check ch version (#8778)
Check the clickhouse version, before the setting secondary_indices_enable_bulk_filtering is used.
2025-08-13 14:57:25 +05:30
primus-bot[bot]
802ce6de01 chore(release): bump to v0.92.0 (#8776)
#### Summary
 - Release SigNoz v0.92.0
 - Bump SigNoz OTel Collector to v0.129.0
2025-08-13 12:17:43 +05:30
dependabot[bot]
6853f0c99d chore(deps): bump urllib3 from 2.4.0 to 2.5.0 in /tests/integration (#8296) 2025-08-13 04:58:39 +00:00
Srikanth Chekuri
3f8a2870e4 fix: key CONTAINS value doesn't work for numeric values (#8768) 2025-08-13 09:59:28 +05:30
Srikanth Chekuri
5fa70ea802 chore: use *_keys tables instead of tag_attributes_v2 for suggestions (#8753) 2025-08-12 18:10:35 +05:30
Yunus M
3a952fa330 fix: pass metric name to get value suggestions api (#8671)
* fix: pass metric name to get value suggestions api

* feat: add source to get value suggestions
2025-08-11 08:10:31 +00:00
Yunus M
6d97db1d9d fix: use localstorage value to avoid waiting for pref api to set the toggle state, add shortcut (#8751) 2025-08-11 10:26:27 +05:30
73 changed files with 1659 additions and 1218 deletions

View File

@@ -40,7 +40,7 @@ services:
timeout: 5s
retries: 3
schema-migrator-sync:
image: signoz/signoz-schema-migrator:v0.128.2
image: signoz/signoz-schema-migrator:v0.129.0
container_name: schema-migrator-sync
command:
- sync
@@ -53,7 +53,7 @@ services:
condition: service_healthy
restart: on-failure
schema-migrator-async:
image: signoz/signoz-schema-migrator:v0.128.2
image: signoz/signoz-schema-migrator:v0.129.0
container_name: schema-migrator-async
command:
- async

View File

@@ -122,6 +122,7 @@ telemetrystore:
max_bytes_to_read: 0
max_result_rows: 0
ignore_data_skipping_indices: ""
secondary_indices_enable_bulk_filtering: false
##################### Prometheus #####################
prometheus:

View File

@@ -174,7 +174,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.91.0
image: signoz/signoz:v0.92.1
command:
- --config=/root/config/prometheus.yml
ports:
@@ -207,7 +207,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.128.2
image: signoz/signoz-otel-collector:v0.129.0
command:
- --config=/etc/otel-collector-config.yaml
- --manager-config=/etc/manager-config.yaml
@@ -231,7 +231,7 @@ services:
- signoz
schema-migrator:
!!merge <<: *common
image: signoz/signoz-schema-migrator:v0.128.2
image: signoz/signoz-schema-migrator:v0.129.0
deploy:
restart_policy:
condition: on-failure

View File

@@ -115,7 +115,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.91.0
image: signoz/signoz:v0.92.1
command:
- --config=/root/config/prometheus.yml
ports:
@@ -148,7 +148,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.128.2
image: signoz/signoz-otel-collector:v0.129.0
command:
- --config=/etc/otel-collector-config.yaml
- --manager-config=/etc/manager-config.yaml
@@ -174,7 +174,7 @@ services:
- signoz
schema-migrator:
!!merge <<: *common
image: signoz/signoz-schema-migrator:v0.128.2
image: signoz/signoz-schema-migrator:v0.129.0
deploy:
restart_policy:
condition: on-failure

View File

@@ -177,7 +177,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.91.0}
image: signoz/signoz:${VERSION:-v0.92.1}
container_name: signoz
command:
- --config=/root/config/prometheus.yml
@@ -211,7 +211,7 @@ services:
# TODO: support otel-collector multiple replicas. Nginx/Traefik for loadbalancing?
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.128.2}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.129.0}
container_name: signoz-otel-collector
command:
- --config=/etc/otel-collector-config.yaml
@@ -237,7 +237,7 @@ services:
condition: service_healthy
schema-migrator-sync:
!!merge <<: *common
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.128.2}
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.129.0}
container_name: schema-migrator-sync
command:
- sync
@@ -248,7 +248,7 @@ services:
condition: service_healthy
schema-migrator-async:
!!merge <<: *db-depend
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.128.2}
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.129.0}
container_name: schema-migrator-async
command:
- async

View File

@@ -110,7 +110,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.91.0}
image: signoz/signoz:${VERSION:-v0.92.1}
container_name: signoz
command:
- --config=/root/config/prometheus.yml
@@ -143,7 +143,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.128.2}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.129.0}
container_name: signoz-otel-collector
command:
- --config=/etc/otel-collector-config.yaml
@@ -165,7 +165,7 @@ services:
condition: service_healthy
schema-migrator-sync:
!!merge <<: *common
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.128.2}
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.129.0}
container_name: schema-migrator-sync
command:
- sync
@@ -177,7 +177,7 @@ services:
restart: on-failure
schema-migrator-async:
!!merge <<: *db-depend
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.128.2}
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.129.0}
container_name: schema-migrator-async
command:
- async

View File

@@ -167,9 +167,16 @@ func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v3.
ctx, "prepare query range request v4", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds(),
)
st, en := r.Timestamps(ts)
start := st.UnixMilli()
end := en.UnixMilli()
start := ts.Add(-time.Duration(r.EvalWindow())).UnixMilli()
end := ts.UnixMilli()
if r.EvalDelay() > 0 {
start = start - int64(r.EvalDelay().Milliseconds())
end = end - int64(r.EvalDelay().Milliseconds())
}
// round to minute otherwise we could potentially miss data
start = start - (start % (60 * 1000))
end = end - (end % (60 * 1000))
compositeQuery := r.Condition().CompositeQuery
@@ -246,11 +253,9 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
for _, series := range queryResult.AnomalyScores {
for _, threshold := range r.Thresholds() {
smpl, shouldAlert := threshold.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
}
return resultVector, nil
@@ -291,11 +296,9 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
for _, series := range queryResult.AnomalyScores {
for _, threshold := range r.Thresholds() {
smpl, shouldAlert := threshold.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
}
return resultVector, nil

View File

@@ -41,9 +41,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
// create ch rule task for evalution
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if tr.IsScheduled() {
task.SetSchedule(tr.GetSchedule())
}
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
// create promql rule
@@ -65,9 +63,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
// create promql rule task for evalution
task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if pr.IsScheduled() {
task.SetSchedule(pr.GetSchedule())
}
} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
// create anomaly rule
ar, err := NewAnomalyRule(
@@ -89,9 +85,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
// create anomaly rule task for evalution
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if ar.IsScheduled() {
task.SetSchedule(ar.GetSchedule())
}
} else {
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
}

View File

@@ -8,14 +8,15 @@ import {
export const getValueSuggestions = (
props: QueryKeyValueRequestProps,
): Promise<AxiosResponse<QueryKeyValueSuggestionsResponseProps>> => {
const { signal, key, searchText, signalSource } = props;
const { signal, key, searchText, signalSource, metricName } = props;
const encodedSignal = encodeURIComponent(signal);
const encodedKey = encodeURIComponent(key);
const encodedMetricName = encodeURIComponent(metricName || '');
const encodedSearchText = encodeURIComponent(searchText);
const encodedSource = encodeURIComponent(signalSource || '');
return axios.get(
`/fields/values?signal=${encodedSignal}&name=${encodedKey}&searchText=${encodedSearchText}&source=${encodedSource}`,
`/fields/values?signal=${encodedSignal}&name=${encodedKey}&searchText=${encodedSearchText}&metricName=${encodedMetricName}&source=${encodedSource}`,
);
};

View File

@@ -383,6 +383,7 @@ function QuerySearch({
searchText: sanitizedSearchText,
signal: dataSource,
signalSource: signalSource as 'meter' | '',
metricName: debouncedMetricName ?? undefined,
});
// Skip updates if component unmounted or key changed
@@ -474,6 +475,7 @@ function QuerySearch({
activeKey,
dataSource,
isLoadingSuggestions,
debouncedMetricName,
signalSource,
toggleSuggestions,
],

View File

@@ -6,6 +6,7 @@ export const GlobalShortcuts = {
NavigateToAlerts: 'a+shift',
NavigateToExceptions: 'e+shift',
NavigateToMessagingQueues: 'm+shift',
ToggleSidebar: 'b+shift',
};
export const GlobalShortcutsName = {
@@ -16,6 +17,7 @@ export const GlobalShortcutsName = {
NavigateToAlerts: 'shift+a',
NavigateToExceptions: 'shift+e',
NavigateToMessagingQueues: 'shift+m',
ToggleSidebar: 'shift+b',
};
export const GlobalShortcutsDescription = {
@@ -26,4 +28,5 @@ export const GlobalShortcutsDescription = {
NavigateToAlerts: 'Navigate to alerts page',
NavigateToExceptions: 'Navigate to Exceptions page',
NavigateToMessagingQueues: 'Navigate to Messaging Queues page',
ToggleSidebar: 'Toggle sidebar visibility',
};

View File

@@ -0,0 +1,176 @@
import { render, screen } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import logEvent from 'api/common/logEvent';
import { GlobalShortcuts } from 'constants/shortcuts/globalShortcuts';
import { USER_PREFERENCES } from 'constants/userPreferences';
import {
KeyboardHotkeysProvider,
useKeyboardHotkeys,
} from 'hooks/hotkeys/useKeyboardHotkeys';
import { QueryClient, QueryClientProvider } from 'react-query';
// Mock dependencies
jest.mock('api/common/logEvent', () => jest.fn());
// Mock the AppContext
const mockUpdateUserPreferenceInContext = jest.fn();
const SHIFT_B_KEYBOARD_SHORTCUT = '{Shift>}b{/Shift}';
jest.mock('providers/App/App', () => ({
useAppContext: jest.fn(() => ({
userPreferences: [
{
name: USER_PREFERENCES.SIDENAV_PINNED,
value: false,
},
],
updateUserPreferenceInContext: mockUpdateUserPreferenceInContext,
})),
}));
function TestComponent({
mockHandleShortcut,
}: {
mockHandleShortcut: () => void;
}): JSX.Element {
const { registerShortcut } = useKeyboardHotkeys();
registerShortcut(GlobalShortcuts.ToggleSidebar, mockHandleShortcut);
return <div data-testid="test">Test</div>;
}
describe('Sidebar Toggle Shortcut', () => {
let queryClient: QueryClient;
beforeEach(() => {
queryClient = new QueryClient({
defaultOptions: {
queries: {
retry: false,
},
mutations: {
retry: false,
},
},
});
jest.clearAllMocks();
});
afterEach(() => {
jest.clearAllMocks();
});
describe('Global Shortcuts Constants', () => {
it('should have the correct shortcut key combination', () => {
expect(GlobalShortcuts.ToggleSidebar).toBe('b+shift');
});
});
describe('Keyboard Shortcut Registration', () => {
it('should register the sidebar toggle shortcut correctly', async () => {
const user = userEvent.setup();
const mockHandleShortcut = jest.fn();
render(
<QueryClientProvider client={queryClient}>
<KeyboardHotkeysProvider>
<TestComponent mockHandleShortcut={mockHandleShortcut} />
</KeyboardHotkeysProvider>
</QueryClientProvider>,
);
// Trigger the shortcut
await user.keyboard(SHIFT_B_KEYBOARD_SHORTCUT);
expect(mockHandleShortcut).toHaveBeenCalled();
});
it('should not trigger shortcut in input fields', async () => {
const user = userEvent.setup();
const mockHandleShortcut = jest.fn();
function TestComponent(): JSX.Element {
const { registerShortcut } = useKeyboardHotkeys();
registerShortcut(GlobalShortcuts.ToggleSidebar, mockHandleShortcut);
return (
<div>
<input data-testid="input-field" />
<div data-testid="test">Test</div>
</div>
);
}
render(
<QueryClientProvider client={queryClient}>
<KeyboardHotkeysProvider>
<TestComponent />
</KeyboardHotkeysProvider>
</QueryClientProvider>,
);
// Focus on input field
const inputField = screen.getByTestId('input-field');
await user.click(inputField);
// Try to trigger shortcut while focused on input
await user.keyboard('{Shift>}b{/Shift}');
// Should not trigger the shortcut
expect(mockHandleShortcut).not.toHaveBeenCalled();
});
});
describe('Sidebar Toggle Functionality', () => {
it('should log the toggle event with correct parameters', async () => {
const user = userEvent.setup();
const mockHandleShortcut = jest.fn(() => {
logEvent('Global Shortcut: Sidebar Toggle', {
previousState: false,
newState: true,
});
});
render(
<QueryClientProvider client={queryClient}>
<KeyboardHotkeysProvider>
<TestComponent mockHandleShortcut={mockHandleShortcut} />
</KeyboardHotkeysProvider>
</QueryClientProvider>,
);
await user.keyboard(SHIFT_B_KEYBOARD_SHORTCUT);
expect(logEvent).toHaveBeenCalledWith('Global Shortcut: Sidebar Toggle', {
previousState: false,
newState: true,
});
});
it('should update user preference in context', async () => {
const user = userEvent.setup();
const mockHandleShortcut = jest.fn(() => {
const save = {
name: USER_PREFERENCES.SIDENAV_PINNED,
value: true,
};
mockUpdateUserPreferenceInContext(save);
});
render(
<QueryClientProvider client={queryClient}>
<KeyboardHotkeysProvider>
<TestComponent mockHandleShortcut={mockHandleShortcut} />
</KeyboardHotkeysProvider>
</QueryClientProvider>,
);
await user.keyboard(SHIFT_B_KEYBOARD_SHORTCUT);
expect(mockUpdateUserPreferenceInContext).toHaveBeenCalledWith({
name: USER_PREFERENCES.SIDENAV_PINNED,
value: true,
});
});
});
});

View File

@@ -10,8 +10,10 @@ import setLocalStorageApi from 'api/browser/localstorage/set';
import getChangelogByVersion from 'api/changelog/getChangelogByVersion';
import logEvent from 'api/common/logEvent';
import manageCreditCardApi from 'api/v1/portal/create';
import updateUserPreference from 'api/v1/user/preferences/name/update';
import getUserLatestVersion from 'api/v1/version/getLatestVersion';
import getUserVersion from 'api/v1/version/getVersion';
import { AxiosError } from 'axios';
import cx from 'classnames';
import ChangelogModal from 'components/ChangelogModal/ChangelogModal';
import ChatSupportGateway from 'components/ChatSupportGateway/ChatSupportGateway';
@@ -22,10 +24,12 @@ import { Events } from 'constants/events';
import { FeatureKeys } from 'constants/features';
import { LOCALSTORAGE } from 'constants/localStorage';
import ROUTES from 'constants/routes';
import { GlobalShortcuts } from 'constants/shortcuts/globalShortcuts';
import { USER_PREFERENCES } from 'constants/userPreferences';
import SideNav from 'container/SideNav';
import TopNav from 'container/TopNav';
import dayjs from 'dayjs';
import { useKeyboardHotkeys } from 'hooks/hotkeys/useKeyboardHotkeys';
import { useIsDarkMode } from 'hooks/useDarkMode';
import { useGetTenantLicense } from 'hooks/useGetTenantLicense';
import { useNotifications } from 'hooks/useNotifications';
@@ -68,8 +72,10 @@ import {
LicensePlatform,
LicenseState,
} from 'types/api/licensesV3/getActive';
import { UserPreference } from 'types/api/preferences/preference';
import AppReducer from 'types/reducer/app';
import { USER_ROLES } from 'types/roles';
import { showErrorNotification } from 'utils/error';
import { eventEmitter } from 'utils/getEventEmitter';
import {
getFormattedDate,
@@ -662,10 +668,85 @@ function AppLayout(props: AppLayoutProps): JSX.Element {
</div>
);
const sideNavPinned = userPreferences?.find(
const sideNavPinnedPreference = userPreferences?.find(
(preference) => preference.name === USER_PREFERENCES.SIDENAV_PINNED,
)?.value as boolean;
// Add loading state to prevent layout shift during initial load
const [isSidebarLoaded, setIsSidebarLoaded] = useState(false);
// Get sidebar state from localStorage as fallback until preferences are loaded
const getSidebarStateFromLocalStorage = useCallback((): boolean => {
try {
const storedValue = getLocalStorageApi(USER_PREFERENCES.SIDENAV_PINNED);
return storedValue === 'true';
} catch {
return false;
}
}, []);
// Set sidebar as loaded after user preferences are fetched
useEffect(() => {
if (userPreferences !== null) {
setIsSidebarLoaded(true);
}
}, [userPreferences]);
// Use localStorage value as fallback until preferences are loaded
const isSideNavPinned = isSidebarLoaded
? sideNavPinnedPreference
: getSidebarStateFromLocalStorage();
const { registerShortcut, deregisterShortcut } = useKeyboardHotkeys();
const { updateUserPreferenceInContext } = useAppContext();
const { mutate: updateUserPreferenceMutation } = useMutation(
updateUserPreference,
{
onError: (error) => {
showErrorNotification(notifications, error as AxiosError);
},
},
);
const handleToggleSidebar = useCallback((): void => {
const newState = !isSideNavPinned;
logEvent('Global Shortcut: Sidebar Toggle', {
previousState: isSideNavPinned,
newState,
});
// Save to localStorage immediately for instant feedback
setLocalStorageApi(USER_PREFERENCES.SIDENAV_PINNED, newState.toString());
// Update the context immediately
const save = {
name: USER_PREFERENCES.SIDENAV_PINNED,
value: newState,
};
updateUserPreferenceInContext(save as UserPreference);
// Make the API call in the background
updateUserPreferenceMutation({
name: USER_PREFERENCES.SIDENAV_PINNED,
value: newState,
});
}, [
isSideNavPinned,
updateUserPreferenceInContext,
updateUserPreferenceMutation,
]);
// Register the sidebar toggle shortcut
useEffect(() => {
registerShortcut(GlobalShortcuts.ToggleSidebar, handleToggleSidebar);
return (): void => {
deregisterShortcut(GlobalShortcuts.ToggleSidebar);
};
}, [registerShortcut, deregisterShortcut, handleToggleSidebar]);
const SHOW_TRIAL_EXPIRY_BANNER =
showTrialExpiryBanner && !showPaymentFailedWarning;
const SHOW_WORKSPACE_RESTRICTED_BANNER = showWorkspaceRestricted;
@@ -739,14 +820,14 @@ function AppLayout(props: AppLayoutProps): JSX.Element {
className={cx(
'app-layout',
isDarkMode ? 'darkMode dark' : 'lightMode',
sideNavPinned ? 'side-nav-pinned' : '',
isSideNavPinned ? 'side-nav-pinned' : '',
SHOW_WORKSPACE_RESTRICTED_BANNER ? 'isWorkspaceRestricted' : '',
SHOW_TRIAL_EXPIRY_BANNER ? 'isTrialExpired' : '',
SHOW_PAYMENT_FAILED_BANNER ? 'isPaymentFailed' : '',
)}
>
{isToDisplayLayout && !renderFullScreen && (
<SideNav isPinned={sideNavPinned} />
<SideNav isPinned={isSideNavPinned} />
)}
<div
className={cx('app-content', {

View File

@@ -1,6 +1,7 @@
import './MySettings.styles.scss';
import { Radio, RadioChangeEvent, Switch, Tag } from 'antd';
import setLocalStorageApi from 'api/browser/localstorage/set';
import logEvent from 'api/common/logEvent';
import updateUserPreference from 'api/v1/user/preferences/name/update';
import { AxiosError } from 'axios';
@@ -109,6 +110,9 @@ function MySettings(): JSX.Element {
// Optimistically update the UI
setSideNavPinned(checked);
// Save to localStorage immediately for instant feedback
setLocalStorageApi(USER_PREFERENCES.SIDENAV_PINNED, checked.toString());
// Update the context immediately
const save = {
name: USER_PREFERENCES.SIDENAV_PINNED,
@@ -130,6 +134,8 @@ function MySettings(): JSX.Element {
name: USER_PREFERENCES.SIDENAV_PINNED,
value: !checked,
} as UserPreference);
// Also revert localStorage
setLocalStorageApi(USER_PREFERENCES.SIDENAV_PINNED, (!checked).toString());
showErrorNotification(notifications, error as AxiosError);
},
},

View File

@@ -20,6 +20,7 @@ function TimeSeriesViewContainer({
dataSource = DataSource.TRACES,
isFilterApplied,
setWarning,
setIsLoadingQueries,
}: TimeSeriesViewProps): JSX.Element {
const { stagedQuery, currentQuery, panelType } = useQueryBuilder();
@@ -83,6 +84,14 @@ function TimeSeriesViewContainer({
[data, isValidToConvertToMs],
);
useEffect(() => {
if (isLoading || isFetching) {
setIsLoadingQueries(true);
} else {
setIsLoadingQueries(false);
}
}, [isLoading, isFetching, setIsLoadingQueries]);
return (
<TimeSeriesView
isFilterApplied={isFilterApplied}
@@ -101,6 +110,7 @@ interface TimeSeriesViewProps {
dataSource?: DataSource;
isFilterApplied: boolean;
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
}
TimeSeriesViewContainer.defaultProps = {

View File

@@ -49,9 +49,14 @@ import { getListColumns, transformDataWithDate } from './utils';
interface ListViewProps {
isFilterApplied: boolean;
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
}
function ListView({ isFilterApplied, setWarning }: ListViewProps): JSX.Element {
function ListView({
isFilterApplied,
setWarning,
setIsLoadingQueries,
}: ListViewProps): JSX.Element {
const {
stagedQuery,
panelType: panelTypeFromQueryBuilder,
@@ -162,6 +167,14 @@ function ListView({ isFilterApplied, setWarning }: ListViewProps): JSX.Element {
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [data?.payload, data?.warning]);
useEffect(() => {
if (isLoading || isFetching) {
setIsLoadingQueries(true);
} else {
setIsLoadingQueries(false);
}
}, [isLoading, isFetching, setIsLoadingQueries]);
const dataLength =
data?.payload?.data?.newResult?.data?.result[0]?.list?.length;
const totalCount = useMemo(() => dataLength || 0, [dataLength]);

View File

@@ -16,8 +16,10 @@ import { GlobalReducer } from 'types/reducer/globalTime';
function TableView({
setWarning,
setIsLoadingQueries,
}: {
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
}): JSX.Element {
const { stagedQuery, panelType } = useQueryBuilder();
@@ -26,7 +28,7 @@ function TableView({
GlobalReducer
>((state) => state.globalTime);
const { data, isLoading, isError, error } = useGetQueryRange(
const { data, isLoading, isFetching, isError, error } = useGetQueryRange(
{
query: stagedQuery || initialQueriesMap.traces,
graphType: panelType || PANEL_TYPES.TABLE,
@@ -49,6 +51,14 @@ function TableView({
},
);
useEffect(() => {
if (isLoading || isFetching) {
setIsLoadingQueries(true);
} else {
setIsLoadingQueries(false);
}
}, [isLoading, isFetching, setIsLoadingQueries]);
const queryTableData = useMemo(
() =>
data?.payload?.data?.newResult?.data?.result ||

View File

@@ -40,11 +40,13 @@ import { ActionsContainer, Container } from './styles';
interface TracesViewProps {
isFilterApplied: boolean;
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
}
function TracesView({
isFilterApplied,
setWarning,
setIsLoadingQueries,
}: TracesViewProps): JSX.Element {
const { stagedQuery, panelType } = useQueryBuilder();
const [orderBy, setOrderBy] = useState<string>('timestamp:desc');
@@ -117,6 +119,14 @@ function TracesView({
[responseData],
);
useEffect(() => {
if (isLoading || isFetching) {
setIsLoadingQueries(true);
} else {
setIsLoadingQueries(false);
}
}, [isLoading, isFetching, setIsLoadingQueries]);
useEffect(() => {
if (!isLoading && !isFetching && !isError && (tableData || []).length !== 0) {
logEvent('Traces Explorer: Data present', {

View File

@@ -10,6 +10,7 @@ export const useGetQueryKeyValueSuggestions = ({
signal,
searchText,
signalSource,
metricName,
}: {
key: string;
signal: 'traces' | 'logs' | 'metrics';
@@ -18,17 +19,26 @@ export const useGetQueryKeyValueSuggestions = ({
options?: UseQueryOptions<
SuccessResponse<QueryKeyValueSuggestionsResponseProps> | ErrorResponse
>;
metricName?: string;
}): UseQueryResult<
AxiosResponse<QueryKeyValueSuggestionsResponseProps>,
AxiosError
> =>
useQuery<AxiosResponse<QueryKeyValueSuggestionsResponseProps>, AxiosError>({
queryKey: ['queryKeyValueSuggestions', key, signal, searchText, signalSource],
queryKey: [
'queryKeyValueSuggestions',
key,
signal,
searchText,
signalSource,
metricName,
],
queryFn: () =>
getValueSuggestions({
signal,
key,
searchText: searchText || '',
signalSource: signalSource as 'meter' | '',
metricName: metricName || '',
}),
});

View File

@@ -6,7 +6,7 @@ import cx from 'classnames';
import { CardContainer } from 'container/GridCardLayout/styles';
import { useIsDarkMode } from 'hooks/useDarkMode';
import { ChevronDown, ChevronUp } from 'lucide-react';
import { useRef, useState } from 'react';
import { useCallback, useRef, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { Widgets } from 'types/api/dashboard/getAll';
@@ -129,23 +129,22 @@ function MetricPage(): JSX.Element {
},
];
const [renderedGraphCount, setRenderedGraphCount] = useState(0);
const renderedGraphCountRef = useRef(0);
const hasLoggedRef = useRef(false);
const checkIfDataExists = (isDataAvailable: boolean): void => {
const checkIfDataExists = useCallback((isDataAvailable: boolean): void => {
if (isDataAvailable) {
const newCount = renderedGraphCount + 1;
setRenderedGraphCount(newCount);
renderedGraphCountRef.current += 1;
// Only log when first graph has rendered and we haven't logged yet
if (newCount === 1 && !hasLoggedRef.current) {
if (renderedGraphCountRef.current === 1 && !hasLoggedRef.current) {
logEvent('MQ Kafka: Metric view', {
graphRendered: true,
});
hasLoggedRef.current = true;
}
}
};
}, []);
return (
<div className="metric-page">

View File

@@ -69,6 +69,7 @@ function TracesExplorer(): JSX.Element {
// Get panel type from URL
const panelTypesFromUrl = useGetPanelTypesQueryParam(PANEL_TYPES.LIST);
const [isLoadingQueries, setIsLoadingQueries] = useState<boolean>(false);
const [selectedView, setSelectedView] = useState<ExplorerViews>(() =>
getExplorerViewFromUrl(searchParams, panelTypesFromUrl),
@@ -323,6 +324,7 @@ function TracesExplorer(): JSX.Element {
rightActions={
<RightToolbarActions
onStageRunQuery={(): void => handleRunQuery(true, true)}
isLoadingQueries={isLoadingQueries}
/>
}
/>
@@ -344,13 +346,21 @@ function TracesExplorer(): JSX.Element {
{selectedView === ExplorerViews.LIST && (
<div className="trace-explorer-list-view">
<ListView isFilterApplied={isFilterApplied} setWarning={setWarning} />
<ListView
isFilterApplied={isFilterApplied}
setWarning={setWarning}
setIsLoadingQueries={setIsLoadingQueries}
/>
</div>
)}
{selectedView === ExplorerViews.TRACE && (
<div className="trace-explorer-traces-view">
<TracesView isFilterApplied={isFilterApplied} setWarning={setWarning} />
<TracesView
isFilterApplied={isFilterApplied}
setWarning={setWarning}
setIsLoadingQueries={setIsLoadingQueries}
/>
</div>
)}
@@ -360,13 +370,17 @@ function TracesExplorer(): JSX.Element {
dataSource={DataSource.TRACES}
isFilterApplied={isFilterApplied}
setWarning={setWarning}
setIsLoadingQueries={setIsLoadingQueries}
/>
</div>
)}
{selectedView === ExplorerViews.TABLE && (
<div className="trace-explorer-table-view">
<TableView setWarning={setWarning} />
<TableView
setWarning={setWarning}
setIsLoadingQueries={setIsLoadingQueries}
/>
</div>
)}
</div>

View File

@@ -46,6 +46,7 @@ export interface QueryKeyValueRequestProps {
key: string;
searchText: string;
signalSource?: 'meter' | '';
metricName?: string;
}
export type SignalType = 'traces' | 'logs' | 'metrics';

1
go.mod
View File

@@ -53,7 +53,6 @@ require (
github.com/spf13/cobra v1.9.1
github.com/srikanthccv/ClickHouse-go-mock v0.12.0
github.com/stretchr/testify v1.10.0
github.com/teambition/rrule-go v1.8.2
github.com/tidwall/gjson v1.18.0
github.com/uptrace/bun v1.2.9
github.com/uptrace/bun/dialect/pgdialect v1.2.9

2
go.sum
View File

@@ -918,8 +918,6 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8=
github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=

View File

@@ -31,6 +31,7 @@ func NewAPI(
telemetryStore,
telemetrytraces.DBName,
telemetrytraces.TagAttributesV2TableName,
telemetrytraces.SpanAttributesKeysTblName,
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
@@ -39,6 +40,8 @@ func NewAPI(
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,
telemetrylogs.LogAttributeKeysTblName,
telemetrylogs.LogResourceKeysTblName,
telemetrymetadata.DBName,
telemetrymetadata.AttributesMetadataLocalTableName,
)

View File

@@ -50,6 +50,7 @@ func newProvider(
telemetryStore,
telemetrytraces.DBName,
telemetrytraces.TagAttributesV2TableName,
telemetrytraces.SpanAttributesKeysTblName,
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
@@ -58,6 +59,8 @@ func newProvider(
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,
telemetrylogs.LogAttributeKeysTblName,
telemetrylogs.LogResourceKeysTblName,
telemetrymetadata.DBName,
telemetrymetadata.AttributesMetadataLocalTableName,
)
@@ -69,12 +72,13 @@ func newProvider(
resourceFilterFieldMapper := resourcefilter.NewFieldMapper()
resourceFilterConditionBuilder := resourcefilter.NewConditionBuilder(resourceFilterFieldMapper)
resourceFilterStmtBuilder := resourcefilter.NewTraceResourceFilterStatementBuilder(
settings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
telemetryMetadataStore,
)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(nil, traceFieldMapper, traceConditionBuilder, "", nil)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(settings, nil, traceFieldMapper, traceConditionBuilder, "", nil)
traceStmtBuilder := telemetrytraces.NewTraceQueryStatementBuilder(
settings,
telemetryMetadataStore,
@@ -89,6 +93,7 @@ func newProvider(
logFieldMapper := telemetrylogs.NewFieldMapper()
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
settings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
telemetryMetadataStore,
@@ -97,6 +102,7 @@ func newProvider(
telemetrylogs.GetBodyJSONKey,
)
logAggExprRewriter := querybuilder.NewAggExprRewriter(
settings,
telemetrylogs.DefaultFullTextColumn,
logFieldMapper,
logConditionBuilder,

View File

@@ -385,7 +385,7 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
return resourceSubQuery, nil
}
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
if r.indexTable == "" {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
@@ -428,7 +428,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
query := fmt.Sprintf(
`SELECT
quantile(0.99)(duration_nano) as p99,
toFloat64(quantileExact(0.99)(duration_nano)) as p99,
avg(duration_nano) as avgDuration,
count(*) as numCalls
FROM %s.%s
@@ -510,6 +510,274 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
return &serviceItems, nil
}
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
if r.indexTable == "" {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
}
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil)
if apiErr != nil {
return nil, apiErr
}
// Build parallel arrays for arrayZip approach
var ops []string
var svcs []string
serviceOperationsMap := make(map[string][]string)
for svc, opsList := range *topLevelOps {
// Cap operations to 1500 per service (same as original logic)
cappedOps := opsList[:int(math.Min(1500, float64(len(opsList))))]
serviceOperationsMap[svc] = cappedOps
// Add to parallel arrays
for _, op := range cappedOps {
ops = append(ops, op)
svcs = append(svcs, svc)
}
}
fmt.Printf("Operation pairs count: %d\n", len(ops))
// Build resource subquery for all services, but only include our target services
targetServices := make([]string, 0, len(*topLevelOps))
for svc := range *topLevelOps {
targetServices = append(targetServices, svc)
}
resourceSubQuery, err := r.buildResourceSubQueryForServices(queryParams.Tags, targetServices, *queryParams.Start, *queryParams.End)
if err != nil {
zap.L().Error("Error building resource subquery", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
// Build the optimized single query using arrayZip for tuple creation
query := fmt.Sprintf(`
SELECT
resource_string_service$$name AS serviceName,
toFloat64(quantileExact(0.99)(duration_nano)) AS p99,
avg(duration_nano) AS avgDuration,
count(*) AS numCalls,
countIf(statusCode = 2) AS numErrors
FROM %s.%s
WHERE (name, resource_string_service$$name) IN arrayZip(@ops, @svcs)
AND timestamp >= @start
AND timestamp <= @end
AND ts_bucket_start >= @start_bucket
AND ts_bucket_start <= @end_bucket
AND (resource_fingerprint GLOBAL IN %s)
GROUP BY serviceName
ORDER BY numCalls DESC`,
r.TraceDB, r.traceTableName, resourceSubQuery,
)
args := []interface{}{
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
// Important: wrap slices with clickhouse.Array for IN/array params
clickhouse.Named("ops", ops),
clickhouse.Named("svcs", svcs),
}
fmt.Printf("Query: %s\n", query)
// Execute the single optimized query
rows, err := r.db.Query(ctx, query, args...)
if err != nil {
zap.L().Error("Error executing optimized services query", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
defer rows.Close()
// Process results
serviceItems := []model.ServiceItem{}
for rows.Next() {
var serviceItem model.ServiceItem
err := rows.ScanStruct(&serviceItem)
if err != nil {
zap.L().Error("Error scanning service item", zap.Error(err))
continue
}
// Skip services with zero calls (match original behavior)
if serviceItem.NumCalls == 0 {
continue
}
// Add data warning for this service
if ops, exists := serviceOperationsMap[serviceItem.ServiceName]; exists {
serviceItem.DataWarning = model.DataWarning{
TopLevelOps: ops,
}
}
// Calculate derived fields
serviceItem.CallRate = float64(serviceItem.NumCalls) / float64(queryParams.Period)
if serviceItem.NumCalls > 0 {
serviceItem.ErrorRate = float64(serviceItem.NumErrors) * 100 / float64(serviceItem.NumCalls)
}
serviceItems = append(serviceItems, serviceItem)
}
if err = rows.Err(); err != nil {
zap.L().Error("Error iterating over service results", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
// Fetch results from the original GetServicesOG for comparison
ogResults, ogErr := r.GetServicesOG(ctx, queryParams)
if ogErr != nil {
zap.L().Error("Error fetching OG service results", zap.Error(ogErr))
} else {
// Compare the optimized results with OG results
ogMap := make(map[string]model.ServiceItem)
for _, ogItem := range *ogResults {
ogMap[ogItem.ServiceName] = ogItem
}
for _, optItem := range serviceItems {
if ogItem, exists := ogMap[optItem.ServiceName]; exists {
// Compare key fields (NumCalls, NumErrors, etc.)
if optItem.NumCalls != ogItem.NumCalls ||
optItem.NumErrors != ogItem.NumErrors ||
int64(optItem.Percentile99) != int64(ogItem.Percentile99) ||
int64(optItem.AvgDuration) != int64(ogItem.AvgDuration) {
fmt.Printf(
"[Discrepancy] Service: %s | optNumCalls: %d, ogNumCalls: %d | optNumErrors: %d, ogNumErrors: %d | optP99: %.2f, ogP99: %.2f | optAvgDuration: %.2f, ogAvgDuration: %.2f\n",
optItem.ServiceName,
optItem.NumCalls, ogItem.NumCalls,
optItem.NumErrors, ogItem.NumErrors,
optItem.Percentile99, ogItem.Percentile99,
optItem.AvgDuration, ogItem.AvgDuration,
)
}
} else {
zap.L().Warn("Service present in optimized results but missing in OG results",
zap.String("service", optItem.ServiceName))
}
}
// Check for services present in OG but missing in optimized
optMap := make(map[string]struct{})
for _, optItem := range serviceItems {
optMap[optItem.ServiceName] = struct{}{}
}
for _, ogItem := range *ogResults {
if _, exists := optMap[ogItem.ServiceName]; !exists {
fmt.Printf("Service present in OG results but missing in optimized results: %s\n", ogItem.ServiceName)
}
}
}
return &serviceItems, nil
}
// buildResourceSubQueryForServices builds a resource subquery that includes only specific services
// This maintains service context while optimizing for multiple services in a single query
func (r *ClickHouseReader) buildResourceSubQueryForServices(tags []model.TagQueryParam, targetServices []string, start, end time.Time) (string, error) {
if len(targetServices) == 0 {
return "", fmt.Errorf("no target services provided")
}
if len(tags) == 0 {
// For exact parity with per-service behavior, build via resource builder with only service filter
filterSet := v3.FilterSet{}
filterSet.Items = append(filterSet.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "service.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: v3.FilterOperatorIn,
Value: targetServices,
})
resourceSubQuery, err := resource.BuildResourceSubQuery(
r.TraceDB,
r.traceResourceTableV3,
start.Unix()-1800,
end.Unix(),
&filterSet,
[]v3.AttributeKey{},
v3.AttributeKey{},
false)
if err != nil {
zap.L().Error("Error building resource subquery for services", zap.Error(err))
return "", err
}
return resourceSubQuery, nil
}
// Convert tags to filter set
filterSet := v3.FilterSet{}
for _, tag := range tags {
// Skip the collector id as we don't add it to traces
if tag.Key == "signoz.collector.id" {
continue
}
var it v3.FilterItem
it.Key = v3.AttributeKey{
Key: tag.Key,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
}
switch tag.Operator {
case model.NotInOperator:
it.Operator = v3.FilterOperatorNotIn
it.Value = tag.StringValues
case model.InOperator:
it.Operator = v3.FilterOperatorIn
it.Value = tag.StringValues
default:
return "", fmt.Errorf("operator %s not supported", tag.Operator)
}
filterSet.Items = append(filterSet.Items, it)
}
// Add service filter to limit to our target services
filterSet.Items = append(filterSet.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "service.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: v3.FilterOperatorIn,
Value: targetServices,
})
// Build resource subquery with service-specific filtering
resourceSubQuery, err := resource.BuildResourceSubQuery(
r.TraceDB,
r.traceResourceTableV3,
start.Unix()-1800,
end.Unix(),
&filterSet,
[]v3.AttributeKey{},
v3.AttributeKey{},
false)
if err != nil {
zap.L().Error("Error building resource subquery for services", zap.Error(err))
return "", err
}
return resourceSubQuery, nil
}
// buildServiceInClause creates a properly quoted IN clause for service names
func (r *ClickHouseReader) buildServiceInClause(services []string) string {
var quotedServices []string
for _, svc := range services {
// Escape single quotes and wrap in quotes
escapedSvc := strings.ReplaceAll(svc, "'", "\\'")
quotedServices = append(quotedServices, fmt.Sprintf("'%s'", escapedSvc))
}
return strings.Join(quotedServices, ", ")
}
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
// status can only be two and if both are selected than they are equivalent to none selected
if _, ok := excludeMap["status"]; ok {
@@ -529,7 +797,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
}
return query
}
func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
tags := []model.TagQuery{}
for _, tag := range queryParams {
@@ -686,7 +953,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
}
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
}
func (r *ClickHouseReader) GetEntryPointOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, error) {
// Step 1: Get top operations for the given service
topOps, err := r.GetTopOperations(ctx, queryParams)
@@ -755,9 +1021,9 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
query := fmt.Sprintf(`
SELECT
quantile(0.5)(durationNano) as p50,
quantile(0.95)(durationNano) as p95,
quantile(0.99)(durationNano) as p99,
toFloat64(quantileExact(0.5)(durationNano)) as p50,
toFloat64(quantileExact(0.95)(durationNano)) as p95,
toFloat64(quantileExact(0.99)(durationNano)) as p99,
COUNT(*) as numCalls,
countIf(status_code=2) as errorCount,
name
@@ -1239,11 +1505,11 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
SELECT
src as parent,
dest as child,
result[1] AS p50,
result[2] AS p75,
result[3] AS p90,
result[4] AS p95,
result[5] AS p99,
toFloat64(result[1]) AS p50,
toFloat64(result[2]) AS p75,
toFloat64(result[3]) AS p90,
toFloat64(result[4]) AS p95,
toFloat64(result[5]) AS p99,
sum(total_count) as callCount,
sum(total_count)/ @duration AS callRate,
sum(error_count)/sum(total_count) * 100 as errorRate
@@ -1275,7 +1541,6 @@ func getLocalTableName(tableName string) string {
return tableNameSplit[0] + "." + strings.Split(tableNameSplit[1], "distributed_")[1]
}
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
@@ -1416,7 +1681,6 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
}(ttlPayload)
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
@@ -2057,7 +2321,6 @@ func (r *ClickHouseReader) ListErrors(ctx context.Context, queryParams *model.Li
return &getErrorResponses, nil
}
func (r *ClickHouseReader) CountErrors(ctx context.Context, queryParams *model.CountErrorsParams) (uint64, *model.ApiError) {
var errorCount uint64
@@ -2169,7 +2432,6 @@ func (r *ClickHouseReader) GetNextPrevErrorIDs(ctx context.Context, queryParams
return &getNextPrevErrorIDsResponse, nil
}
func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *model.GetErrorParams) (string, time.Time, *model.ApiError) {
var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse
@@ -2830,7 +3092,6 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
return &response, nil
}
func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
@@ -2905,7 +3166,6 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3
return &attributeValues, nil
}
func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.UUID, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {
unixMilli := common.PastDayRoundOff()
@@ -3577,7 +3837,6 @@ func readRow(vars []interface{}, columnNames []string, countOfNumberCols int) ([
}
return groupBy, groupAttributes, groupAttributesArray, nil
}
func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string, countOfNumberCols int) ([]*v3.Series, error) {
// when groupBy is applied, each combination of cartesian product
// of attribute values is a separate series. Each item in seriesToPoints
@@ -4373,7 +4632,6 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
return timeline, nil
}
func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error) {
query := fmt.Sprintf(`SELECT
@@ -4962,7 +5220,6 @@ func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context,
}
return timeSeries, nil
}
func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.UUID, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
var args []interface{}
@@ -5180,7 +5437,6 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.
return &response, nil
}
func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
var args []interface{}
@@ -5760,7 +6016,6 @@ func (r *ClickHouseReader) GetInspectMetrics(ctx context.Context, req *metrics_e
Series: &seriesList,
}, nil
}
func (r *ClickHouseReader) GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError) {
// Build dynamic key selections and JSON extracts
var jsonExtracts []string
@@ -5933,7 +6188,6 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
}
return hasLE, nil
}
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
var missingMetrics []string

View File

@@ -85,11 +85,6 @@ type BaseRule struct {
TemporalityMap map[string]map[v3.Temporality]bool
sqlstore sqlstore.SQLStore
evaluation ruletypes.Evaluation
schedule string
scheduleStartsAt time.Time
}
type RuleOption func(*BaseRule)
@@ -144,8 +139,6 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
Active: map[uint64]*ruletypes.Alert{},
reader: reader,
TemporalityMap: make(map[string]map[v3.Temporality]bool),
evaluation: p.Evaluation,
schedule: p.Schedule,
}
if baseRule.evalWindow == 0 {
@@ -217,18 +210,6 @@ func (r *BaseRule) TargetVal() float64 {
return r.targetVal()
}
func (r *BaseRule) Thresholds() []ruletypes.RuleThreshold {
return r.ruleCondition.Thresholds
}
func (r *BaseRule) IsScheduled() bool {
return r.schedule != ""
}
func (r *BaseRule) GetSchedule() (string, time.Time) {
return r.schedule, r.scheduleStartsAt
}
func (r *ThresholdRule) hostFromSource() string {
parsedUrl, err := url.Parse(r.source)
if err != nil {
@@ -260,10 +241,8 @@ func (r *BaseRule) Unit() string {
}
func (r *BaseRule) Timestamps(ts time.Time) (time.Time, time.Time) {
st, en := r.evaluation.EvaluationTime(ts)
start := st.UnixMilli()
end := en.UnixMilli()
start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli()
end := ts.UnixMilli()
if r.evalDelay > 0 {
start = start - int64(r.evalDelay.Milliseconds())

View File

@@ -168,9 +168,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
// create ch rule task for evalution
task = newTask(TaskTypeCh, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if tr.IsScheduled() {
task.SetSchedule(tr.GetSchedule())
}
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
// create promql rule
@@ -192,9 +190,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
// create promql rule task for evalution
task = newTask(TaskTypeProm, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if pr.IsScheduled() {
task.SetSchedule(pr.GetSchedule())
}
} else {
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
}

View File

@@ -125,7 +125,8 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
prevState := r.State()
start, end := r.Timestamps(ts)
start := ts.Add(-r.evalWindow)
end := ts
interval := 60 * time.Second // TODO(srikanthccv): this should be configurable
valueFormatter := formatter.FromUnit(r.Unit())
@@ -150,86 +151,84 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
var alerts = make(map[uint64]*ruletypes.Alert, len(res))
for _, series := range res {
for _, ruleThreshold := range r.Thresholds() {
l := make(map[string]string, len(series.Metric))
for _, lbl := range series.Metric {
l[lbl.Name] = lbl.Value
l := make(map[string]string, len(series.Metric))
for _, lbl := range series.Metric {
l[lbl.Name] = lbl.Value
}
if len(series.Floats) == 0 {
continue
}
alertSmpl, shouldAlert := r.ShouldAlert(toCommonSeries(series))
if !shouldAlert {
continue
}
r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", series)
threshold := valueFormatter.Format(r.targetVal(), r.Unit())
tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(alertSmpl.V, r.Unit()), threshold)
// Inject some convenience variables that are easier to remember for users
// who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
expand := func(text string) string {
tmpl := ruletypes.NewTemplateExpander(
ctx,
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData)
}
return result
}
if len(series.Floats) == 0 {
continue
}
lb := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel)
resultLabels := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel).Labels()
alertSmpl, shouldAlert := ruleThreshold.ShouldAlert(toCommonSeries(series))
if !shouldAlert {
continue
}
r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", series)
for name, value := range r.labels.Map() {
lb.Set(name, expand(value))
}
threshold := valueFormatter.Format(r.targetVal(), r.Unit())
lb.Set(qslabels.AlertNameLabel, r.Name())
lb.Set(qslabels.AlertRuleIdLabel, r.ID())
lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(alertSmpl.V, r.Unit()), threshold)
// Inject some convenience variables that are easier to remember for users
// who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
for name, value := range r.annotations.Map() {
annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
}
expand := func(text string) string {
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}
tmpl := ruletypes.NewTemplateExpander(
ctx,
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData)
}
return result
}
if _, ok := alerts[h]; ok {
err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = ruletypes.HealthBad
r.lastError = err
return nil, err
}
lb := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel)
resultLabels := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel).Labels()
for name, value := range r.labels.Map() {
lb.Set(name, expand(value))
}
lb.Set(qslabels.AlertNameLabel, r.Name())
lb.Set(qslabels.AlertRuleIdLabel, r.ID())
lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
for name, value := range r.annotations.Map() {
annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
}
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = ruletypes.HealthBad
r.lastError = err
return nil, err
}
alerts[h] = &ruletypes.Alert{
Labels: lbs,
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: model.StatePending,
Value: alertSmpl.V,
GeneratorURL: r.GeneratorURL(),
Receivers: r.preferredChannels,
}
alerts[h] = &ruletypes.Alert{
Labels: lbs,
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: model.StatePending,
Value: alertSmpl.V,
GeneratorURL: r.GeneratorURL(),
Receivers: r.preferredChannels,
}
}

View File

@@ -3,7 +3,6 @@ package rules
import (
"context"
"fmt"
"github.com/teambition/rrule-go"
"sort"
"sync"
"time"
@@ -40,8 +39,6 @@ type PromRuleTask struct {
maintenanceStore ruletypes.MaintenanceStore
orgID valuer.UUID
schedule string
scheduleStartsAt time.Time
}
// newPromRuleTask holds rules that have promql condition
@@ -78,10 +75,6 @@ func (g *PromRuleTask) Key() string {
return g.name + ";" + g.file
}
func (g *PromRuleTask) IsCronSchedule() bool {
return g.schedule != ""
}
func (g *PromRuleTask) Type() TaskType { return TaskTypeProm }
// Rules returns the group's rules.
@@ -98,6 +91,38 @@ func (g *PromRuleTask) Pause(b bool) {
func (g *PromRuleTask) Run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"name": g.Name(),
},
})
iter := func() {
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.frequency` occurrence.
tick := time.NewTicker(g.frequency)
defer tick.Stop()
// defer cleanup
defer func() {
if !g.markStale {
return
@@ -114,114 +139,22 @@ func (g *PromRuleTask) Run(ctx context.Context) {
}(time.Now())
}()
if g.IsCronSchedule() {
schedule, err := rrule.StrToRRule("DTSTART=" + g.scheduleStartsAt.UTC().Format("20060102T150405Z") + "\nRRULE:" + g.schedule) // assuming g.cronExpr contains the cron expression
if err != nil {
zap.L().Error("failed to parse rrule expression", zap.String("rrule", g.schedule), zap.Error(err))
return
}
now := time.Now()
nextRun := schedule.After(now, false)
iter()
// let the group iterate and run
for {
select {
case <-time.After(time.Until(nextRun)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleRuleTask": map[string]string{
"name": g.Name(),
},
})
iter := func() {
if g.pause {
return
}
start := time.Now()
g.Eval(ctx, start) // using current time instead of evalTimestamp
timeSinceStart := time.Since(start)
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
iter()
currentRun := nextRun
for {
// Calculate the next run time
nextRun = schedule.After(currentRun, false)
default:
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-time.After(time.Until(nextRun)):
// Check if we missed any scheduled runs
now := time.Now()
if now.After(nextRun.Add(time.Minute)) { // Allow 1 minute tolerance
zap.L().Warn("missed scheduled run",
zap.Time("scheduled", nextRun),
zap.Time("actual", now))
}
currentRun = nextRun
iter()
}
}
}
} else {
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"name": g.Name(),
},
})
iter := func() {
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.frequency` occurrence.
tick := time.NewTicker(g.frequency)
defer tick.Stop()
iter()
// let the group iterate and run
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.frequency) - 1
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
iter()
}
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.frequency) - 1
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
iter()
}
}
}
@@ -312,13 +245,6 @@ func (g *PromRuleTask) setLastEvaluation(ts time.Time) {
g.lastEvaluation = ts
}
func (g *PromRuleTask) SetSchedule(schedule string, t time.Time) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.schedule = schedule
g.scheduleStartsAt = t
}
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *PromRuleTask) EvalTimestamp(startTime int64) time.Time {
var (

View File

@@ -12,7 +12,6 @@ import (
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
opentracing "github.com/opentracing/opentracing-go"
"github.com/teambition/rrule-go"
"go.uber.org/zap"
)
@@ -37,10 +36,6 @@ type RuleTask struct {
maintenanceStore ruletypes.MaintenanceStore
orgID valuer.UUID
// New field for rrule-based scheduling
schedule string
scheduleStartsAt time.Time
}
const DefaultFrequency = 1 * time.Minute
@@ -76,10 +71,6 @@ func (g *RuleTask) Key() string {
return g.name + ";" + g.file
}
func (g *RuleTask) IsCronSchedule() bool {
return g.schedule != ""
}
// Name returns the group name.
func (g *RuleTask) Type() TaskType { return TaskTypeCh }
@@ -104,119 +95,56 @@ func NewQueryOriginContext(ctx context.Context, data map[string]interface{}) con
func (g *RuleTask) Run(ctx context.Context) {
defer close(g.terminated)
if g.IsCronSchedule() {
schedule, err := rrule.StrToRRule("DTSTART=" + g.scheduleStartsAt.UTC().Format("20060102T150405Z") + "\nRRULE:" + g.schedule) // assuming g.cronExpr contains the cron expression
if err != nil {
zap.L().Error("failed to parse rrule expression", zap.String("rrule", g.schedule), zap.Error(err))
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleRuleTask": map[string]string{
"name": g.Name(),
},
})
iter := func() {
if g.pause {
// todo(amol): remove in memory active alerts
// and last series state
return
}
now := time.Now()
nextRun := schedule.After(now, false)
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.frequency` occurrence.
tick := time.NewTicker(g.frequency)
defer tick.Stop()
iter()
// let the group iterate and run
for {
select {
case <-time.After(time.Until(nextRun)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleRuleTask": map[string]string{
"name": g.Name(),
},
})
iter := func() {
if g.pause {
return
}
start := time.Now()
g.Eval(ctx, start) // using current time instead of evalTimestamp
timeSinceStart := time.Since(start)
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
iter()
currentRun := nextRun
for {
// Calculate the next run time
nextRun = schedule.After(currentRun, false)
default:
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-time.After(time.Until(nextRun)):
// Check if we missed any scheduled runs
now := time.Now()
if now.After(nextRun.Add(time.Minute)) { // Allow 1 minute tolerance
zap.L().Warn("missed scheduled run",
zap.Time("scheduled", nextRun),
zap.Time("actual", now))
}
currentRun = nextRun
iter()
}
}
}
} else {
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleRuleTask": map[string]string{
"name": g.Name(),
},
})
iter := func() {
if g.pause {
// todo(amol): remove in memory active alerts
// and last series state
return
}
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.frequency` occurrence.
tick := time.NewTicker(g.frequency)
defer tick.Stop()
iter()
// let the group iterate and run
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.frequency) - 1
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
iter()
}
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.frequency) - 1
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
iter()
}
}
}
@@ -370,13 +298,6 @@ func (g *RuleTask) CopyState(fromTask Task) error {
return nil
}
func (g *RuleTask) SetSchedule(schedule string, t time.Time) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.schedule = schedule
g.scheduleStartsAt = t
}
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
@@ -458,41 +379,3 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
}(i, rule)
}
}
// Helper to convert ruletypes.Schedule/Recurrence to rrule.ROption
func recurrenceToROption(s *ruletypes.Schedule) rrule.ROption {
// Only basic mapping for daily/weekly/monthly, can be extended
opt := rrule.ROption{
Dtstart: s.Recurrence.StartTime,
}
switch s.Recurrence.RepeatType {
case ruletypes.RepeatTypeDaily:
opt.Freq = rrule.DAILY
case ruletypes.RepeatTypeWeekly:
opt.Freq = rrule.WEEKLY
for _, day := range s.Recurrence.RepeatOn {
switch day {
case ruletypes.RepeatOnSunday:
opt.Byweekday = append(opt.Byweekday, rrule.SU)
case ruletypes.RepeatOnMonday:
opt.Byweekday = append(opt.Byweekday, rrule.MO)
case ruletypes.RepeatOnTuesday:
opt.Byweekday = append(opt.Byweekday, rrule.TU)
case ruletypes.RepeatOnWednesday:
opt.Byweekday = append(opt.Byweekday, rrule.WE)
case ruletypes.RepeatOnThursday:
opt.Byweekday = append(opt.Byweekday, rrule.TH)
case ruletypes.RepeatOnFriday:
opt.Byweekday = append(opt.Byweekday, rrule.FR)
case ruletypes.RepeatOnSaturday:
opt.Byweekday = append(opt.Byweekday, rrule.SA)
}
}
case ruletypes.RepeatTypeMonthly:
opt.Freq = rrule.MONTHLY
}
if s.Recurrence.EndTime != nil {
opt.Until = *s.Recurrence.EndTime
}
return opt
}

View File

@@ -28,8 +28,6 @@ type Task interface {
Rules() []Rule
Stop()
Pause(b bool)
IsCronSchedule() bool
SetSchedule(string, time.Time)
}
// newTask returns an appropriate group for

View File

@@ -479,11 +479,9 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
}
for _, series := range queryResult.Series {
for _, threshold := range r.Thresholds() {
smpl, shouldAlert := threshold.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
}
@@ -551,11 +549,9 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
}
for _, series := range queryResult.Series {
for _, threshold := range r.Thresholds() {
smpl, shouldAlert := threshold.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
}
@@ -596,7 +592,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
}
value := valueFormatter.Format(smpl.V, r.Unit())
//todo(aniket): handle different threshold
threshold := valueFormatter.Format(r.targetVal(), r.Unit())
r.logger.DebugContext(ctx, "Alert template data for rule", "rule_name", r.Name(), "formatter", valueFormatter.Name(), "value", value, "threshold", threshold)

View File

@@ -870,10 +870,6 @@ func TestPrepareLinksToLogs(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
@@ -917,10 +913,6 @@ func TestPrepareLinksToLogsV5(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
@@ -971,10 +963,6 @@ func TestPrepareLinksToTracesV5(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
@@ -1025,10 +1013,6 @@ func TestPrepareLinksToTraces(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
@@ -1157,10 +1141,6 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
@@ -1211,10 +1191,6 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
@@ -1272,10 +1248,6 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
@@ -1379,7 +1351,6 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
postableRule.RuleCondition.Target = &c.target
postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
postableRule.RuleCondition.TargetUnit = c.targetUnit
postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold(postableRule.AlertName, &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit)}
postableRule.Annotations = map[string]string{
"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
@@ -1427,10 +1398,6 @@ func TestThresholdRuleNoData(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
@@ -1524,10 +1491,6 @@ func TestThresholdRuleTracesLink(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
@@ -1593,7 +1556,6 @@ func TestThresholdRuleTracesLink(t *testing.T) {
postableRule.RuleCondition.Target = &c.target
postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
postableRule.RuleCondition.TargetUnit = c.targetUnit
postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold(postableRule.AlertName, &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit)}
postableRule.Annotations = map[string]string{
"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
@@ -1640,10 +1602,6 @@ func TestThresholdRuleLogsLink(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
@@ -1721,7 +1679,6 @@ func TestThresholdRuleLogsLink(t *testing.T) {
postableRule.RuleCondition.Target = &c.target
postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
postableRule.RuleCondition.TargetUnit = c.targetUnit
postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold(postableRule.AlertName, &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit)}
postableRule.Annotations = map[string]string{
"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
@@ -1769,10 +1726,6 @@ func TestThresholdRuleShiftBy(t *testing.T) {
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
@@ -1829,172 +1782,3 @@ func TestThresholdRuleShiftBy(t *testing.T) {
assert.Equal(t, int64(10), params.CompositeQuery.BuilderQueries["A"].ShiftBy)
}
func TestMultipleThresholdRule(t *testing.T) {
postableRule := ruletypes.PostableRule{
AlertName: "Mulitple threshold test",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
Evaluation: &ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_calls_total",
},
AggregateOperator: v3.AggregateOperatorSumRate,
DataSource: v3.DataSourceMetrics,
Expression: "A",
},
},
},
},
}
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
cols := make([]cmock.ColumnType, 0)
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
cols = append(cols, cmock.ColumnType{Name: "attr", Type: "String"})
cols = append(cols, cmock.ColumnType{Name: "timestamp", Type: "String"})
cases := []struct {
targetUnit string
yAxisUnit string
values [][]interface{}
expectAlerts int
compareOp string
matchType string
target float64
secondTarget float64
summaryAny []string
}{
{
targetUnit: "s",
yAxisUnit: "ns",
values: [][]interface{}{
{float64(572588400), "attr", time.Now()}, // 0.57 seconds
{float64(572386400), "attr", time.Now().Add(1 * time.Second)}, // 0.57 seconds
{float64(300947400), "attr", time.Now().Add(2 * time.Second)}, // 0.3 seconds
{float64(299316000), "attr", time.Now().Add(3 * time.Second)}, // 0.3 seconds
{float64(66640400.00000001), "attr", time.Now().Add(4 * time.Second)}, // 0.06 seconds
},
expectAlerts: 2,
compareOp: "1", // Above
matchType: "1", // Once
target: 1, // 1 second
secondTarget: .5,
summaryAny: []string{
"observed metric value is 573 ms",
"observed metric value is 572 ms",
},
},
{
targetUnit: "ms",
yAxisUnit: "ns",
values: [][]interface{}{
{float64(572588400), "attr", time.Now()}, // 572.58 ms
{float64(572386400), "attr", time.Now().Add(1 * time.Second)}, // 572.38 ms
{float64(300947400), "attr", time.Now().Add(2 * time.Second)}, // 300.94 ms
{float64(299316000), "attr", time.Now().Add(3 * time.Second)}, // 299.31 ms
{float64(66640400.00000001), "attr", time.Now().Add(4 * time.Second)}, // 66.64 ms
},
expectAlerts: 6,
compareOp: "1", // Above
matchType: "1", // Once
target: 200, // 200 ms
secondTarget: 500,
summaryAny: []string{
"observed metric value is 299 ms",
"the observed metric value is 573 ms",
"the observed metric value is 572 ms",
"the observed metric value is 301 ms",
},
},
{
targetUnit: "decgbytes",
yAxisUnit: "bytes",
values: [][]interface{}{
{float64(2863284053), "attr", time.Now()}, // 2.86 GB
{float64(2863388842), "attr", time.Now().Add(1 * time.Second)}, // 2.86 GB
{float64(300947400), "attr", time.Now().Add(2 * time.Second)}, // 0.3 GB
{float64(299316000), "attr", time.Now().Add(3 * time.Second)}, // 0.3 GB
{float64(66640400.00000001), "attr", time.Now().Add(4 * time.Second)}, // 66.64 MB
},
expectAlerts: 2,
compareOp: "1", // Above
matchType: "1", // Once
target: 200, // 200 GB
secondTarget: 2, // 2GB
summaryAny: []string{
"observed metric value is 2.7 GiB",
"the observed metric value is 0.3 GB",
},
},
}
logger := instrumentationtest.New().Logger()
for idx, c := range cases {
rows := cmock.NewRows(cols, c.values)
// We are testing the eval logic after the query is run
// so we don't care about the query string here
queryString := "SELECT any"
telemetryStore.Mock().
ExpectQuery(queryString).
WillReturnRows(rows)
postableRule.RuleCondition.CompareOp = ruletypes.CompareOp(c.compareOp)
postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
postableRule.RuleCondition.TargetUnit = c.targetUnit
postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold("first_threshold", &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit),
ruletypes.NewBasicRuleThreshold("second_threshold", &c.secondTarget, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit),
}
postableRule.Annotations = map[string]string{
"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
readerCache, err := cachetest.New(cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
require.NoError(t, err)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
},
}
if err != nil {
assert.NoError(t, err)
}
retVal, err := rule.Eval(context.Background(), time.Now())
if err != nil {
assert.NoError(t, err)
}
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
if c.expectAlerts != 0 {
foundCount := 0
for _, item := range rule.Active {
for _, summary := range c.summaryAny {
if strings.Contains(item.Annotations.Get("summary"), summary) {
foundCount++
break
}
}
}
assert.Equal(t, c.expectAlerts, foundCount, "case %d", idx)
}
}
}

View File

@@ -3,10 +3,12 @@ package querybuilder
import (
"context"
"fmt"
"log/slog"
"strings"
chparser "github.com/AfterShip/clickhouse-sql-parser/parser"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -14,6 +16,7 @@ import (
)
type aggExprRewriter struct {
logger *slog.Logger
fullTextColumn *telemetrytypes.TelemetryFieldKey
fieldMapper qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
@@ -24,13 +27,17 @@ type aggExprRewriter struct {
var _ qbtypes.AggExprRewriter = (*aggExprRewriter)(nil)
func NewAggExprRewriter(
settings factory.ProviderSettings,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
jsonBodyPrefix string,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *aggExprRewriter {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/agg_rewrite")
return &aggExprRewriter{
logger: set.Logger(),
fullTextColumn: fullTextColumn,
fieldMapper: fieldMapper,
conditionBuilder: conditionBuilder,
@@ -70,7 +77,7 @@ func (r *aggExprRewriter) Rewrite(
return "", nil, errors.NewInternalf(errors.CodeInternal, "no SELECT items for %q", expr)
}
visitor := newExprVisitor(keys,
visitor := newExprVisitor(r.logger, keys,
r.fullTextColumn,
r.fieldMapper,
r.conditionBuilder,
@@ -117,6 +124,7 @@ func (r *aggExprRewriter) RewriteMulti(
// exprVisitor walks FunctionExpr nodes and applies the mappers.
type exprVisitor struct {
chparser.DefaultASTVisitor
logger *slog.Logger
fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey
fullTextColumn *telemetrytypes.TelemetryFieldKey
fieldMapper qbtypes.FieldMapper
@@ -129,6 +137,7 @@ type exprVisitor struct {
}
func newExprVisitor(
logger *slog.Logger,
fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
fieldMapper qbtypes.FieldMapper,
@@ -137,6 +146,7 @@ func newExprVisitor(
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *exprVisitor {
return &exprVisitor{
logger: logger,
fieldKeys: fieldKeys,
fullTextColumn: fullTextColumn,
fieldMapper: fieldMapper,
@@ -183,6 +193,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
whereClause, err := PrepareWhereClause(
origPred,
FilterExprVisitorOpts{
Logger: v.logger,
FieldKeys: v.fieldKeys,
FieldMapper: v.fieldMapper,
ConditionBuilder: v.conditionBuilder,

View File

@@ -2,7 +2,11 @@ package querybuilder
import (
"context"
"encoding/json"
"fmt"
"math"
"reflect"
"strconv"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
@@ -116,3 +120,58 @@ func GroupByKeys(keys []qbtypes.GroupByKey) []string {
}
return k
}
func FormatValueForContains(value any) string {
if value == nil {
return ""
}
switch v := value.(type) {
case string:
return v
case []byte:
return string(v)
case json.Number:
return v.String()
case float64:
if v == math.Trunc(v) && v >= -1e15 && v <= 1e15 {
return fmt.Sprintf("%.0f", v)
}
return strconv.FormatFloat(v, 'f', -1, 64)
case float32:
return strconv.FormatFloat(float64(v), 'f', -1, 32)
case int, int8, int16, int32, int64:
return fmt.Sprintf("%d", v)
case uint, uint8, uint16, uint32, uint64:
return fmt.Sprintf("%d", v)
case bool:
return strconv.FormatBool(v)
case fmt.Stringer:
return v.String()
default:
// fallback - try to convert through reflection
rv := reflect.ValueOf(value)
switch rv.Kind() {
case reflect.Float32, reflect.Float64:
f := rv.Float()
if f == math.Trunc(f) && f >= -1e15 && f <= 1e15 {
return fmt.Sprintf("%.0f", f)
}
return strconv.FormatFloat(f, 'f', -1, 64)
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
return strconv.FormatInt(rv.Int(), 10)
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
return strconv.FormatUint(rv.Uint(), 10)
default:
return fmt.Sprintf("%v", value)
}
}
}

View File

@@ -0,0 +1,275 @@
package querybuilder
import (
"encoding/json"
"fmt"
"math"
"testing"
"github.com/stretchr/testify/assert"
)
type customStringer struct {
value string
}
func (c customStringer) String() string {
return c.value
}
type customInt int64
type customFloat float64
type customUint uint64
func TestFormatValueForContains(t *testing.T) {
tests := []struct {
name string
input any
expected string
}{
{
name: "nil value",
input: nil,
expected: "",
},
{
name: "string value",
input: "hello world",
expected: "hello world",
},
{
name: "empty string",
input: "",
expected: "",
},
{
name: "string with special characters",
input: "test@#$%^&*()_+-=",
expected: "test@#$%^&*()_+-=",
},
{
name: "byte slice",
input: []byte("byte slice test"),
expected: "byte slice test",
},
{
name: "empty byte slice",
input: []byte{},
expected: "",
},
{
name: "json.Number integer",
input: json.Number("521509198310"),
expected: "521509198310",
},
{
name: "json.Number float",
input: json.Number("3.14159"),
expected: "3.14159",
},
{
name: "json.Number scientific notation",
input: json.Number("1.23e+10"),
expected: "1.23e+10",
},
{
name: "float64 whole number",
input: float64(42),
expected: "42",
},
{
name: "float64 decimal",
input: float64(3.14159),
expected: "3.14159",
},
{
name: "float64 large whole number",
input: float64(521509198310),
expected: "521509198310",
},
{
name: "float64 at positive threshold",
input: float64(1e15),
expected: "1000000000000000",
},
{
name: "float64 above positive threshold",
input: float64(1e16),
expected: "10000000000000000",
},
{
name: "float64 at negative threshold",
input: float64(-1e15),
expected: "-1000000000000000",
},
{
name: "float64 negative decimal",
input: float64(-123.456),
expected: "-123.456",
},
{
name: "float64 zero",
input: float64(0),
expected: "0",
},
{
name: "float32 whole number",
input: float32(42),
expected: "42",
},
{
name: "float32 decimal",
input: float32(3.14),
expected: "3.14",
},
{
name: "int",
input: int(123),
expected: "123",
},
{
name: "int negative",
input: int(-456),
expected: "-456",
},
{
name: "int8 max",
input: int8(127),
expected: "127",
},
{
name: "int8 min",
input: int8(-128),
expected: "-128",
},
{
name: "int16",
input: int16(32767),
expected: "32767",
},
{
name: "int32",
input: int32(2147483647),
expected: "2147483647",
},
{
name: "int64",
input: int64(9223372036854775807),
expected: "9223372036854775807",
},
{
name: "uint",
input: uint(123),
expected: "123",
},
{
name: "uint8 max",
input: uint8(255),
expected: "255",
},
{
name: "uint16",
input: uint16(65535),
expected: "65535",
},
{
name: "uint32",
input: uint32(4294967295),
expected: "4294967295",
},
{
name: "uint64 large",
input: uint64(18446744073709551615),
expected: "18446744073709551615",
},
{
name: "bool true",
input: true,
expected: "true",
},
{
name: "bool false",
input: false,
expected: "false",
},
{
name: "custom stringer",
input: customStringer{value: "custom string value"},
expected: "custom string value",
},
{
name: "custom int type",
input: customInt(12345),
expected: "12345",
},
{
name: "custom float type whole number",
input: customFloat(67890),
expected: "67890",
},
{
name: "custom float type decimal",
input: customFloat(123.456),
expected: "123.456",
},
{
name: "custom uint type",
input: customUint(99999),
expected: "99999",
},
{
name: "struct fallback",
input: struct{ Name string }{Name: "test"},
expected: "{test}",
},
{
name: "slice fallback",
input: []int{1, 2, 3},
expected: "[1 2 3]",
},
{
name: "map fallback",
input: map[string]int{"a": 1, "b": 2},
expected: fmt.Sprintf("%v", map[string]int{"a": 1, "b": 2}),
},
{
name: "float64 infinity",
input: math.Inf(1),
expected: "+Inf",
},
{
name: "float64 negative infinity",
input: math.Inf(-1),
expected: "-Inf",
},
{
name: "float64 NaN",
input: math.NaN(),
expected: "NaN",
},
{
name: "float64 very small positive",
input: float64(0.000000123),
expected: "0.000000123",
},
{
name: "float64 very small negative",
input: float64(-0.000000123),
expected: "-0.000000123",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := FormatValueForContains(tt.input)
assert.Equal(t, tt.expected, result)
})
}
}
func TestFormatValueForContains_LargeNumberScientificNotation(t *testing.T) {
largeNumber := float64(521509198310)
result := FormatValueForContains(largeNumber)
assert.Equal(t, "521509198310", result)
assert.NotEqual(t, "5.2150919831e+11", result)
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@@ -34,7 +35,8 @@ func valueForIndexFilter(op qbtypes.FilterOperator, key *telemetrytypes.Telemetr
}
return values
}
return value
// resource table expects string value
return fmt.Sprintf(`%%%v%%`, value)
}
func keyIndexFilter(key *telemetrytypes.TelemetryFieldKey) any {
@@ -53,6 +55,16 @@ func (b *defaultConditionBuilder) ConditionFor(
return "true", nil
}
switch op {
case qbtypes.FilterOperatorContains,
qbtypes.FilterOperatorNotContains,
qbtypes.FilterOperatorILike,
qbtypes.FilterOperatorNotILike,
qbtypes.FilterOperatorLike,
qbtypes.FilterOperatorNotLike:
value = querybuilder.FormatValueForContains(value)
}
column, err := b.fm.ColumnFor(ctx, key)
if err != nil {
return "", err

View File

@@ -5,6 +5,7 @@ import (
"testing"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
"github.com/stretchr/testify/assert"
@@ -77,6 +78,18 @@ func TestConditionBuilder(t *testing.T) {
expected: "LOWER(simpleJSONExtractString(labels, 'k8s.namespace.name')) LIKE LOWER(?) AND labels LIKE ? AND LOWER(labels) LIKE LOWER(?)",
expectedArgs: []any{"%banana%", "%k8s.namespace.name%", `%k8s.namespace.name%banana%`},
},
{
name: "Contains operator - string attribute number value",
key: &telemetrytypes.TelemetryFieldKey{
Name: "company.id",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
op: qbtypes.FilterOperatorContains,
value: 521509198310,
expected: "LOWER(simpleJSONExtractString(labels, 'company.id')) LIKE LOWER(?) AND labels LIKE ? AND LOWER(labels) LIKE LOWER(?)",
expectedArgs: []any{"%521509198310%", "%company.id%", `%company.id%521509198310%`},
},
{
name: "string_not_contains",
key: &telemetrytypes.TelemetryFieldKey{

View File

@@ -3,8 +3,10 @@ package resourcefilter
import (
"context"
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -34,6 +36,7 @@ var signalConfigs = map[telemetrytypes.Signal]signalConfig{
// Generic resource filter statement builder
type resourceFilterStatementBuilder[T any] struct {
logger *slog.Logger
fieldMapper qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
metadataStore telemetrytypes.MetadataStore
@@ -52,11 +55,14 @@ var (
// Constructor functions
func NewTraceResourceFilterStatementBuilder(
settings factory.ProviderSettings,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
metadataStore telemetrytypes.MetadataStore,
) *resourceFilterStatementBuilder[qbtypes.TraceAggregation] {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter")
return &resourceFilterStatementBuilder[qbtypes.TraceAggregation]{
logger: set.Logger(),
fieldMapper: fieldMapper,
conditionBuilder: conditionBuilder,
metadataStore: metadataStore,
@@ -65,6 +71,7 @@ func NewTraceResourceFilterStatementBuilder(
}
func NewLogResourceFilterStatementBuilder(
settings factory.ProviderSettings,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
metadataStore telemetrytypes.MetadataStore,
@@ -72,7 +79,9 @@ func NewLogResourceFilterStatementBuilder(
jsonBodyPrefix string,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *resourceFilterStatementBuilder[qbtypes.LogAggregation] {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter")
return &resourceFilterStatementBuilder[qbtypes.LogAggregation]{
logger: set.Logger(),
fieldMapper: fieldMapper,
conditionBuilder: conditionBuilder,
metadataStore: metadataStore,
@@ -148,6 +157,7 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
// warnings would be encountered as part of the main condition already
filterWhereClause, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Logger: b.logger,
FieldMapper: b.fieldMapper,
ConditionBuilder: b.conditionBuilder,
FieldKeys: keys,

View File

@@ -3,6 +3,7 @@ package querybuilder
import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
@@ -20,6 +21,7 @@ var searchTroubleshootingGuideURL = "https://signoz.io/docs/userguide/search-tro
// filterExpressionVisitor implements the FilterQueryVisitor interface
// to convert the parsed filter expressions into ClickHouse WHERE clause
type filterExpressionVisitor struct {
logger *slog.Logger
fieldMapper qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
warnings []string
@@ -41,6 +43,7 @@ type filterExpressionVisitor struct {
}
type FilterExprVisitorOpts struct {
Logger *slog.Logger
FieldMapper qbtypes.FieldMapper
ConditionBuilder qbtypes.ConditionBuilder
FieldKeys map[string][]*telemetrytypes.TelemetryFieldKey
@@ -58,6 +61,7 @@ type FilterExprVisitorOpts struct {
// newFilterExpressionVisitor creates a new filterExpressionVisitor
func newFilterExpressionVisitor(opts FilterExprVisitorOpts) *filterExpressionVisitor {
return &filterExpressionVisitor{
logger: opts.Logger,
fieldMapper: opts.FieldMapper,
conditionBuilder: opts.ConditionBuilder,
fieldKeys: opts.FieldKeys,
@@ -786,15 +790,35 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
}
if len(fieldKeysForName) > 1 && !v.keysWithWarnings[keyName] {
v.mainWarnURL = "https://signoz.io/docs/userguide/field-context-data-types/"
// this is warning state, we must have a unambiguous key
v.warnings = append(v.warnings, fmt.Sprintf(
"key `%s` is ambiguous, found %d different combinations of field context / data type: %v",
warnMsg := fmt.Sprintf(
"Key `%s` is ambiguous, found %d different combinations of field context / data type: %v.",
fieldKey.Name,
len(fieldKeysForName),
fieldKeysForName,
))
)
mixedFieldContext := map[string]bool{}
for _, item := range fieldKeysForName {
mixedFieldContext[item.FieldContext.StringValue()] = true
}
if mixedFieldContext[telemetrytypes.FieldContextResource.StringValue()] &&
mixedFieldContext[telemetrytypes.FieldContextAttribute.StringValue()] {
filteredKeys := []*telemetrytypes.TelemetryFieldKey{}
for _, item := range fieldKeysForName {
if item.FieldContext != telemetrytypes.FieldContextResource {
continue
}
filteredKeys = append(filteredKeys, item)
}
fieldKeysForName = filteredKeys
warnMsg += " " + "Using `resource` context by default. To query attributes explicitly, " +
fmt.Sprintf("use the fully qualified name (e.g., 'attribute.%s')", fieldKey.Name)
}
v.mainWarnURL = "https://signoz.io/docs/userguide/field-context-data-types/"
// this is warning state, we must have a unambiguous key
v.warnings = append(v.warnings, warnMsg)
v.keysWithWarnings[keyName] = true
v.logger.Warn("ambiguous key", "field_key_name", fieldKey.Name) //nolint:sloglint
}
return fieldKeysForName

View File

@@ -136,7 +136,14 @@ func NewSQLMigrationProviderFactories(
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
return factory.MustNewNamedMap(
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewSettingsFactory(), telemetrystorehook.NewLoggingFactory()),
clickhousetelemetrystore.NewFactory(
telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return telemetrystorehook.NewSettingsFactory(s)
}),
telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return telemetrystorehook.NewLoggingFactory()
}),
),
)
}

View File

@@ -7,6 +7,7 @@ import (
"strings"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"golang.org/x/exp/maps"
@@ -30,6 +31,16 @@ func (c *conditionBuilder) conditionFor(
sb *sqlbuilder.SelectBuilder,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorContains,
qbtypes.FilterOperatorNotContains,
qbtypes.FilterOperatorILike,
qbtypes.FilterOperatorNotILike,
qbtypes.FilterOperatorLike,
qbtypes.FilterOperatorNotLike:
value = querybuilder.FormatValueForContains(value)
}
column, err := c.fm.ColumnFor(ctx, key)
if err != nil {
return "", err

View File

@@ -111,6 +111,30 @@ func TestConditionFor(t *testing.T) {
expectedArgs: []any{"%admin%"},
expectedError: nil,
},
{
name: "Contains operator - string attribute number value",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorContains,
value: 521509198310,
expectedSQL: "LOWER(attributes_string['user.id']) LIKE LOWER(?)",
expectedArgs: []any{"%521509198310%", true},
expectedError: nil,
},
{
name: "Contains operator - body",
key: telemetrytypes.TelemetryFieldKey{
Name: "body",
},
operator: qbtypes.FilterOperatorContains,
value: 521509198310,
expectedSQL: "LOWER(body) LIKE LOWER(?)",
expectedArgs: []any{"%521509198310%"},
expectedError: nil,
},
{
name: "Contains operator - string attribute",
key: telemetrytypes.TelemetryFieldKey{

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@@ -19,6 +20,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
keys := buildCompleteFieldKeyMap()
opts := querybuilder.FilterExprVisitorOpts{
Logger: instrumentationtest.New().Logger(),
FieldMapper: fm,
ConditionBuilder: cb,
FieldKeys: keys,

View File

@@ -6,8 +6,8 @@ import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
"github.com/stretchr/testify/require"
)
@@ -21,14 +21,13 @@ func TestFilterExprLogs(t *testing.T) {
keys := buildCompleteFieldKeyMap()
opts := querybuilder.FilterExprVisitorOpts{
Logger: instrumentationtest.New().Logger(),
FieldMapper: fm,
ConditionBuilder: cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{
Name: "body",
},
JsonBodyPrefix: "body",
JsonKeyToKey: GetBodyJSONKey,
FullTextColumn: DefaultFullTextColumn,
JsonBodyPrefix: BodyJSONStringSearchPrefix,
JsonKeyToKey: GetBodyJSONKey,
}
testCases := []struct {
@@ -1406,6 +1405,14 @@ func TestFilterExprLogs(t *testing.T) {
expectedArgs: []any{"%error%", true},
expectedErrorContains: "",
},
{
category: "number contains body",
query: "body CONTAINS 521509198310",
shouldPass: true,
expectedQuery: "WHERE LOWER(body) LIKE LOWER(?)",
expectedArgs: []any{"%521509198310%"},
expectedErrorContains: "",
},
{
category: "CONTAINS operator",
query: "level CONTAINS \"critical\"",

View File

@@ -553,6 +553,7 @@ func (b *logQueryStatementBuilder) addFilterCondition(
if query.Filter != nil && query.Filter.Expression != "" {
// add filter expression
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,

View File

@@ -27,6 +27,7 @@ func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation
mockMetadataStore.KeysMap = keysMap
return resourcefilter.NewLogResourceFilterStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
fm,
cb,
mockMetadataStore,
@@ -119,7 +120,7 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -212,7 +213,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -321,7 +322,7 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()

View File

@@ -6,4 +6,6 @@ const (
LogsV2LocalTableName = "logs_v2"
TagAttributesV2TableName = "distributed_tag_attributes_v2"
TagAttributesV2LocalTableName = "tag_attributes_v2"
LogAttributeKeysTblName = "distributed_logs_attribute_keys"
LogResourceKeysTblName = "distributed_logs_resource_keys"
)

View File

@@ -27,6 +27,13 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"body": {
{
Name: "body",
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"http.status_code": {
{
Name: "http.status_code",

View File

@@ -5,6 +5,7 @@ import (
"fmt"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@@ -25,6 +26,17 @@ func (c *conditionBuilder) ConditionFor(
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorContains,
qbtypes.FilterOperatorNotContains,
qbtypes.FilterOperatorILike,
qbtypes.FilterOperatorNotILike,
qbtypes.FilterOperatorLike,
qbtypes.FilterOperatorNotLike:
value = querybuilder.FormatValueForContains(value)
}
column, err := c.fm.ColumnFor(ctx, key)
if err != nil {
// if we don't have a column, we can't build a condition for related values

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"os"
"slices"
"strings"
@@ -32,20 +31,23 @@ var (
)
type telemetryMetaStore struct {
logger *slog.Logger
telemetrystore telemetrystore.TelemetryStore
tracesDBName string
tracesFieldsTblName string
indexV3TblName string
metricsDBName string
metricsFieldsTblName string
meterDBName string
meterFieldsTblName string
logsDBName string
logsFieldsTblName string
logsV2TblName string
relatedMetadataDBName string
relatedMetadataTblName string
logger *slog.Logger
telemetrystore telemetrystore.TelemetryStore
tracesDBName string
tracesFieldsTblName string
spanAttributesKeysTblName string
indexV3TblName string
metricsDBName string
metricsFieldsTblName string
meterDBName string
meterFieldsTblName string
logsDBName string
logsFieldsTblName string
logAttributeKeysTblName string
logResourceKeysTblName string
logsV2TblName string
relatedMetadataDBName string
relatedMetadataTblName string
fm qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
@@ -60,6 +62,7 @@ func NewTelemetryMetaStore(
telemetrystore telemetrystore.TelemetryStore,
tracesDBName string,
tracesFieldsTblName string,
spanAttributesKeysTblName string,
indexV3TblName string,
metricsDBName string,
metricsFieldsTblName string,
@@ -68,26 +71,31 @@ func NewTelemetryMetaStore(
logsDBName string,
logsV2TblName string,
logsFieldsTblName string,
logAttributeKeysTblName string,
logResourceKeysTblName string,
relatedMetadataDBName string,
relatedMetadataTblName string,
) telemetrytypes.MetadataStore {
metadataSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetadata")
t := &telemetryMetaStore{
logger: metadataSettings.Logger(),
telemetrystore: telemetrystore,
tracesDBName: tracesDBName,
tracesFieldsTblName: tracesFieldsTblName,
indexV3TblName: indexV3TblName,
metricsDBName: metricsDBName,
metricsFieldsTblName: metricsFieldsTblName,
meterDBName: meterDBName,
meterFieldsTblName: meterFieldsTblName,
logsDBName: logsDBName,
logsV2TblName: logsV2TblName,
logsFieldsTblName: logsFieldsTblName,
relatedMetadataDBName: relatedMetadataDBName,
relatedMetadataTblName: relatedMetadataTblName,
logger: metadataSettings.Logger(),
telemetrystore: telemetrystore,
tracesDBName: tracesDBName,
tracesFieldsTblName: tracesFieldsTblName,
spanAttributesKeysTblName: spanAttributesKeysTblName,
indexV3TblName: indexV3TblName,
metricsDBName: metricsDBName,
metricsFieldsTblName: metricsFieldsTblName,
meterDBName: meterDBName,
meterFieldsTblName: meterFieldsTblName,
logsDBName: logsDBName,
logsV2TblName: logsV2TblName,
logsFieldsTblName: logsFieldsTblName,
logAttributeKeysTblName: logAttributeKeysTblName,
logResourceKeysTblName: logResourceKeysTblName,
relatedMetadataDBName: relatedMetadataDBName,
relatedMetadataTblName: relatedMetadataTblName,
}
fm := NewFieldMapper()
@@ -136,14 +144,18 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
}
sb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", `
CASE
WHEN tag_type = 'spanfield' THEN 1
WHEN tag_type = 'resource' THEN 2
WHEN tag_type = 'scope' THEN 3
WHEN tag_type = 'tag' THEN 4
ELSE 5
END as priority`).From(t.tracesDBName + "." + t.tracesFieldsTblName)
sb := sqlbuilder.Select(
"tagKey AS tag_key",
"tagType AS tag_type",
"dataType AS tag_data_type",
`CASE
// WHEN tagType = 'spanfield' THEN 1
WHEN tagType = 'resource' THEN 2
// WHEN tagType = 'scope' THEN 3
WHEN tagType = 'tag' THEN 4
ELSE 5
END as priority`,
).From(t.tracesDBName + "." + t.spanAttributesKeysTblName)
var limit int
searchTexts := []string{}
@@ -152,19 +164,20 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
conds := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
if fieldKeySelector.StartUnixMilli != 0 {
conds = append(conds, sb.GE("unix_milli", fieldKeySelector.StartUnixMilli))
}
if fieldKeySelector.EndUnixMilli != 0 {
conds = append(conds, sb.LE("unix_milli", fieldKeySelector.EndUnixMilli))
}
// TODO(srikanthccv): support time filtering for span attribute keys
// if fieldKeySelector.StartUnixMilli != 0 {
// conds = append(conds, sb.GE("unix_milli", fieldKeySelector.StartUnixMilli))
// }
// if fieldKeySelector.EndUnixMilli != 0 {
// conds = append(conds, sb.LE("unix_milli", fieldKeySelector.EndUnixMilli))
// }
// key part of the selector
fieldKeyConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_key", fieldKeySelector.Name))
fieldKeyConds = append(fieldKeyConds, sb.E("tagKey", fieldKeySelector.Name))
} else {
fieldKeyConds = append(fieldKeyConds, sb.ILike("tag_key", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
fieldKeyConds = append(fieldKeyConds, sb.ILike("tagKey", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
}
searchTexts = append(searchTexts, fieldKeySelector.Name)
@@ -172,29 +185,25 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
dataTypes = append(dataTypes, fieldKeySelector.FieldDataType)
}
// now look at the field context
// we don't write most of intrinsic fields to tag attributes table
// for this reason we don't want to apply tag_type if the field context
// if not attribute or resource attribute
// we don't write most of intrinsic fields to keys table
// for this reason we don't want to apply tagType if the field context
// is not attribute or resource attribute
if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified &&
(fieldKeySelector.FieldContext == telemetrytypes.FieldContextAttribute ||
fieldKeySelector.FieldContext == telemetrytypes.FieldContextResource) {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_type", fieldKeySelector.FieldContext.TagType()))
fieldKeyConds = append(fieldKeyConds, sb.E("tagType", fieldKeySelector.FieldContext.TagType()))
}
// now look at the field data type
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_data_type", fieldKeySelector.FieldDataType.TagDataType()))
fieldKeyConds = append(fieldKeyConds, sb.E("dataType", fieldKeySelector.FieldDataType.TagDataType()))
}
conds = append(conds, sb.And(fieldKeyConds...))
limit += fieldKeySelector.Limit
if strings.TrimSpace(fieldKeySelector.Name) == "" {
sb.Limit(200)
}
}
sb.Where(sb.Or(conds...))
sb.GroupBy("tag_key", "tag_type", "tag_data_type")
sb.GroupBy("tagKey", "tagType", "dataType")
if limit == 0 {
limit = 1000
}
@@ -347,89 +356,145 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
}
tblName := t.logsFieldsTblName
if os.Getenv("LOGS_TAG_ATTRS_KEYS_TABLE") != "" {
tblName = os.Getenv("LOGS_TAG_ATTRS_KEYS_TABLE")
// queries for both attribute and resource keys tables
var queries []string
var allArgs []any
// tables to query based on field selectors
queryAttributeTable := false
queryResourceTable := false
for _, selector := range fieldKeySelectors {
if selector.FieldContext == telemetrytypes.FieldContextUnspecified {
// unspecified context, query both tables
queryAttributeTable = true
queryResourceTable = true
break
} else if selector.FieldContext == telemetrytypes.FieldContextAttribute {
queryAttributeTable = true
} else if selector.FieldContext == telemetrytypes.FieldContextResource {
queryResourceTable = true
}
}
sb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", `
CASE
WHEN tag_type = 'logfield' THEN 1
WHEN tag_type = 'resource' THEN 2
WHEN tag_type = 'scope' THEN 3
WHEN tag_type = 'tag' THEN 4
ELSE 5
END as priority`).From(t.logsDBName + "." + tblName)
var limit int
tablesToQuery := []struct {
fieldContext telemetrytypes.FieldContext
shouldQuery bool
}{
{telemetrytypes.FieldContextAttribute, queryAttributeTable},
{telemetrytypes.FieldContextResource, queryResourceTable},
}
conds := []string{}
searchTexts := []string{}
dataTypes := []telemetrytypes.FieldDataType{}
for _, fieldKeySelector := range fieldKeySelectors {
if fieldKeySelector.StartUnixMilli != 0 {
conds = append(conds, sb.GE("unix_milli", fieldKeySelector.StartUnixMilli))
}
if fieldKeySelector.EndUnixMilli != 0 {
conds = append(conds, sb.LE("unix_milli", fieldKeySelector.EndUnixMilli))
for _, table := range tablesToQuery {
if !table.shouldQuery {
continue
}
// key part of the selector
fieldKeyConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_key", fieldKeySelector.Name))
fieldContext := table.fieldContext
// table name based on field context
var tblName string
if fieldContext == telemetrytypes.FieldContextAttribute {
tblName = t.logsDBName + "." + t.logAttributeKeysTblName
} else {
fieldKeyConds = append(fieldKeyConds, sb.ILike("tag_key", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
}
searchTexts = append(searchTexts, fieldKeySelector.Name)
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
dataTypes = append(dataTypes, fieldKeySelector.FieldDataType)
tblName = t.logsDBName + "." + t.logResourceKeysTblName
}
// now look at the field context
// we don't write most of intrinsic fields to tag attributes table
// for this reason we don't want to apply tag_type if the field context
// if not attribute or resource attribute
if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified &&
(fieldKeySelector.FieldContext == telemetrytypes.FieldContextAttribute ||
fieldKeySelector.FieldContext == telemetrytypes.FieldContextResource) {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_type", fieldKeySelector.FieldContext.TagType()))
sb := sqlbuilder.Select(
"name AS tag_key",
fmt.Sprintf("'%s' AS tag_type", fieldContext.TagType()),
"datatype AS tag_data_type",
fmt.Sprintf(`%d AS priority`, getPriorityForContext(fieldContext)),
).From(tblName)
var limit int
conds := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
// Include this selector if:
// 1. It has unspecified context (matches all tables)
// 2. Its context matches the current table's context
if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified &&
fieldKeySelector.FieldContext != fieldContext {
continue
}
// key part of the selector
fieldKeyConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldKeyConds = append(fieldKeyConds, sb.E("name", fieldKeySelector.Name))
} else {
fieldKeyConds = append(fieldKeyConds, sb.ILike("name", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
}
// now look at the field data type
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("datatype", fieldKeySelector.FieldDataType.TagDataType()))
}
if len(fieldKeyConds) > 0 {
conds = append(conds, sb.And(fieldKeyConds...))
}
limit += fieldKeySelector.Limit
}
// now look at the field data type
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_data_type", fieldKeySelector.FieldDataType.TagDataType()))
if len(conds) > 0 {
sb.Where(sb.Or(conds...))
}
conds = append(conds, sb.And(fieldKeyConds...))
limit += fieldKeySelector.Limit
if strings.TrimSpace(fieldKeySelector.Name) == "" {
sb.Limit(200)
sb.GroupBy("name", "datatype")
if limit == 0 {
limit = 1000
}
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
queries = append(queries, query)
allArgs = append(allArgs, args...)
}
if len(queries) == 0 {
// No matching contexts, return empty result
return []*telemetrytypes.TelemetryFieldKey{}, true, nil
}
// Combine queries with UNION ALL
var limit int
for _, fieldKeySelector := range fieldKeySelectors {
limit += fieldKeySelector.Limit
}
sb.Where(sb.Or(conds...))
sb.GroupBy("tag_key", "tag_type", "tag_data_type")
if limit == 0 {
limit = 1000
}
mainSb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", "max(priority) as priority")
mainSb.From(mainSb.BuilderAs(sb, "sub_query"))
mainSb.GroupBy("tag_key", "tag_type", "tag_data_type")
mainSb.OrderBy("priority")
// query one extra to check if we hit the limit
mainSb.Limit(limit + 1)
mainQuery := fmt.Sprintf(`
SELECT tag_key, tag_type, tag_data_type, max(priority) as priority
FROM (
%s
) AS combined_results
GROUP BY tag_key, tag_type, tag_data_type
ORDER BY priority
LIMIT %d
`, strings.Join(queries, " UNION ALL "), limit+1)
query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, mainQuery, allArgs...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
rowCount := 0
searchTexts := []string{}
dataTypes := []telemetrytypes.FieldDataType{}
// Collect search texts and data types for static field matching
for _, fieldKeySelector := range fieldKeySelectors {
searchTexts = append(searchTexts, fieldKeySelector.Name)
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
dataTypes = append(dataTypes, fieldKeySelector.FieldDataType)
}
}
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
@@ -510,6 +575,21 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
return keys, complete, nil
}
func getPriorityForContext(ctx telemetrytypes.FieldContext) int {
switch ctx {
case telemetrytypes.FieldContextLog:
return 1
case telemetrytypes.FieldContextResource:
return 2
case telemetrytypes.FieldContextScope:
return 3
case telemetrytypes.FieldContextAttribute:
return 4
default:
return 5
}
}
// getMetricsKeys returns the keys from the metrics that match the field selection criteria
func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
if len(fieldKeySelectors) == 0 {
@@ -856,6 +936,7 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
}
whereClause, err := querybuilder.PrepareWhereClause(fieldValueSelector.ExistingQuery, querybuilder.FilterExprVisitorOpts{
Logger: t.logger,
FieldMapper: t.fm,
ConditionBuilder: t.conditionBuilder,
FieldKeys: keys,

View File

@@ -40,6 +40,7 @@ func TestGetKeys(t *testing.T) {
mockTelemetryStore,
telemetrytraces.DBName,
telemetrytraces.TagAttributesV2TableName,
telemetrytraces.SpanAttributesKeysTblName,
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
@@ -48,6 +49,8 @@ func TestGetKeys(t *testing.T) {
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,
telemetrylogs.LogAttributeKeysTblName,
telemetrylogs.LogResourceKeysTblName,
DBName,
AttributesMetadataLocalTableName,
)

View File

@@ -141,6 +141,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
)
if query.Filter != nil && query.Filter.Expression != "" {
filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
@@ -223,6 +224,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
if query.Filter != nil && query.Filter.Expression != "" {
filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
@@ -286,6 +288,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
)
if query.Filter != nil && query.Filter.Expression != "" {
filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,

View File

@@ -6,6 +6,7 @@ import (
"slices"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -28,6 +29,16 @@ func (c *conditionBuilder) conditionFor(
sb *sqlbuilder.SelectBuilder,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorContains,
qbtypes.FilterOperatorNotContains,
qbtypes.FilterOperatorILike,
qbtypes.FilterOperatorNotILike,
qbtypes.FilterOperatorLike,
qbtypes.FilterOperatorNotLike:
value = querybuilder.FormatValueForContains(value)
}
tblFieldName, err := c.fm.FieldFor(ctx, key)
if err != nil {
return "", err

View File

@@ -133,6 +133,19 @@ func TestConditionFor(t *testing.T) {
expectedSQL: "",
expectedError: qbtypes.ErrInValues,
},
{
name: "Contains operator - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorContains,
value: 521509198310,
expectedSQL: "LOWER(JSONExtractString(labels, 'user.id')) LIKE LOWER(?)",
expectedArgs: []any{"%521509198310%"},
expectedError: nil,
},
{
name: "Not In operator - metric_name",
key: telemetrytypes.TelemetryFieldKey{

View File

@@ -68,6 +68,9 @@ func GetKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation])
for idx := range keySelectors {
keySelectors[idx].Signal = telemetrytypes.SignalMetrics
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
keySelectors[idx].MetricContext = &telemetrytypes.MetricContext{
MetricName: query.Aggregations[0].MetricName,
}
}
return keySelectors
}
@@ -295,6 +298,7 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
if query.Filter != nil && query.Filter.Expression != "" {
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,

View File

@@ -16,22 +16,13 @@ type provider struct {
hooks []telemetrystore.TelemetryStoreHook
}
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
func NewFactory(hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("clickhouse"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStore, error) {
// we want to fail fast so we have hook registration errors before creating the telemetry store
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
for i, hookFactory := range hookFactories {
hook, err := hookFactory.New(ctx, providerSettings, config)
if err != nil {
return nil, err
}
hooks[i] = hook
}
return New(ctx, providerSettings, config, hooks...)
return New(ctx, providerSettings, config, hookFactories...)
})
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) (telemetrystore.TelemetryStore, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")
options, err := clickhouse.ParseDSN(config.Clickhouse.DSN)
@@ -47,6 +38,20 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
return nil, err
}
var version string
if err := chConn.QueryRow(ctx, "SELECT version()").Scan(&version); err != nil {
return nil, err
}
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
for i, hookFactory := range hookFactories {
hook, err := hookFactory(version).New(ctx, providerSettings, config)
if err != nil {
return nil, err
}
hooks[i] = hook
}
return &provider{
settings: settings,
clickHouseConn: chConn,

View File

@@ -46,6 +46,7 @@ type QuerySettings struct {
MaxBytesToRead int `mapstructure:"max_bytes_to_read"`
MaxResultRows int `mapstructure:"max_result_rows"`
IgnoreDataSkippingIndices string `mapstructure:"ignore_data_skipping_indices"`
SecondaryIndicesEnableBulkFiltering bool `mapstructure:"secondary_indices_enable_bulk_filtering"`
}
func NewConfigFactory() factory.ConfigFactory {

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/factory"
)
type TelemetryStore interface {
@@ -19,6 +20,8 @@ type TelemetryStoreHook interface {
AfterQuery(ctx context.Context, event *QueryEvent)
}
type TelemetryStoreHookFactoryFunc func(string) factory.ProviderFactory[TelemetryStoreHook, Config]
func WrapBeforeQuery(hooks []TelemetryStoreHook, ctx context.Context, event *QueryEvent) context.Context {
for _, hook := range hooks {
ctx = hook.BeforeQuery(ctx, event)

View File

@@ -3,6 +3,7 @@ package telemetrystorehook
import (
"context"
"encoding/json"
"strings"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/factory"
@@ -11,16 +12,20 @@ import (
)
type provider struct {
settings telemetrystore.QuerySettings
clickHouseVersion string
settings telemetrystore.QuerySettings
}
func NewSettingsFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("settings"), NewSettings)
func NewSettingsFactory(version string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("settings"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
return NewSettings(ctx, providerSettings, config, version)
})
}
func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, version string) (telemetrystore.TelemetryStoreHook, error) {
return &provider{
settings: config.Clickhouse.QuerySettings,
clickHouseVersion: version,
settings: config.Clickhouse.QuerySettings,
}, nil
}
@@ -75,6 +80,13 @@ func (h *provider) BeforeQuery(ctx context.Context, _ *telemetrystore.QueryEvent
settings["result_overflow_mode"] = ctx.Value("result_overflow_mode")
}
// ClickHouse version check is added since this setting is not support on version below 25.5
if strings.HasPrefix(h.clickHouseVersion, "25") && !h.settings.SecondaryIndicesEnableBulkFiltering {
// TODO(srikanthccv): enable it when the "Cannot read all data" issue is fixed
// https://github.com/ClickHouse/ClickHouse/issues/82283
settings["secondary_indices_enable_bulk_filtering"] = false
}
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
return ctx
}

View File

@@ -10,6 +10,7 @@ import (
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@@ -33,6 +34,17 @@ func (c *conditionBuilder) conditionFor(
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorContains,
qbtypes.FilterOperatorNotContains,
qbtypes.FilterOperatorILike,
qbtypes.FilterOperatorNotILike,
qbtypes.FilterOperatorLike,
qbtypes.FilterOperatorNotLike:
value = querybuilder.FormatValueForContains(value)
}
// first, locate the raw column type (so we can choose the right EXISTS logic)
column, err := c.fm.ColumnFor(ctx, key)
if err != nil {

View File

@@ -111,6 +111,32 @@ func TestConditionFor(t *testing.T) {
expectedArgs: []any{"%admin%", true},
expectedError: nil,
},
{
name: "Contains operator - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorContains,
value: 521509198310,
expectedSQL: "LOWER(attributes_string['user.id']) LIKE LOWER(?)",
expectedArgs: []any{"%521509198310%", true},
expectedError: nil,
},
{
name: "LIKE operator - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorLike,
value: 521509198310,
expectedSQL: "attributes_string['user.id'] LIKE ?",
expectedArgs: []any{"521509198310", true},
expectedError: nil,
},
{
name: "Between operator - timestamp",
key: telemetrytypes.TelemetryFieldKey{

View File

@@ -3,6 +3,7 @@ package telemetrytraces
import (
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@@ -64,6 +65,7 @@ func TestSpanScopeFilterExpression(t *testing.T) {
}}
whereClause, err := querybuilder.PrepareWhereClause(tt.expression, querybuilder.FilterExprVisitorOpts{
Logger: instrumentationtest.New().Logger(),
FieldMapper: fm,
ConditionBuilder: cb,
FieldKeys: fieldKeys,
@@ -130,6 +132,7 @@ func TestSpanScopeWithResourceFilter(t *testing.T) {
}}
_, err := querybuilder.PrepareWhereClause(tt.expression, querybuilder.FilterExprVisitorOpts{
Logger: instrumentationtest.New().Logger(),
FieldMapper: fm,
ConditionBuilder: cb,
FieldKeys: fieldKeys,

View File

@@ -735,6 +735,7 @@ func (b *traceQueryStatementBuilder) addFilterCondition(
if query.Filter != nil && query.Filter.Expression != "" {
// add filter expression
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,

View File

@@ -21,6 +21,7 @@ func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.TraceAggregati
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
return resourcefilter.NewTraceResourceFilterStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
fm,
cb,
mockMetadataStore,
@@ -327,7 +328,7 @@ func TestStatementBuilder(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -495,7 +496,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -557,7 +558,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()

View File

@@ -8,4 +8,5 @@ const (
TagAttributesV2LocalTableName = "tag_attributes_v2"
TopLevelOperationsTableName = "distributed_top_level_operations"
TraceSummaryTableName = "distributed_trace_summary"
SpanAttributesKeysTblName = "distributed_span_attributes_keys"
)

View File

@@ -38,12 +38,13 @@ func TestTraceTimeRangeOptimization(t *testing.T) {
resourceFilterFM := resourcefilter.NewFieldMapper()
resourceFilterCB := resourcefilter.NewConditionBuilder(resourceFilterFM)
resourceFilterStmtBuilder := resourcefilter.NewTraceResourceFilterStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
resourceFilterFM,
resourceFilterCB,
mockMetadataStore,
)
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),

View File

@@ -3,8 +3,6 @@ package ruletypes
import (
"encoding/json"
"fmt"
"github.com/SigNoz/signoz/pkg/query-service/converter"
"math"
"net/url"
"sort"
"strings"
@@ -13,7 +11,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
)
// this file contains common structs and methods used by
@@ -106,294 +103,6 @@ const (
Last MatchType = "5"
)
type RuleThreshold interface {
Name() string
Target() float64
RecoveryTarget() float64
MatchType() MatchType
CompareOp() CompareOp
SelectedQuery() string
ShouldAlert(series v3.Series) (Sample, bool)
}
type BasicRuleThreshold struct {
name string
target *float64
targetUnit string
ruleUnit string
recoveryTarget *float64
matchType MatchType
compareOp CompareOp
selectedQuery string
}
func NewBasicRuleThreshold(name string, target *float64, recoveryTarget *float64, matchType MatchType, op CompareOp, selectedQuery string, targetUnit string, ruleUnit string) *BasicRuleThreshold {
return &BasicRuleThreshold{name: name, target: target, recoveryTarget: recoveryTarget, matchType: matchType, selectedQuery: selectedQuery, compareOp: op, targetUnit: targetUnit, ruleUnit: ruleUnit}
}
func (b BasicRuleThreshold) Name() string {
return b.name
}
func (b BasicRuleThreshold) Target() float64 {
unitConverter := converter.FromUnit(converter.Unit(b.targetUnit))
// convert the target value to the y-axis unit
value := unitConverter.Convert(converter.Value{
F: *b.target,
U: converter.Unit(b.targetUnit),
}, converter.Unit(b.ruleUnit))
return value.F
}
func (b BasicRuleThreshold) RecoveryTarget() float64 {
return *b.recoveryTarget
}
func (b BasicRuleThreshold) MatchType() MatchType {
return b.matchType
}
func (b BasicRuleThreshold) CompareOp() CompareOp {
return b.compareOp
}
func (b BasicRuleThreshold) SelectedQuery() string {
return b.selectedQuery
}
func removeGroupinSetPoints(series v3.Series) []v3.Point {
var result []v3.Point
for _, s := range series.Points {
if s.Timestamp >= 0 && !math.IsNaN(s.Value) && !math.IsInf(s.Value, 0) {
result = append(result, s)
}
}
return result
}
func (b BasicRuleThreshold) ShouldAlert(series v3.Series) (Sample, bool) {
var shouldAlert bool
var alertSmpl Sample
var lbls qslabels.Labels
for name, value := range series.Labels {
lbls = append(lbls, qslabels.Label{Name: name, Value: value})
}
lbls = append(lbls, qslabels.Label{Name: "threshold", Value: b.name})
series.Points = removeGroupinSetPoints(series)
// nothing to evaluate
if len(series.Points) == 0 {
return alertSmpl, false
}
switch b.MatchType() {
case AtleastOnce:
// If any sample matches the condition, the rule is firing.
if b.CompareOp() == ValueIsAbove {
for _, smpl := range series.Points {
if smpl.Value > b.Target() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOp() == ValueIsBelow {
for _, smpl := range series.Points {
if smpl.Value < b.Target() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOp() == ValueIsEq {
for _, smpl := range series.Points {
if smpl.Value == b.Target() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOp() == ValueIsNotEq {
for _, smpl := range series.Points {
if smpl.Value != b.Target() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOp() == ValueOutsideBounds {
for _, smpl := range series.Points {
if math.Abs(smpl.Value) >= b.Target() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
}
case AllTheTimes:
// If all samples match the condition, the rule is firing.
shouldAlert = true
alertSmpl = Sample{Point: Point{V: b.Target()}, Metric: lbls}
if b.CompareOp() == ValueIsAbove {
for _, smpl := range series.Points {
if smpl.Value <= b.Target() {
shouldAlert = false
break
}
}
// use min value from the series
if shouldAlert {
var minValue float64 = math.Inf(1)
for _, smpl := range series.Points {
if smpl.Value < minValue {
minValue = smpl.Value
}
}
alertSmpl = Sample{Point: Point{V: minValue}, Metric: lbls}
}
} else if b.CompareOp() == ValueIsBelow {
for _, smpl := range series.Points {
if smpl.Value >= b.Target() {
shouldAlert = false
break
}
}
if shouldAlert {
var maxValue float64 = math.Inf(-1)
for _, smpl := range series.Points {
if smpl.Value > maxValue {
maxValue = smpl.Value
}
}
alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lbls}
}
} else if b.CompareOp() == ValueIsEq {
for _, smpl := range series.Points {
if smpl.Value != b.Target() {
shouldAlert = false
break
}
}
} else if b.CompareOp() == ValueIsNotEq {
for _, smpl := range series.Points {
if smpl.Value == b.Target() {
shouldAlert = false
break
}
}
// use any non-inf or nan value from the series
if shouldAlert {
for _, smpl := range series.Points {
if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
break
}
}
}
} else if b.CompareOp() == ValueOutsideBounds {
for _, smpl := range series.Points {
if math.Abs(smpl.Value) < b.Target() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = false
break
}
}
}
case OnAverage:
// If the average of all samples matches the condition, the rule is firing.
var sum, count float64
for _, smpl := range series.Points {
if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
continue
}
sum += smpl.Value
count++
}
avg := sum / count
alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls}
if b.CompareOp() == ValueIsAbove {
if avg > b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueIsBelow {
if avg < b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueIsEq {
if avg == b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueIsNotEq {
if avg != b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueOutsideBounds {
if math.Abs(avg) >= b.Target() {
shouldAlert = true
}
}
case InTotal:
// If the sum of all samples matches the condition, the rule is firing.
var sum float64
for _, smpl := range series.Points {
if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
continue
}
sum += smpl.Value
}
alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls}
if b.CompareOp() == ValueIsAbove {
if sum > b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueIsBelow {
if sum < b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueIsEq {
if sum == b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueIsNotEq {
if sum != b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueOutsideBounds {
if math.Abs(sum) >= b.Target() {
shouldAlert = true
}
}
case Last:
// If the last sample matches the condition, the rule is firing.
shouldAlert = false
alertSmpl = Sample{Point: Point{V: series.Points[len(series.Points)-1].Value}, Metric: lbls}
if b.CompareOp() == ValueIsAbove {
if series.Points[len(series.Points)-1].Value > b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueIsBelow {
if series.Points[len(series.Points)-1].Value < b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueIsEq {
if series.Points[len(series.Points)-1].Value == b.Target() {
shouldAlert = true
}
} else if b.CompareOp() == ValueIsNotEq {
if series.Points[len(series.Points)-1].Value != b.Target() {
shouldAlert = true
}
}
}
return alertSmpl, shouldAlert
}
type RuleCondition struct {
CompositeQuery *v3.CompositeQuery `json:"compositeQuery,omitempty" yaml:"compositeQuery,omitempty"`
CompareOp CompareOp `yaml:"op,omitempty" json:"op,omitempty"`
@@ -407,7 +116,6 @@ type RuleCondition struct {
SelectedQuery string `json:"selectedQueryName,omitempty"`
RequireMinPoints bool `yaml:"requireMinPoints,omitempty" json:"requireMinPoints,omitempty"`
RequiredNumPoints int `yaml:"requiredNumPoints,omitempty" json:"requiredNumPoints,omitempty"`
Thresholds []RuleThreshold `yaml:"thresholds,omitempty" json:"thresholds,omitempty"`
}
func (rc *RuleCondition) GetSelectedQueryName() string {

View File

@@ -65,12 +65,8 @@ type PostableRule struct {
Version string `json:"version,omitempty"`
// legacy
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
OldYaml string `json:"yaml,omitempty"`
EvalType string `yaml:"evalType,omitempty" json:"evalType,omitempty"`
Evaluation Evaluation `yaml:"evaluation,omitempty" json:"evaluation,omitempty"`
StartsAt int64 `yaml:"startsAt,omitempty" json:"startsAt,omitempty"`
Schedule string `json:"schedule,omitempty"`
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
OldYaml string `json:"yaml,omitempty"`
}
func ParsePostableRule(content []byte) (*PostableRule, error) {
@@ -144,15 +140,6 @@ func ParseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*P
return nil, err
}
//added alerts v2 fields
rule.RuleCondition.Thresholds = append(rule.RuleCondition.Thresholds,
NewBasicRuleThreshold(rule.AlertName, rule.RuleCondition.Target, nil, rule.RuleCondition.MatchType, rule.RuleCondition.CompareOp, rule.RuleCondition.SelectedQuery, rule.RuleCondition.TargetUnit, rule.RuleCondition.CompositeQuery.Unit))
if rule.EvalType == "" || rule.EvalType == "rolling" {
rule.EvalType = "rolling"
rule.Evaluation = NewEvaluation(rule.EvalType, RollingWindow{EvalWindow: rule.EvalWindow, Frequency: rule.Frequency, RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints})
} else if rule.EvalType == "cumulative" {
rule.Evaluation = NewEvaluation(rule.EvalType, CumulativeWindow{EvalWindow: rule.EvalWindow, StartsAt: time.UnixMilli(rule.StartsAt), RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints})
}
return rule, nil
}

View File

@@ -1,67 +0,0 @@
package ruletypes
import (
"time"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
)
type Evaluation interface {
EvaluationTime(curr time.Time) (time.Time, time.Time)
}
type RollingWindow struct {
EvalWindow Duration `json:"evalWindow"`
Frequency Duration `json:"frequency"`
RequireMinPoints bool `json:"requireMinPoints"`
RequiredNumPoints int `json:"requiredNumPoints"`
SkipEvalForNewGroups []v3.AttributeKey `json:"skipEvalForNewGroups"`
}
func (rollingWindow *RollingWindow) EvaluationTime(curr time.Time) (time.Time, time.Time) {
return curr.Add(time.Duration(-rollingWindow.EvalWindow)), curr
}
type CumulativeWindow struct {
StartsAt time.Time `json:"startsAt"`
EvalWindow Duration `json:"evalWindow"`
RequireMinPoints bool `json:"requireMinPoints"`
RequiredNumPoints int `json:"requiredNumPoints"`
SkipEvalForNewGroups []v3.AttributeKey `json:"skipEvalForNewGroups"`
}
func (cumulativeWindow *CumulativeWindow) EvaluationTime(curr time.Time) (time.Time, time.Time) {
if curr.Before(cumulativeWindow.StartsAt) {
return curr, curr
}
dur := time.Duration(cumulativeWindow.EvalWindow)
if dur <= 0 {
return curr, curr
}
// Calculate the number of complete windows since StartsAt
elapsed := curr.Sub(cumulativeWindow.StartsAt)
windows := int64(elapsed / dur)
windowStart := cumulativeWindow.StartsAt.Add(time.Duration(windows) * dur)
return windowStart, curr
}
func NewEvaluation(evalType string, params interface{}) Evaluation {
switch evalType {
case "rolling":
p, ok := params.(RollingWindow)
if !ok {
return nil
}
return &p
case "cumulative":
p, ok := params.(CumulativeWindow)
if !ok {
return nil
}
return &p
default:
return nil
}
}

View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "astroid"
@@ -390,7 +390,7 @@ files = [
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
markers = {main = "sys_platform == \"win32\"", dev = "sys_platform == \"win32\" or platform_system == \"Windows\""}
markers = {main = "sys_platform == \"win32\"", dev = "platform_system == \"Windows\" or sys_platform == \"win32\""}
[[package]]
name = "dill"
@@ -983,14 +983,14 @@ files = [
[[package]]
name = "urllib3"
version = "2.4.0"
version = "2.5.0"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "urllib3-2.4.0-py3-none-any.whl", hash = "sha256:4e16665048960a0900c702d4a66415956a584919c03361cac9f1df5c5dd7e813"},
{file = "urllib3-2.4.0.tar.gz", hash = "sha256:414bc6535b787febd7567804cc015fee39daab8ad86268f1310a9250697de466"},
{file = "urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc"},
{file = "urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760"},
]
[package.extras]