mirror of https://github.com/SigNoz/signoz.git
synced 2026-02-08 18:59:56 +00:00

Compare commits: imp/remove ... feat/alert
12 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 96bbeb0f6e | |
| | 05a0ce64d3 | |
| | 500c5ecd8c | |
| | 2cd9f3b6a8 | |
| | 43eea4d2a0 | |
| | a1bace9b14 | |
| | 473488a91a | |
| | 5ebbafcb30 | |
| | 735f9e8105 | |
| | 92caca2507 | |
| | c15f91529c | |
| | 355863fed9 | |
@@ -40,7 +40,7 @@ services:
       timeout: 5s
       retries: 3
   schema-migrator-sync:
-    image: signoz/signoz-schema-migrator:v0.129.0
+    image: signoz/signoz-schema-migrator:v0.128.2
     container_name: schema-migrator-sync
     command:
       - sync
@@ -53,7 +53,7 @@ services:
         condition: service_healthy
     restart: on-failure
   schema-migrator-async:
-    image: signoz/signoz-schema-migrator:v0.129.0
+    image: signoz/signoz-schema-migrator:v0.128.2
     container_name: schema-migrator-async
     command:
       - async
@@ -122,7 +122,6 @@ telemetrystore:
     max_bytes_to_read: 0
     max_result_rows: 0
     ignore_data_skipping_indices: ""
-    secondary_indices_enable_bulk_filtering: false

 ##################### Prometheus #####################
 prometheus:
@@ -174,7 +174,7 @@ services:
     # - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
   signoz:
     !!merge <<: *db-depend
-    image: signoz/signoz:v0.92.1
+    image: signoz/signoz:v0.91.0
     command:
       - --config=/root/config/prometheus.yml
     ports:
@@ -207,7 +207,7 @@ services:
       retries: 3
   otel-collector:
     !!merge <<: *db-depend
-    image: signoz/signoz-otel-collector:v0.129.0
+    image: signoz/signoz-otel-collector:v0.128.2
     command:
       - --config=/etc/otel-collector-config.yaml
       - --manager-config=/etc/manager-config.yaml
@@ -231,7 +231,7 @@ services:
       - signoz
   schema-migrator:
     !!merge <<: *common
-    image: signoz/signoz-schema-migrator:v0.129.0
+    image: signoz/signoz-schema-migrator:v0.128.2
     deploy:
       restart_policy:
         condition: on-failure
@@ -115,7 +115,7 @@ services:
     # - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
   signoz:
     !!merge <<: *db-depend
-    image: signoz/signoz:v0.92.1
+    image: signoz/signoz:v0.91.0
     command:
       - --config=/root/config/prometheus.yml
     ports:
@@ -148,7 +148,7 @@ services:
       retries: 3
   otel-collector:
     !!merge <<: *db-depend
-    image: signoz/signoz-otel-collector:v0.129.0
+    image: signoz/signoz-otel-collector:v0.128.2
     command:
       - --config=/etc/otel-collector-config.yaml
       - --manager-config=/etc/manager-config.yaml
@@ -174,7 +174,7 @@ services:
       - signoz
   schema-migrator:
     !!merge <<: *common
-    image: signoz/signoz-schema-migrator:v0.129.0
+    image: signoz/signoz-schema-migrator:v0.128.2
     deploy:
       restart_policy:
         condition: on-failure
@@ -177,7 +177,7 @@ services:
     # - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
   signoz:
     !!merge <<: *db-depend
-    image: signoz/signoz:${VERSION:-v0.92.1}
+    image: signoz/signoz:${VERSION:-v0.91.0}
     container_name: signoz
     command:
       - --config=/root/config/prometheus.yml
@@ -211,7 +211,7 @@ services:
   # TODO: support otel-collector multiple replicas. Nginx/Traefik for loadbalancing?
   otel-collector:
     !!merge <<: *db-depend
-    image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.129.0}
+    image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.128.2}
     container_name: signoz-otel-collector
     command:
       - --config=/etc/otel-collector-config.yaml
@@ -237,7 +237,7 @@ services:
         condition: service_healthy
   schema-migrator-sync:
     !!merge <<: *common
-    image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.129.0}
+    image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.128.2}
     container_name: schema-migrator-sync
     command:
       - sync
@@ -248,7 +248,7 @@ services:
         condition: service_healthy
   schema-migrator-async:
     !!merge <<: *db-depend
-    image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.129.0}
+    image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.128.2}
     container_name: schema-migrator-async
     command:
       - async
@@ -110,7 +110,7 @@ services:
     # - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
   signoz:
     !!merge <<: *db-depend
-    image: signoz/signoz:${VERSION:-v0.92.1}
+    image: signoz/signoz:${VERSION:-v0.91.0}
     container_name: signoz
     command:
       - --config=/root/config/prometheus.yml
@@ -143,7 +143,7 @@ services:
       retries: 3
   otel-collector:
     !!merge <<: *db-depend
-    image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.129.0}
+    image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.128.2}
     container_name: signoz-otel-collector
     command:
       - --config=/etc/otel-collector-config.yaml
@@ -165,7 +165,7 @@ services:
         condition: service_healthy
   schema-migrator-sync:
     !!merge <<: *common
-    image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.129.0}
+    image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.128.2}
     container_name: schema-migrator-sync
     command:
       - sync
@@ -177,7 +177,7 @@ services:
     restart: on-failure
   schema-migrator-async:
     !!merge <<: *db-depend
-    image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.129.0}
+    image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-v0.128.2}
     container_name: schema-migrator-async
     command:
       - async
@@ -167,16 +167,9 @@ func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v3.
 		ctx, "prepare query range request v4", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds(),
 	)
 
-	start := ts.Add(-time.Duration(r.EvalWindow())).UnixMilli()
-	end := ts.UnixMilli()
-
-	if r.EvalDelay() > 0 {
-		start = start - int64(r.EvalDelay().Milliseconds())
-		end = end - int64(r.EvalDelay().Milliseconds())
-	}
-	// round to minute otherwise we could potentially miss data
-	start = start - (start % (60 * 1000))
-	end = end - (end % (60 * 1000))
+	st, en := r.Timestamps(ts)
+	start := st.UnixMilli()
+	end := en.UnixMilli()
 
 	compositeQuery := r.Condition().CompositeQuery
@@ -253,9 +246,11 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
 	r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
 
 	for _, series := range queryResult.AnomalyScores {
-		smpl, shouldAlert := r.ShouldAlert(*series)
-		if shouldAlert {
-			resultVector = append(resultVector, smpl)
+		for _, threshold := range r.Thresholds() {
+			smpl, shouldAlert := threshold.ShouldAlert(*series)
+			if shouldAlert {
+				resultVector = append(resultVector, smpl)
+			}
 		}
 	}
 	return resultVector, nil
@@ -296,9 +291,11 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
 	r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
 
 	for _, series := range queryResult.AnomalyScores {
-		smpl, shouldAlert := r.ShouldAlert(*series)
-		if shouldAlert {
-			resultVector = append(resultVector, smpl)
+		for _, threshold := range r.Thresholds() {
+			smpl, shouldAlert := threshold.ShouldAlert(*series)
+			if shouldAlert {
+				resultVector = append(resultVector, smpl)
+			}
 		}
 	}
 	return resultVector, nil
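Both buildAndRunQuery and buildAndRunQueryV5 move from a single rule-level ShouldAlert call to iterating r.Thresholds(), so one series can now contribute a sample per breached threshold. A minimal sketch of the pattern, with hypothetical Series/Sample/Threshold types standing in for the real ones in SigNoz's ruletypes package:

```go
package rules

// Sample and Series are hypothetical stand-ins for the ruletypes values
// flowing through buildAndRunQuery.
type Sample struct{ Value float64 }

type Series struct{ Points []float64 }

// Threshold mirrors the per-threshold ShouldAlert contract used above:
// it reports whether the series breaches this threshold and returns the
// offending sample when it does.
type Threshold interface {
	ShouldAlert(series Series) (Sample, bool)
}

// evaluate collects one alert sample per (series, threshold) breach,
// mirroring the nested loop introduced in the hunks above.
func evaluate(seriesList []Series, thresholds []Threshold) []Sample {
	var resultVector []Sample
	for _, series := range seriesList {
		for _, threshold := range thresholds {
			if smpl, ok := threshold.ShouldAlert(series); ok {
				resultVector = append(resultVector, smpl)
			}
		}
	}
	return resultVector
}
```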
@@ -41,7 +41,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 
 		// create ch rule task for evalution
 		task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
+
+		if tr.IsScheduled() {
+			task.SetSchedule(tr.GetSchedule())
+		}
 	} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
 
 		// create promql rule
@@ -63,7 +65,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 
 		// create promql rule task for evalution
 		task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
+
+		if pr.IsScheduled() {
+			task.SetSchedule(pr.GetSchedule())
+		}
 	} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
 		// create anomaly rule
 		ar, err := NewAnomalyRule(
@@ -85,7 +89,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 
 		// create anomaly rule task for evalution
 		task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
+
+		if ar.IsScheduled() {
+			task.SetSchedule(ar.GetSchedule())
+		}
 	} else {
 		return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
 	}
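The same guard is added after every newTask call here, and again in the base defaultPrepareTaskFunc near the end of this compare. Sketched in isolation, with trimmed, hypothetical versions of the Task and rule interfaces (the real ones carry many more methods):

```go
package rules

import "time"

// Task is a hypothetical slice of the rules task interface touched by
// this diff: tasks can optionally be pinned to a schedule, expressed as
// an expression string plus its anchor time.
type Task interface {
	SetSchedule(expr string, startsAt time.Time)
}

// scheduledRule stands in for ThresholdRule / PromRule / AnomalyRule,
// which all inherit the two accessors from BaseRule (see the BaseRule
// hunks later in this compare).
type scheduledRule interface {
	IsScheduled() bool
	GetSchedule() (string, time.Time)
}

// attachSchedule mirrors the guard inserted after each newTask call.
func attachSchedule(task Task, r scheduledRule) {
	if r.IsScheduled() {
		task.SetSchedule(r.GetSchedule())
	}
}
```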
@@ -8,15 +8,14 @@ import {
 export const getValueSuggestions = (
 	props: QueryKeyValueRequestProps,
 ): Promise<AxiosResponse<QueryKeyValueSuggestionsResponseProps>> => {
-	const { signal, key, searchText, signalSource, metricName } = props;
+	const { signal, key, searchText, signalSource } = props;
 
 	const encodedSignal = encodeURIComponent(signal);
 	const encodedKey = encodeURIComponent(key);
-	const encodedMetricName = encodeURIComponent(metricName || '');
 	const encodedSearchText = encodeURIComponent(searchText);
 	const encodedSource = encodeURIComponent(signalSource || '');
 
 	return axios.get(
-		`/fields/values?signal=${encodedSignal}&name=${encodedKey}&searchText=${encodedSearchText}&metricName=${encodedMetricName}&source=${encodedSource}`,
+		`/fields/values?signal=${encodedSignal}&name=${encodedKey}&searchText=${encodedSearchText}&source=${encodedSource}`,
 	);
 };
@@ -383,7 +383,6 @@ function QuerySearch({
 			searchText: sanitizedSearchText,
 			signal: dataSource,
 			signalSource: signalSource as 'meter' | '',
-			metricName: debouncedMetricName ?? undefined,
 		});
 
 		// Skip updates if component unmounted or key changed
@@ -475,7 +474,6 @@ function QuerySearch({
 		activeKey,
 		dataSource,
 		isLoadingSuggestions,
-		debouncedMetricName,
 		signalSource,
 		toggleSuggestions,
 	],
@@ -6,7 +6,6 @@ export const GlobalShortcuts = {
 	NavigateToAlerts: 'a+shift',
 	NavigateToExceptions: 'e+shift',
 	NavigateToMessagingQueues: 'm+shift',
-	ToggleSidebar: 'b+shift',
 };
 
 export const GlobalShortcutsName = {
@@ -17,7 +16,6 @@ export const GlobalShortcutsName = {
 	NavigateToAlerts: 'shift+a',
 	NavigateToExceptions: 'shift+e',
 	NavigateToMessagingQueues: 'shift+m',
-	ToggleSidebar: 'shift+b',
 };
 
 export const GlobalShortcutsDescription = {
@@ -28,5 +26,4 @@ export const GlobalShortcutsDescription = {
 	NavigateToAlerts: 'Navigate to alerts page',
 	NavigateToExceptions: 'Navigate to Exceptions page',
 	NavigateToMessagingQueues: 'Navigate to Messaging Queues page',
-	ToggleSidebar: 'Toggle sidebar visibility',
 };
@@ -1,176 +0,0 @@
-import { render, screen } from '@testing-library/react';
-import userEvent from '@testing-library/user-event';
-import logEvent from 'api/common/logEvent';
-import { GlobalShortcuts } from 'constants/shortcuts/globalShortcuts';
-import { USER_PREFERENCES } from 'constants/userPreferences';
-import {
-	KeyboardHotkeysProvider,
-	useKeyboardHotkeys,
-} from 'hooks/hotkeys/useKeyboardHotkeys';
-import { QueryClient, QueryClientProvider } from 'react-query';
-
-// Mock dependencies
-jest.mock('api/common/logEvent', () => jest.fn());
-
-// Mock the AppContext
-const mockUpdateUserPreferenceInContext = jest.fn();
-
-const SHIFT_B_KEYBOARD_SHORTCUT = '{Shift>}b{/Shift}';
-
-jest.mock('providers/App/App', () => ({
-	useAppContext: jest.fn(() => ({
-		userPreferences: [
-			{
-				name: USER_PREFERENCES.SIDENAV_PINNED,
-				value: false,
-			},
-		],
-		updateUserPreferenceInContext: mockUpdateUserPreferenceInContext,
-	})),
-}));
-
-function TestComponent({
-	mockHandleShortcut,
-}: {
-	mockHandleShortcut: () => void;
-}): JSX.Element {
-	const { registerShortcut } = useKeyboardHotkeys();
-	registerShortcut(GlobalShortcuts.ToggleSidebar, mockHandleShortcut);
-	return <div data-testid="test">Test</div>;
-}
-
-describe('Sidebar Toggle Shortcut', () => {
-	let queryClient: QueryClient;
-
-	beforeEach(() => {
-		queryClient = new QueryClient({
-			defaultOptions: {
-				queries: {
-					retry: false,
-				},
-				mutations: {
-					retry: false,
-				},
-			},
-		});
-
-		jest.clearAllMocks();
-	});
-
-	afterEach(() => {
-		jest.clearAllMocks();
-	});
-
-	describe('Global Shortcuts Constants', () => {
-		it('should have the correct shortcut key combination', () => {
-			expect(GlobalShortcuts.ToggleSidebar).toBe('b+shift');
-		});
-	});
-
-	describe('Keyboard Shortcut Registration', () => {
-		it('should register the sidebar toggle shortcut correctly', async () => {
-			const user = userEvent.setup();
-			const mockHandleShortcut = jest.fn();
-
-			render(
-				<QueryClientProvider client={queryClient}>
-					<KeyboardHotkeysProvider>
-						<TestComponent mockHandleShortcut={mockHandleShortcut} />
-					</KeyboardHotkeysProvider>
-				</QueryClientProvider>,
-			);
-
-			// Trigger the shortcut
-			await user.keyboard(SHIFT_B_KEYBOARD_SHORTCUT);
-
-			expect(mockHandleShortcut).toHaveBeenCalled();
-		});
-
-		it('should not trigger shortcut in input fields', async () => {
-			const user = userEvent.setup();
-			const mockHandleShortcut = jest.fn();
-
-			function TestComponent(): JSX.Element {
-				const { registerShortcut } = useKeyboardHotkeys();
-				registerShortcut(GlobalShortcuts.ToggleSidebar, mockHandleShortcut);
-				return (
-					<div>
-						<input data-testid="input-field" />
-						<div data-testid="test">Test</div>
-					</div>
-				);
-			}
-
-			render(
-				<QueryClientProvider client={queryClient}>
-					<KeyboardHotkeysProvider>
-						<TestComponent />
-					</KeyboardHotkeysProvider>
-				</QueryClientProvider>,
-			);
-
-			// Focus on input field
-			const inputField = screen.getByTestId('input-field');
-			await user.click(inputField);
-
-			// Try to trigger shortcut while focused on input
-			await user.keyboard('{Shift>}b{/Shift}');
-
-			// Should not trigger the shortcut
-			expect(mockHandleShortcut).not.toHaveBeenCalled();
-		});
-	});
-
-	describe('Sidebar Toggle Functionality', () => {
-		it('should log the toggle event with correct parameters', async () => {
-			const user = userEvent.setup();
-			const mockHandleShortcut = jest.fn(() => {
-				logEvent('Global Shortcut: Sidebar Toggle', {
-					previousState: false,
-					newState: true,
-				});
-			});
-
-			render(
-				<QueryClientProvider client={queryClient}>
-					<KeyboardHotkeysProvider>
-						<TestComponent mockHandleShortcut={mockHandleShortcut} />
-					</KeyboardHotkeysProvider>
-				</QueryClientProvider>,
-			);
-
-			await user.keyboard(SHIFT_B_KEYBOARD_SHORTCUT);
-
-			expect(logEvent).toHaveBeenCalledWith('Global Shortcut: Sidebar Toggle', {
-				previousState: false,
-				newState: true,
-			});
-		});
-
-		it('should update user preference in context', async () => {
-			const user = userEvent.setup();
-			const mockHandleShortcut = jest.fn(() => {
-				const save = {
-					name: USER_PREFERENCES.SIDENAV_PINNED,
-					value: true,
-				};
-				mockUpdateUserPreferenceInContext(save);
-			});
-
-			render(
-				<QueryClientProvider client={queryClient}>
-					<KeyboardHotkeysProvider>
-						<TestComponent mockHandleShortcut={mockHandleShortcut} />
-					</KeyboardHotkeysProvider>
-				</QueryClientProvider>,
-			);
-
-			await user.keyboard(SHIFT_B_KEYBOARD_SHORTCUT);
-
-			expect(mockUpdateUserPreferenceInContext).toHaveBeenCalledWith({
-				name: USER_PREFERENCES.SIDENAV_PINNED,
-				value: true,
-			});
-		});
-	});
-});
@@ -10,10 +10,8 @@ import setLocalStorageApi from 'api/browser/localstorage/set';
 import getChangelogByVersion from 'api/changelog/getChangelogByVersion';
 import logEvent from 'api/common/logEvent';
 import manageCreditCardApi from 'api/v1/portal/create';
-import updateUserPreference from 'api/v1/user/preferences/name/update';
 import getUserLatestVersion from 'api/v1/version/getLatestVersion';
 import getUserVersion from 'api/v1/version/getVersion';
 import { AxiosError } from 'axios';
 import cx from 'classnames';
 import ChangelogModal from 'components/ChangelogModal/ChangelogModal';
 import ChatSupportGateway from 'components/ChatSupportGateway/ChatSupportGateway';
@@ -24,12 +22,10 @@ import { Events } from 'constants/events';
 import { FeatureKeys } from 'constants/features';
 import { LOCALSTORAGE } from 'constants/localStorage';
 import ROUTES from 'constants/routes';
-import { GlobalShortcuts } from 'constants/shortcuts/globalShortcuts';
-import { USER_PREFERENCES } from 'constants/userPreferences';
 import SideNav from 'container/SideNav';
 import TopNav from 'container/TopNav';
 import dayjs from 'dayjs';
 import { useKeyboardHotkeys } from 'hooks/hotkeys/useKeyboardHotkeys';
 import { useIsDarkMode } from 'hooks/useDarkMode';
 import { useGetTenantLicense } from 'hooks/useGetTenantLicense';
 import { useNotifications } from 'hooks/useNotifications';
@@ -72,10 +68,8 @@ import {
 	LicensePlatform,
 	LicenseState,
 } from 'types/api/licensesV3/getActive';
-import { UserPreference } from 'types/api/preferences/preference';
 import AppReducer from 'types/reducer/app';
 import { USER_ROLES } from 'types/roles';
 import { showErrorNotification } from 'utils/error';
 import { eventEmitter } from 'utils/getEventEmitter';
 import {
 	getFormattedDate,
@@ -668,85 +662,10 @@ function AppLayout(props: AppLayoutProps): JSX.Element {
 		</div>
 	);
 
-	const sideNavPinnedPreference = userPreferences?.find(
+	const sideNavPinned = userPreferences?.find(
 		(preference) => preference.name === USER_PREFERENCES.SIDENAV_PINNED,
 	)?.value as boolean;
 
-	// Add loading state to prevent layout shift during initial load
-	const [isSidebarLoaded, setIsSidebarLoaded] = useState(false);
-
-	// Get sidebar state from localStorage as fallback until preferences are loaded
-	const getSidebarStateFromLocalStorage = useCallback((): boolean => {
-		try {
-			const storedValue = getLocalStorageApi(USER_PREFERENCES.SIDENAV_PINNED);
-			return storedValue === 'true';
-		} catch {
-			return false;
-		}
-	}, []);
-
-	// Set sidebar as loaded after user preferences are fetched
-	useEffect(() => {
-		if (userPreferences !== null) {
-			setIsSidebarLoaded(true);
-		}
-	}, [userPreferences]);
-
-	// Use localStorage value as fallback until preferences are loaded
-	const isSideNavPinned = isSidebarLoaded
-		? sideNavPinnedPreference
-		: getSidebarStateFromLocalStorage();
-
-	const { registerShortcut, deregisterShortcut } = useKeyboardHotkeys();
-	const { updateUserPreferenceInContext } = useAppContext();
-
-	const { mutate: updateUserPreferenceMutation } = useMutation(
-		updateUserPreference,
-		{
-			onError: (error) => {
-				showErrorNotification(notifications, error as AxiosError);
-			},
-		},
-	);
-
-	const handleToggleSidebar = useCallback((): void => {
-		const newState = !isSideNavPinned;
-
-		logEvent('Global Shortcut: Sidebar Toggle', {
-			previousState: isSideNavPinned,
-			newState,
-		});
-
-		// Save to localStorage immediately for instant feedback
-		setLocalStorageApi(USER_PREFERENCES.SIDENAV_PINNED, newState.toString());
-
-		// Update the context immediately
-		const save = {
-			name: USER_PREFERENCES.SIDENAV_PINNED,
-			value: newState,
-		};
-		updateUserPreferenceInContext(save as UserPreference);
-
-		// Make the API call in the background
-		updateUserPreferenceMutation({
-			name: USER_PREFERENCES.SIDENAV_PINNED,
-			value: newState,
-		});
-	}, [
-		isSideNavPinned,
-		updateUserPreferenceInContext,
-		updateUserPreferenceMutation,
-	]);
-
-	// Register the sidebar toggle shortcut
-	useEffect(() => {
-		registerShortcut(GlobalShortcuts.ToggleSidebar, handleToggleSidebar);
-
-		return (): void => {
-			deregisterShortcut(GlobalShortcuts.ToggleSidebar);
-		};
-	}, [registerShortcut, deregisterShortcut, handleToggleSidebar]);
-
 	const SHOW_TRIAL_EXPIRY_BANNER =
 		showTrialExpiryBanner && !showPaymentFailedWarning;
 	const SHOW_WORKSPACE_RESTRICTED_BANNER = showWorkspaceRestricted;
@@ -820,14 +739,14 @@ function AppLayout(props: AppLayoutProps): JSX.Element {
 			className={cx(
 				'app-layout',
 				isDarkMode ? 'darkMode dark' : 'lightMode',
-				isSideNavPinned ? 'side-nav-pinned' : '',
+				sideNavPinned ? 'side-nav-pinned' : '',
 				SHOW_WORKSPACE_RESTRICTED_BANNER ? 'isWorkspaceRestricted' : '',
 				SHOW_TRIAL_EXPIRY_BANNER ? 'isTrialExpired' : '',
 				SHOW_PAYMENT_FAILED_BANNER ? 'isPaymentFailed' : '',
 			)}
 		>
 			{isToDisplayLayout && !renderFullScreen && (
-				<SideNav isPinned={isSideNavPinned} />
+				<SideNav isPinned={sideNavPinned} />
 			)}
 			<div
 				className={cx('app-content', {
@@ -1,7 +1,6 @@
 import './MySettings.styles.scss';
 
 import { Radio, RadioChangeEvent, Switch, Tag } from 'antd';
-import setLocalStorageApi from 'api/browser/localstorage/set';
 import logEvent from 'api/common/logEvent';
 import updateUserPreference from 'api/v1/user/preferences/name/update';
 import { AxiosError } from 'axios';
@@ -110,9 +109,6 @@ function MySettings(): JSX.Element {
 		// Optimistically update the UI
 		setSideNavPinned(checked);
 
-		// Save to localStorage immediately for instant feedback
-		setLocalStorageApi(USER_PREFERENCES.SIDENAV_PINNED, checked.toString());
-
 		// Update the context immediately
 		const save = {
 			name: USER_PREFERENCES.SIDENAV_PINNED,
@@ -134,8 +130,6 @@ function MySettings(): JSX.Element {
 				name: USER_PREFERENCES.SIDENAV_PINNED,
 				value: !checked,
 			} as UserPreference);
-			// Also revert localStorage
-			setLocalStorageApi(USER_PREFERENCES.SIDENAV_PINNED, (!checked).toString());
 			showErrorNotification(notifications, error as AxiosError);
 		},
 	},
@@ -20,7 +20,6 @@ function TimeSeriesViewContainer({
 	dataSource = DataSource.TRACES,
 	isFilterApplied,
 	setWarning,
-	setIsLoadingQueries,
 }: TimeSeriesViewProps): JSX.Element {
 	const { stagedQuery, currentQuery, panelType } = useQueryBuilder();
 
@@ -84,14 +83,6 @@ function TimeSeriesViewContainer({
 		[data, isValidToConvertToMs],
 	);
 
-	useEffect(() => {
-		if (isLoading || isFetching) {
-			setIsLoadingQueries(true);
-		} else {
-			setIsLoadingQueries(false);
-		}
-	}, [isLoading, isFetching, setIsLoadingQueries]);
-
 	return (
 		<TimeSeriesView
 			isFilterApplied={isFilterApplied}
@@ -110,7 +101,6 @@ interface TimeSeriesViewProps {
 	dataSource?: DataSource;
 	isFilterApplied: boolean;
 	setWarning: Dispatch<SetStateAction<Warning | undefined>>;
-	setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
 }
 
 TimeSeriesViewContainer.defaultProps = {
@@ -49,14 +49,9 @@ import { getListColumns, transformDataWithDate } from './utils';
 interface ListViewProps {
 	isFilterApplied: boolean;
 	setWarning: Dispatch<SetStateAction<Warning | undefined>>;
-	setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
 }
 
-function ListView({
-	isFilterApplied,
-	setWarning,
-	setIsLoadingQueries,
-}: ListViewProps): JSX.Element {
+function ListView({ isFilterApplied, setWarning }: ListViewProps): JSX.Element {
 	const {
 		stagedQuery,
 		panelType: panelTypeFromQueryBuilder,
@@ -167,14 +162,6 @@ function ListView({
 		// eslint-disable-next-line react-hooks/exhaustive-deps
 	}, [data?.payload, data?.warning]);
 
-	useEffect(() => {
-		if (isLoading || isFetching) {
-			setIsLoadingQueries(true);
-		} else {
-			setIsLoadingQueries(false);
-		}
-	}, [isLoading, isFetching, setIsLoadingQueries]);
-
 	const dataLength =
 		data?.payload?.data?.newResult?.data?.result[0]?.list?.length;
 	const totalCount = useMemo(() => dataLength || 0, [dataLength]);
@@ -16,10 +16,8 @@ import { GlobalReducer } from 'types/reducer/globalTime';
 
 function TableView({
 	setWarning,
-	setIsLoadingQueries,
 }: {
 	setWarning: Dispatch<SetStateAction<Warning | undefined>>;
-	setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
 }): JSX.Element {
 	const { stagedQuery, panelType } = useQueryBuilder();
 
@@ -28,7 +26,7 @@ function TableView({
 		GlobalReducer
 	>((state) => state.globalTime);
 
-	const { data, isLoading, isFetching, isError, error } = useGetQueryRange(
+	const { data, isLoading, isError, error } = useGetQueryRange(
 		{
 			query: stagedQuery || initialQueriesMap.traces,
 			graphType: panelType || PANEL_TYPES.TABLE,
@@ -51,14 +49,6 @@ function TableView({
 		},
 	);
 
-	useEffect(() => {
-		if (isLoading || isFetching) {
-			setIsLoadingQueries(true);
-		} else {
-			setIsLoadingQueries(false);
-		}
-	}, [isLoading, isFetching, setIsLoadingQueries]);
-
 	const queryTableData = useMemo(
 		() =>
 			data?.payload?.data?.newResult?.data?.result ||
@@ -40,13 +40,11 @@ import { ActionsContainer, Container } from './styles';
 interface TracesViewProps {
 	isFilterApplied: boolean;
 	setWarning: Dispatch<SetStateAction<Warning | undefined>>;
-	setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
 }
 
 function TracesView({
 	isFilterApplied,
 	setWarning,
-	setIsLoadingQueries,
 }: TracesViewProps): JSX.Element {
 	const { stagedQuery, panelType } = useQueryBuilder();
 	const [orderBy, setOrderBy] = useState<string>('timestamp:desc');
@@ -119,14 +117,6 @@ function TracesView({
 		[responseData],
 	);
 
-	useEffect(() => {
-		if (isLoading || isFetching) {
-			setIsLoadingQueries(true);
-		} else {
-			setIsLoadingQueries(false);
-		}
-	}, [isLoading, isFetching, setIsLoadingQueries]);
-
 	useEffect(() => {
 		if (!isLoading && !isFetching && !isError && (tableData || []).length !== 0) {
 			logEvent('Traces Explorer: Data present', {
@@ -10,7 +10,6 @@ export const useGetQueryKeyValueSuggestions = ({
 	signal,
 	searchText,
 	signalSource,
-	metricName,
 }: {
 	key: string;
 	signal: 'traces' | 'logs' | 'metrics';
@@ -19,26 +18,17 @@ export const useGetQueryKeyValueSuggestions = ({
 	options?: UseQueryOptions<
 		SuccessResponse<QueryKeyValueSuggestionsResponseProps> | ErrorResponse
 	>;
-	metricName?: string;
 }): UseQueryResult<
 	AxiosResponse<QueryKeyValueSuggestionsResponseProps>,
 	AxiosError
 > =>
 	useQuery<AxiosResponse<QueryKeyValueSuggestionsResponseProps>, AxiosError>({
-		queryKey: [
-			'queryKeyValueSuggestions',
-			key,
-			signal,
-			searchText,
-			signalSource,
-			metricName,
-		],
+		queryKey: ['queryKeyValueSuggestions', key, signal, searchText, signalSource],
 		queryFn: () =>
 			getValueSuggestions({
 				signal,
 				key,
 				searchText: searchText || '',
 				signalSource: signalSource as 'meter' | '',
-				metricName: metricName || '',
 			}),
 	});
@@ -6,7 +6,7 @@ import cx from 'classnames';
 import { CardContainer } from 'container/GridCardLayout/styles';
 import { useIsDarkMode } from 'hooks/useDarkMode';
 import { ChevronDown, ChevronUp } from 'lucide-react';
-import { useCallback, useRef, useState } from 'react';
+import { useRef, useState } from 'react';
 import { useTranslation } from 'react-i18next';
 import { Widgets } from 'types/api/dashboard/getAll';
@@ -129,22 +129,23 @@ function MetricPage(): JSX.Element {
 		},
 	];
 
-	const renderedGraphCountRef = useRef(0);
+	const [renderedGraphCount, setRenderedGraphCount] = useState(0);
 	const hasLoggedRef = useRef(false);
 
-	const checkIfDataExists = useCallback((isDataAvailable: boolean): void => {
+	const checkIfDataExists = (isDataAvailable: boolean): void => {
 		if (isDataAvailable) {
-			renderedGraphCountRef.current += 1;
+			const newCount = renderedGraphCount + 1;
+			setRenderedGraphCount(newCount);
 
 			// Only log when first graph has rendered and we haven't logged yet
-			if (renderedGraphCountRef.current === 1 && !hasLoggedRef.current) {
+			if (newCount === 1 && !hasLoggedRef.current) {
 				logEvent('MQ Kafka: Metric view', {
 					graphRendered: true,
 				});
 				hasLoggedRef.current = true;
 			}
 		}
-	}, []);
+	};
 
 	return (
 		<div className="metric-page">
@@ -69,7 +69,6 @@ function TracesExplorer(): JSX.Element {
 
 	// Get panel type from URL
 	const panelTypesFromUrl = useGetPanelTypesQueryParam(PANEL_TYPES.LIST);
-	const [isLoadingQueries, setIsLoadingQueries] = useState<boolean>(false);
 
 	const [selectedView, setSelectedView] = useState<ExplorerViews>(() =>
 		getExplorerViewFromUrl(searchParams, panelTypesFromUrl),
@@ -324,7 +323,6 @@ function TracesExplorer(): JSX.Element {
 				rightActions={
 					<RightToolbarActions
 						onStageRunQuery={(): void => handleRunQuery(true, true)}
-						isLoadingQueries={isLoadingQueries}
 					/>
 				}
 			/>
@@ -346,21 +344,13 @@ function TracesExplorer(): JSX.Element {
 
 			{selectedView === ExplorerViews.LIST && (
 				<div className="trace-explorer-list-view">
-					<ListView
-						isFilterApplied={isFilterApplied}
-						setWarning={setWarning}
-						setIsLoadingQueries={setIsLoadingQueries}
-					/>
+					<ListView isFilterApplied={isFilterApplied} setWarning={setWarning} />
 				</div>
 			)}
 
 			{selectedView === ExplorerViews.TRACE && (
 				<div className="trace-explorer-traces-view">
-					<TracesView
-						isFilterApplied={isFilterApplied}
-						setWarning={setWarning}
-						setIsLoadingQueries={setIsLoadingQueries}
-					/>
+					<TracesView isFilterApplied={isFilterApplied} setWarning={setWarning} />
 				</div>
 			)}
 
@@ -370,17 +360,13 @@ function TracesExplorer(): JSX.Element {
 					dataSource={DataSource.TRACES}
 					isFilterApplied={isFilterApplied}
 					setWarning={setWarning}
-					setIsLoadingQueries={setIsLoadingQueries}
 				/>
 			</div>
 			)}
 
 			{selectedView === ExplorerViews.TABLE && (
 				<div className="trace-explorer-table-view">
-					<TableView
-						setWarning={setWarning}
-						setIsLoadingQueries={setIsLoadingQueries}
-					/>
+					<TableView setWarning={setWarning} />
 				</div>
 			)}
 		</div>
@@ -46,7 +46,6 @@ export interface QueryKeyValueRequestProps {
 	key: string;
 	searchText: string;
 	signalSource?: 'meter' | '';
-	metricName?: string;
 }
 
 export type SignalType = 'traces' | 'logs' | 'metrics';
go.mod (1 change)

@@ -53,6 +53,7 @@ require (
 	github.com/spf13/cobra v1.9.1
 	github.com/srikanthccv/ClickHouse-go-mock v0.12.0
 	github.com/stretchr/testify v1.10.0
+	github.com/teambition/rrule-go v1.8.2
 	github.com/tidwall/gjson v1.18.0
 	github.com/uptrace/bun v1.2.9
 	github.com/uptrace/bun/dialect/pgdialect v1.2.9
go.sum (2 changes)

@@ -918,6 +918,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
+github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8=
+github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4=
 github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
 github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
 github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
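The only dependency added is teambition/rrule-go, which lines up with the `schedule string` field introduced on BaseRule later in this compare; presumably the schedule is an RFC 5545 recurrence (RRULE) expression, though this diff does not show the parsing site. A hypothetical usage sketch under that assumption:

```go
package main

import (
	"fmt"
	"time"

	rrule "github.com/teambition/rrule-go"
)

func main() {
	// Assumption: a rule's schedule string is an RFC 5545 RRULE expression.
	r, err := rrule.StrToRRule("FREQ=HOURLY;INTERVAL=2")
	if err != nil {
		panic(err)
	}
	// Next evaluation tick strictly after now.
	fmt.Println("next scheduled evaluation:", r.After(time.Now(), false))
}
```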
@@ -31,7 +31,6 @@ func NewAPI(
 		telemetryStore,
 		telemetrytraces.DBName,
 		telemetrytraces.TagAttributesV2TableName,
-		telemetrytraces.SpanAttributesKeysTblName,
 		telemetrytraces.SpanIndexV3TableName,
 		telemetrymetrics.DBName,
 		telemetrymetrics.AttributesMetadataTableName,
@@ -40,8 +39,6 @@ func NewAPI(
 		telemetrylogs.DBName,
 		telemetrylogs.LogsV2TableName,
 		telemetrylogs.TagAttributesV2TableName,
-		telemetrylogs.LogAttributeKeysTblName,
-		telemetrylogs.LogResourceKeysTblName,
 		telemetrymetadata.DBName,
 		telemetrymetadata.AttributesMetadataLocalTableName,
 	)
@@ -50,7 +50,6 @@ func newProvider(
 		telemetryStore,
 		telemetrytraces.DBName,
 		telemetrytraces.TagAttributesV2TableName,
-		telemetrytraces.SpanAttributesKeysTblName,
 		telemetrytraces.SpanIndexV3TableName,
 		telemetrymetrics.DBName,
 		telemetrymetrics.AttributesMetadataTableName,
@@ -59,8 +58,6 @@ func newProvider(
 		telemetrylogs.DBName,
 		telemetrylogs.LogsV2TableName,
 		telemetrylogs.TagAttributesV2TableName,
-		telemetrylogs.LogAttributeKeysTblName,
-		telemetrylogs.LogResourceKeysTblName,
 		telemetrymetadata.DBName,
 		telemetrymetadata.AttributesMetadataLocalTableName,
 	)
@@ -72,13 +69,12 @@ func newProvider(
 	resourceFilterFieldMapper := resourcefilter.NewFieldMapper()
 	resourceFilterConditionBuilder := resourcefilter.NewConditionBuilder(resourceFilterFieldMapper)
 	resourceFilterStmtBuilder := resourcefilter.NewTraceResourceFilterStatementBuilder(
-		settings,
 		resourceFilterFieldMapper,
 		resourceFilterConditionBuilder,
 		telemetryMetadataStore,
 	)
 
-	traceAggExprRewriter := querybuilder.NewAggExprRewriter(settings, nil, traceFieldMapper, traceConditionBuilder, "", nil)
+	traceAggExprRewriter := querybuilder.NewAggExprRewriter(nil, traceFieldMapper, traceConditionBuilder, "", nil)
 	traceStmtBuilder := telemetrytraces.NewTraceQueryStatementBuilder(
 		settings,
 		telemetryMetadataStore,
@@ -93,7 +89,6 @@ func newProvider(
 	logFieldMapper := telemetrylogs.NewFieldMapper()
 	logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
 	logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
-		settings,
 		resourceFilterFieldMapper,
 		resourceFilterConditionBuilder,
 		telemetryMetadataStore,
@@ -102,7 +97,6 @@ func newProvider(
 		telemetrylogs.GetBodyJSONKey,
 	)
 	logAggExprRewriter := querybuilder.NewAggExprRewriter(
-		settings,
 		telemetrylogs.DefaultFullTextColumn,
 		logFieldMapper,
 		logConditionBuilder,
@@ -385,7 +385,7 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
 	return resourceSubQuery, nil
 }
 
-func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
+func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
 
 	if r.indexTable == "" {
 		return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
@@ -428,7 +428,7 @@ func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model
 
 	query := fmt.Sprintf(
 		`SELECT
-			toFloat64(quantileExact(0.99)(duration_nano)) as p99,
+			quantile(0.99)(duration_nano) as p99,
 			avg(duration_nano) as avgDuration,
 			count(*) as numCalls
 		FROM %s.%s
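Here, and again in the GetTopOperations and GetDependencyGraph hunks below, ClickHouse's exact percentile aggregator is swapped for the approximate one. quantileExact buffers every input value to produce the exact answer, so its memory use grows with the scanned row count; quantile uses reservoir sampling with bounded state, at the cost of a small, non-deterministic error, and it already returns Float64, which is why the toFloat64 wrapper can be dropped too. The two forms side by side (column name taken from the diff):

```go
package main

// Exact form removed by the diff: buffers all duration_nano values.
const p99Exact = `toFloat64(quantileExact(0.99)(duration_nano)) AS p99`

// Approximate form added by the diff: reservoir sampling, bounded memory.
const p99Approx = `quantile(0.99)(duration_nano) AS p99`

func main() { _, _ = p99Exact, p99Approx }
```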
@@ -510,274 +510,6 @@ func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model
 	return &serviceItems, nil
 }
 
-func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
-	if r.indexTable == "" {
-		return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
-	}
-
-	topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil)
-	if apiErr != nil {
-		return nil, apiErr
-	}
-	// Build parallel arrays for arrayZip approach
-	var ops []string
-	var svcs []string
-	serviceOperationsMap := make(map[string][]string)
-
-	for svc, opsList := range *topLevelOps {
-		// Cap operations to 1500 per service (same as original logic)
-		cappedOps := opsList[:int(math.Min(1500, float64(len(opsList))))]
-		serviceOperationsMap[svc] = cappedOps
-
-		// Add to parallel arrays
-		for _, op := range cappedOps {
-			ops = append(ops, op)
-			svcs = append(svcs, svc)
-		}
-	}
-
-	fmt.Printf("Operation pairs count: %d\n", len(ops))
-
-	// Build resource subquery for all services, but only include our target services
-	targetServices := make([]string, 0, len(*topLevelOps))
-	for svc := range *topLevelOps {
-		targetServices = append(targetServices, svc)
-	}
-	resourceSubQuery, err := r.buildResourceSubQueryForServices(queryParams.Tags, targetServices, *queryParams.Start, *queryParams.End)
-	if err != nil {
-		zap.L().Error("Error building resource subquery", zap.Error(err))
-		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
-	}
-
-	// Build the optimized single query using arrayZip for tuple creation
-	query := fmt.Sprintf(`
-		SELECT
-			resource_string_service$$name AS serviceName,
-			toFloat64(quantileExact(0.99)(duration_nano)) AS p99,
-			avg(duration_nano) AS avgDuration,
-			count(*) AS numCalls,
-			countIf(statusCode = 2) AS numErrors
-		FROM %s.%s
-		WHERE (name, resource_string_service$$name) IN arrayZip(@ops, @svcs)
-			AND timestamp >= @start
-			AND timestamp <= @end
-			AND ts_bucket_start >= @start_bucket
-			AND ts_bucket_start <= @end_bucket
-			AND (resource_fingerprint GLOBAL IN %s)
-		GROUP BY serviceName
-		ORDER BY numCalls DESC`,
-		r.TraceDB, r.traceTableName, resourceSubQuery,
-	)
-
-	args := []interface{}{
-		clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
-		clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
-		clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
-		clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
-		// Important: wrap slices with clickhouse.Array for IN/array params
-		clickhouse.Named("ops", ops),
-		clickhouse.Named("svcs", svcs),
-	}
-
-	fmt.Printf("Query: %s\n", query)
-
-	// Execute the single optimized query
-	rows, err := r.db.Query(ctx, query, args...)
-	if err != nil {
-		zap.L().Error("Error executing optimized services query", zap.Error(err))
-		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
-	}
-	defer rows.Close()
-
-	// Process results
-	serviceItems := []model.ServiceItem{}
-
-	for rows.Next() {
-		var serviceItem model.ServiceItem
-		err := rows.ScanStruct(&serviceItem)
-		if err != nil {
-			zap.L().Error("Error scanning service item", zap.Error(err))
-			continue
-		}
-
-		// Skip services with zero calls (match original behavior)
-		if serviceItem.NumCalls == 0 {
-			continue
-		}
-
-		// Add data warning for this service
-		if ops, exists := serviceOperationsMap[serviceItem.ServiceName]; exists {
-			serviceItem.DataWarning = model.DataWarning{
-				TopLevelOps: ops,
-			}
-		}
-
-		// Calculate derived fields
-		serviceItem.CallRate = float64(serviceItem.NumCalls) / float64(queryParams.Period)
-		if serviceItem.NumCalls > 0 {
-			serviceItem.ErrorRate = float64(serviceItem.NumErrors) * 100 / float64(serviceItem.NumCalls)
-		}
-
-		serviceItems = append(serviceItems, serviceItem)
-	}
-
-	if err = rows.Err(); err != nil {
-		zap.L().Error("Error iterating over service results", zap.Error(err))
-		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
-	}
-
-	// Fetch results from the original GetServicesOG for comparison
-	ogResults, ogErr := r.GetServicesOG(ctx, queryParams)
-	if ogErr != nil {
-		zap.L().Error("Error fetching OG service results", zap.Error(ogErr))
-	} else {
-		// Compare the optimized results with OG results
-		ogMap := make(map[string]model.ServiceItem)
-		for _, ogItem := range *ogResults {
-			ogMap[ogItem.ServiceName] = ogItem
-		}
-
-		for _, optItem := range serviceItems {
-			if ogItem, exists := ogMap[optItem.ServiceName]; exists {
-				// Compare key fields (NumCalls, NumErrors, etc.)
-				if optItem.NumCalls != ogItem.NumCalls ||
-					optItem.NumErrors != ogItem.NumErrors ||
-					int64(optItem.Percentile99) != int64(ogItem.Percentile99) ||
-					int64(optItem.AvgDuration) != int64(ogItem.AvgDuration) {
-					fmt.Printf(
-						"[Discrepancy] Service: %s | optNumCalls: %d, ogNumCalls: %d | optNumErrors: %d, ogNumErrors: %d | optP99: %.2f, ogP99: %.2f | optAvgDuration: %.2f, ogAvgDuration: %.2f\n",
-						optItem.ServiceName,
-						optItem.NumCalls, ogItem.NumCalls,
-						optItem.NumErrors, ogItem.NumErrors,
-						optItem.Percentile99, ogItem.Percentile99,
-						optItem.AvgDuration, ogItem.AvgDuration,
-					)
-				}
-			} else {
-				zap.L().Warn("Service present in optimized results but missing in OG results",
-					zap.String("service", optItem.ServiceName))
-			}
-		}
-
-		// Check for services present in OG but missing in optimized
-		optMap := make(map[string]struct{})
-		for _, optItem := range serviceItems {
-			optMap[optItem.ServiceName] = struct{}{}
-		}
-		for _, ogItem := range *ogResults {
-			if _, exists := optMap[ogItem.ServiceName]; !exists {
-				fmt.Printf("Service present in OG results but missing in optimized results: %s\n", ogItem.ServiceName)
-			}
-		}
-	}
-
-	return &serviceItems, nil
-}
-
-// buildResourceSubQueryForServices builds a resource subquery that includes only specific services
-// This maintains service context while optimizing for multiple services in a single query
-func (r *ClickHouseReader) buildResourceSubQueryForServices(tags []model.TagQueryParam, targetServices []string, start, end time.Time) (string, error) {
-	if len(targetServices) == 0 {
-		return "", fmt.Errorf("no target services provided")
-	}
-
-	if len(tags) == 0 {
-		// For exact parity with per-service behavior, build via resource builder with only service filter
-		filterSet := v3.FilterSet{}
-		filterSet.Items = append(filterSet.Items, v3.FilterItem{
-			Key: v3.AttributeKey{
-				Key:      "service.name",
-				DataType: v3.AttributeKeyDataTypeString,
-				Type:     v3.AttributeKeyTypeResource,
-			},
-			Operator: v3.FilterOperatorIn,
-			Value:    targetServices,
-		})
-
-		resourceSubQuery, err := resource.BuildResourceSubQuery(
-			r.TraceDB,
-			r.traceResourceTableV3,
-			start.Unix()-1800,
-			end.Unix(),
-			&filterSet,
-			[]v3.AttributeKey{},
-			v3.AttributeKey{},
-			false)
-		if err != nil {
-			zap.L().Error("Error building resource subquery for services", zap.Error(err))
-			return "", err
-		}
-		return resourceSubQuery, nil
-	}
-
-	// Convert tags to filter set
-	filterSet := v3.FilterSet{}
-	for _, tag := range tags {
-		// Skip the collector id as we don't add it to traces
-		if tag.Key == "signoz.collector.id" {
-			continue
-		}
-
-		var it v3.FilterItem
-		it.Key = v3.AttributeKey{
-			Key:      tag.Key,
-			DataType: v3.AttributeKeyDataTypeString,
-			Type:     v3.AttributeKeyTypeResource,
-		}
-
-		switch tag.Operator {
-		case model.NotInOperator:
-			it.Operator = v3.FilterOperatorNotIn
-			it.Value = tag.StringValues
-		case model.InOperator:
-			it.Operator = v3.FilterOperatorIn
-			it.Value = tag.StringValues
-		default:
-			return "", fmt.Errorf("operator %s not supported", tag.Operator)
-		}
-
-		filterSet.Items = append(filterSet.Items, it)
-	}
-
-	// Add service filter to limit to our target services
-	filterSet.Items = append(filterSet.Items, v3.FilterItem{
-		Key: v3.AttributeKey{
-			Key:      "service.name",
-			DataType: v3.AttributeKeyDataTypeString,
-			Type:     v3.AttributeKeyTypeResource,
-		},
-		Operator: v3.FilterOperatorIn,
-		Value:    targetServices,
-	})
-
-	// Build resource subquery with service-specific filtering
-	resourceSubQuery, err := resource.BuildResourceSubQuery(
-		r.TraceDB,
-		r.traceResourceTableV3,
-		start.Unix()-1800,
-		end.Unix(),
-		&filterSet,
-		[]v3.AttributeKey{},
-		v3.AttributeKey{},
-		false)
-	if err != nil {
-		zap.L().Error("Error building resource subquery for services", zap.Error(err))
-		return "", err
-	}
-	return resourceSubQuery, nil
-}
-
-// buildServiceInClause creates a properly quoted IN clause for service names
-func (r *ClickHouseReader) buildServiceInClause(services []string) string {
-	var quotedServices []string
-	for _, svc := range services {
-		// Escape single quotes and wrap in quotes
-		escapedSvc := strings.ReplaceAll(svc, "'", "\\'")
-		quotedServices = append(quotedServices, fmt.Sprintf("'%s'", escapedSvc))
-	}
-	return strings.Join(quotedServices, ", ")
-}
 
 func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
 	// status can only be two and if both are selected than they are equivalent to none selected
 	if _, ok := excludeMap["status"]; ok {
@@ -797,6 +529,7 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
 	}
 	return query
 }
 
 func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
 	tags := []model.TagQuery{}
 	for _, tag := range queryParams {
@@ -953,6 +686,7 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
 	}
 	return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
 }
 
 func (r *ClickHouseReader) GetEntryPointOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, error) {
 	// Step 1: Get top operations for the given service
 	topOps, err := r.GetTopOperations(ctx, queryParams)
@@ -1021,9 +755,9 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
 
 	query := fmt.Sprintf(`
 		SELECT
-			toFloat64(quantileExact(0.5)(durationNano)) as p50,
-			toFloat64(quantileExact(0.95)(durationNano)) as p95,
-			toFloat64(quantileExact(0.99)(durationNano)) as p99,
+			quantile(0.5)(durationNano) as p50,
+			quantile(0.95)(durationNano) as p95,
+			quantile(0.99)(durationNano) as p99,
 			COUNT(*) as numCalls,
 			countIf(status_code=2) as errorCount,
 			name
@@ -1505,11 +1239,11 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
 		SELECT
 			src as parent,
 			dest as child,
-			toFloat64(result[1]) AS p50,
-			toFloat64(result[2]) AS p75,
-			toFloat64(result[3]) AS p90,
-			toFloat64(result[4]) AS p95,
-			toFloat64(result[5]) AS p99,
+			result[1] AS p50,
+			result[2] AS p75,
+			result[3] AS p90,
+			result[4] AS p95,
+			result[5] AS p99,
 			sum(total_count) as callCount,
 			sum(total_count)/ @duration AS callRate,
 			sum(error_count)/sum(total_count) * 100 as errorRate
@@ -1541,6 +1275,7 @@ func getLocalTableName(tableName string) string {
 	return tableNameSplit[0] + "." + strings.Split(tableNameSplit[1], "distributed_")[1]
 }
 
 func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
 	// uuid is used as transaction id
 	uuidWithHyphen := uuid.New()
@@ -1681,6 +1416,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
 	}(ttlPayload)
 	return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
 }
 
 func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
 	// uuid is used as transaction id
 	uuidWithHyphen := uuid.New()
@@ -2321,6 +2057,7 @@ func (r *ClickHouseReader) ListErrors(ctx context.Context, queryParams *model.Li
 
 	return &getErrorResponses, nil
 }
 
 func (r *ClickHouseReader) CountErrors(ctx context.Context, queryParams *model.CountErrorsParams) (uint64, *model.ApiError) {
 
 	var errorCount uint64
@@ -2432,6 +2169,7 @@ func (r *ClickHouseReader) GetNextPrevErrorIDs(ctx context.Context, queryParams
 	return &getNextPrevErrorIDsResponse, nil
 
 }
 
 func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *model.GetErrorParams) (string, time.Time, *model.ApiError) {
 
 	var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse
@@ -3092,6 +2830,7 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
 
 	return &response, nil
 }
 
 func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
 	var query string
 	var err error
@@ -3166,6 +2905,7 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3
 
 	return &attributeValues, nil
 }
 
 func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.UUID, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {
 
 	unixMilli := common.PastDayRoundOff()
@@ -3837,6 +3577,7 @@ func readRow(vars []interface{}, columnNames []string, countOfNumberCols int) ([
 	}
 	return groupBy, groupAttributes, groupAttributesArray, nil
 }
 
 func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string, countOfNumberCols int) ([]*v3.Series, error) {
 	// when groupBy is applied, each combination of cartesian product
 	// of attribute values is a separate series. Each item in seriesToPoints
@@ -4632,6 +4373,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
 
 	return timeline, nil
 }
 
 func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
 	ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error) {
 	query := fmt.Sprintf(`SELECT
@@ -5220,6 +4962,7 @@ func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context,
 	}
 	return timeSeries, nil
 }
 
 func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.UUID, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
 	var args []interface{}
 
@@ -5437,6 +5180,7 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.
 
 	return &response, nil
 }
 
 func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
 	var args []interface{}
 
@@ -6016,6 +5760,7 @@ func (r *ClickHouseReader) GetInspectMetrics(ctx context.Context, req *metrics_e
 		Series: &seriesList,
 	}, nil
 }
 
 func (r *ClickHouseReader) GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError) {
 	// Build dynamic key selections and JSON extracts
 	var jsonExtracts []string
@@ -6188,6 +5933,7 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
 	}
 	return hasLE, nil
 }
 
 func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
 	cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
 	var missingMetrics []string
@@ -85,6 +85,11 @@ type BaseRule struct {
    TemporalityMap map[string]map[v3.Temporality]bool

    sqlstore sqlstore.SQLStore

    evaluation ruletypes.Evaluation

    schedule         string
    scheduleStartsAt time.Time
}

type RuleOption func(*BaseRule)

@@ -139,6 +144,8 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
    Active:         map[uint64]*ruletypes.Alert{},
    reader:         reader,
    TemporalityMap: make(map[string]map[v3.Temporality]bool),
    evaluation:     p.Evaluation,
    schedule:       p.Schedule,
}

if baseRule.evalWindow == 0 {

@@ -210,6 +217,18 @@ func (r *BaseRule) TargetVal() float64 {
    return r.targetVal()
}

func (r *BaseRule) Thresholds() []ruletypes.RuleThreshold {
    return r.ruleCondition.Thresholds
}

func (r *BaseRule) IsScheduled() bool {
    return r.schedule != ""
}

func (r *BaseRule) GetSchedule() (string, time.Time) {
    return r.schedule, r.scheduleStartsAt
}
func (r *ThresholdRule) hostFromSource() string {
    parsedUrl, err := url.Parse(r.source)
    if err != nil {

@@ -241,8 +260,10 @@ func (r *BaseRule) Unit() string {
}

func (r *BaseRule) Timestamps(ts time.Time) (time.Time, time.Time) {
    start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli()
    end := ts.UnixMilli()

    st, en := r.evaluation.EvaluationTime(ts)
    start := st.UnixMilli()
    end := en.UnixMilli()

    if r.evalDelay > 0 {
        start = start - int64(r.evalDelay.Milliseconds())
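The hunk above replaces the fixed evalWindow arithmetic with a pluggable evaluation strategy owned by the rule. The Evaluation type itself is not shown in this diff, so the following is only a minimal sketch, assuming a rolling-window variant like the one the tests later in this diff construct; everything beyond the EvaluationTime name is illustrative.

// Sketch of the contract implied by r.evaluation.EvaluationTime(ts).
type Evaluation interface {
    // EvaluationTime returns the (start, end) window that an evaluation
    // triggered at ts should query.
    EvaluationTime(ts time.Time) (time.Time, time.Time)
}

// RollingWindow evaluates over the trailing EvalWindow ending at ts.
type RollingWindow struct {
    EvalWindow time.Duration
}

func (w *RollingWindow) EvaluationTime(ts time.Time) (time.Time, time.Time) {
    return ts.Add(-w.EvalWindow), ts
}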
@@ -168,7 +168,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {

    // create ch rule task for evaluation
    task = newTask(TaskTypeCh, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)

    if tr.IsScheduled() {
        task.SetSchedule(tr.GetSchedule())
    }
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {

    // create promql rule

@@ -190,7 +192,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {

    // create promql rule task for evaluation
    task = newTask(TaskTypeProm, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)

    if pr.IsScheduled() {
        task.SetSchedule(pr.GetSchedule())
    }
} else {
    return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
}
@@ -125,8 +125,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)

    prevState := r.State()

    start := ts.Add(-r.evalWindow)
    end := ts
    start, end := r.Timestamps(ts)
    interval := 60 * time.Second // TODO(srikanthccv): this should be configurable

    valueFormatter := formatter.FromUnit(r.Unit())

@@ -151,84 +150,86 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
    var alerts = make(map[uint64]*ruletypes.Alert, len(res))

    for _, series := range res {
        l := make(map[string]string, len(series.Metric))
        for _, lbl := range series.Metric {
            l[lbl.Name] = lbl.Value
        }

        if len(series.Floats) == 0 {
            continue
        }

        alertSmpl, shouldAlert := r.ShouldAlert(toCommonSeries(series))
        if !shouldAlert {
            continue
        }
        r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", series)

        threshold := valueFormatter.Format(r.targetVal(), r.Unit())

        tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(alertSmpl.V, r.Unit()), threshold)
        // Inject some convenience variables that are easier to remember for users
        // who are not used to Go's templating system.
        defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"

        expand := func(text string) string {

            tmpl := ruletypes.NewTemplateExpander(
                ctx,
                defs+text,
                "__alert_"+r.Name(),
                tmplData,
                times.Time(timestamp.FromTime(ts)),
                nil,
            )
            result, err := tmpl.Expand()
            if err != nil {
                result = fmt.Sprintf("<error expanding template: %s>", err)
                r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData)
        for _, ruleThreshold := range r.Thresholds() {
            l := make(map[string]string, len(series.Metric))
            for _, lbl := range series.Metric {
                l[lbl.Name] = lbl.Value
            }
            return result
        }

        lb := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel)
        resultLabels := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel).Labels()
            if len(series.Floats) == 0 {
                continue
            }

        for name, value := range r.labels.Map() {
            lb.Set(name, expand(value))
        }
            alertSmpl, shouldAlert := ruleThreshold.ShouldAlert(toCommonSeries(series))
            if !shouldAlert {
                continue
            }
            r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", series)

        lb.Set(qslabels.AlertNameLabel, r.Name())
        lb.Set(qslabels.AlertRuleIdLabel, r.ID())
        lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
            threshold := valueFormatter.Format(r.targetVal(), r.Unit())

        annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
        for name, value := range r.annotations.Map() {
            annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
        }
            tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(alertSmpl.V, r.Unit()), threshold)
            // Inject some convenience variables that are easier to remember for users
            // who are not used to Go's templating system.
            defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"

        lbs := lb.Labels()
        h := lbs.Hash()
        resultFPs[h] = struct{}{}
            expand := func(text string) string {

        if _, ok := alerts[h]; ok {
            err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
            // We have already acquired the lock above hence using SetHealth and
            // SetLastError will deadlock.
            r.health = ruletypes.HealthBad
            r.lastError = err
            return nil, err
        }
                tmpl := ruletypes.NewTemplateExpander(
                    ctx,
                    defs+text,
                    "__alert_"+r.Name(),
                    tmplData,
                    times.Time(timestamp.FromTime(ts)),
                    nil,
                )
                result, err := tmpl.Expand()
                if err != nil {
                    result = fmt.Sprintf("<error expanding template: %s>", err)
                    r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData)
                }
                return result
            }

        alerts[h] = &ruletypes.Alert{
            Labels:            lbs,
            QueryResultLables: resultLabels,
            Annotations:       annotations,
            ActiveAt:          ts,
            State:             model.StatePending,
            Value:             alertSmpl.V,
            GeneratorURL:      r.GeneratorURL(),
            Receivers:         r.preferredChannels,
            lb := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel)
            resultLabels := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel).Labels()

            for name, value := range r.labels.Map() {
                lb.Set(name, expand(value))
            }

            lb.Set(qslabels.AlertNameLabel, r.Name())
            lb.Set(qslabels.AlertRuleIdLabel, r.ID())
            lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())

            annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
            for name, value := range r.annotations.Map() {
                annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
            }

            lbs := lb.Labels()
            h := lbs.Hash()
            resultFPs[h] = struct{}{}

            if _, ok := alerts[h]; ok {
                err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
                // We have already acquired the lock above hence using SetHealth and
                // SetLastError will deadlock.
                r.health = ruletypes.HealthBad
                r.lastError = err
                return nil, err
            }

            alerts[h] = &ruletypes.Alert{
                Labels:            lbs,
                QueryResultLables: resultLabels,
                Annotations:       annotations,
                ActiveAt:          ts,
                State:             model.StatePending,
                Value:             alertSmpl.V,
                GeneratorURL:      r.GeneratorURL(),
                Receivers:         r.preferredChannels,
            }
        }
    }
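Both versions of Eval above refuse to continue when two series collapse to the same labelset once rule labels are applied; the guard keys on a hash of the final labels. A distilled, self-contained sketch of that check, with plain string keys standing in for the qslabels Hash() value (assumes the standard fmt import):

// checkDuplicateLabelsets mirrors the duplicate-labelset guard in Eval.
func checkDuplicateLabelsets(labelsets []string) error {
    seen := make(map[string]struct{}, len(labelsets))
    for _, ls := range labelsets {
        if _, ok := seen[ls]; ok {
            // same error as raised in Eval above
            return fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
        }
        seen[ls] = struct{}{}
    }
    return nil
}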
@@ -3,6 +3,7 @@ package rules
import (
    "context"
    "fmt"
    "github.com/teambition/rrule-go"
    "sort"
    "sync"
    "time"

@@ -39,6 +40,8 @@ type PromRuleTask struct {

    maintenanceStore ruletypes.MaintenanceStore
    orgID            valuer.UUID
    schedule         string
    scheduleStartsAt time.Time
}

// newPromRuleTask holds rules that have promql condition

@@ -75,6 +78,10 @@ func (g *PromRuleTask) Key() string {
    return g.name + ";" + g.file
}

func (g *PromRuleTask) IsCronSchedule() bool {
    return g.schedule != ""
}

func (g *PromRuleTask) Type() TaskType { return TaskTypeProm }

// Rules returns the group's rules.

@@ -91,38 +98,6 @@ func (g *PromRuleTask) Pause(b bool) {

func (g *PromRuleTask) Run(ctx context.Context) {
    defer close(g.terminated)

    // Wait an initial amount to have consistently slotted intervals.
    evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
    select {
    case <-time.After(time.Until(evalTimestamp)):
    case <-g.done:
        return
    }

    ctx = NewQueryOriginContext(ctx, map[string]interface{}{
        "ruleGroup": map[string]string{
            "name": g.Name(),
        },
    })

    iter := func() {

        start := time.Now()
        g.Eval(ctx, evalTimestamp)
        timeSinceStart := time.Since(start)

        g.setEvaluationTime(timeSinceStart)
        g.setLastEvaluation(start)
    }

    // The assumption here is that since the ticker was started after having
    // waited for `evalTimestamp` to pass, the ticks will trigger soon
    // after each `evalTimestamp + N * g.frequency` occurrence.
    tick := time.NewTicker(g.frequency)
    defer tick.Stop()

    // defer cleanup
    defer func() {
        if !g.markStale {
            return

@@ -139,22 +114,114 @@ func (g *PromRuleTask) Run(ctx context.Context) {
        }(time.Now())

    }()
    if g.IsCronSchedule() {
        schedule, err := rrule.StrToRRule("DTSTART=" + g.scheduleStartsAt.UTC().Format("20060102T150405Z") + "\nRRULE:" + g.schedule) // g.schedule holds the RRULE expression
        if err != nil {
            zap.L().Error("failed to parse rrule expression", zap.String("rrule", g.schedule), zap.Error(err))
            return
        }
        now := time.Now()
        nextRun := schedule.After(now, false)

        iter()

        // let the group iterate and run
        for {
            select {
            case <-time.After(time.Until(nextRun)):
            case <-g.done:
                return
            default:
            }

        ctx = NewQueryOriginContext(ctx, map[string]interface{}{
            "ruleRuleTask": map[string]string{
                "name": g.Name(),
            },
        })

        iter := func() {
            if g.pause {
                return
            }
            start := time.Now()
            g.Eval(ctx, start) // using current time instead of evalTimestamp
            timeSinceStart := time.Since(start)

            g.setEvaluationTime(timeSinceStart)
            g.setLastEvaluation(start)
        }

        iter()
        currentRun := nextRun

        for {
            // Calculate the next run time
            nextRun = schedule.After(currentRun, false)

            select {
            case <-g.done:
                return
            case <-tick.C:
                missed := (time.Since(evalTimestamp) / g.frequency) - 1
                evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
                iter()
            default:
                select {
                case <-g.done:
                    return
                case <-time.After(time.Until(nextRun)):
                    // Check if we missed any scheduled runs
                    now := time.Now()
                    if now.After(nextRun.Add(time.Minute)) { // Allow 1 minute tolerance
                        zap.L().Warn("missed scheduled run",
                            zap.Time("scheduled", nextRun),
                            zap.Time("actual", now))
                    }

                    currentRun = nextRun
                    iter()
                }
            }
        }
    } else {
        // Wait an initial amount to have consistently slotted intervals.
        evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
        select {
        case <-time.After(time.Until(evalTimestamp)):
        case <-g.done:
            return
        }

        ctx = NewQueryOriginContext(ctx, map[string]interface{}{
            "ruleGroup": map[string]string{
                "name": g.Name(),
            },
        })

        iter := func() {

            start := time.Now()
            g.Eval(ctx, evalTimestamp)
            timeSinceStart := time.Since(start)

            g.setEvaluationTime(timeSinceStart)
            g.setLastEvaluation(start)
        }

        // The assumption here is that since the ticker was started after having
        // waited for `evalTimestamp` to pass, the ticks will trigger soon
        // after each `evalTimestamp + N * g.frequency` occurrence.
        tick := time.NewTicker(g.frequency)
        defer tick.Stop()

        iter()

        // let the group iterate and run
        for {
            select {
            case <-g.done:
                return
            default:
                select {
                case <-g.done:
                    return
                case <-tick.C:
                    missed := (time.Since(evalTimestamp) / g.frequency) - 1
                    evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
                    iter()
                }
            }
        }
    }
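The scheduled branch above relies on teambition/rrule-go to compute the next occurrence. A self-contained sketch of that pattern; the daily frequency here is an illustrative value, not one taken from the diff:

package main

import (
    "fmt"
    "time"

    "github.com/teambition/rrule-go"
)

func main() {
    // ROption mirrors the fields recurrenceToROption (later in this diff) populates.
    r, err := rrule.NewRRule(rrule.ROption{
        Freq:    rrule.DAILY,
        Dtstart: time.Now().UTC(),
    })
    if err != nil {
        panic(err)
    }
    // After(t, false) returns the first occurrence strictly after t,
    // which is how the task loop derives nextRun.
    fmt.Println("next run at:", r.After(time.Now().UTC(), false).Format(time.RFC3339))
}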
@@ -245,6 +312,13 @@ func (g *PromRuleTask) setLastEvaluation(ts time.Time) {
    g.lastEvaluation = ts
}

func (g *PromRuleTask) SetSchedule(schedule string, t time.Time) {
    g.mtx.Lock()
    defer g.mtx.Unlock()
    g.schedule = schedule
    g.scheduleStartsAt = t
}

// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *PromRuleTask) EvalTimestamp(startTime int64) time.Time {
    var (
@@ -12,6 +12,7 @@ import (
    ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
    "github.com/SigNoz/signoz/pkg/valuer"
    opentracing "github.com/opentracing/opentracing-go"
    "github.com/teambition/rrule-go"
    "go.uber.org/zap"
)

@@ -36,6 +37,10 @@ type RuleTask struct {

    maintenanceStore ruletypes.MaintenanceStore
    orgID            valuer.UUID

    // New field for rrule-based scheduling
    schedule         string
    scheduleStartsAt time.Time
}

const DefaultFrequency = 1 * time.Minute

@@ -71,6 +76,10 @@ func (g *RuleTask) Key() string {
    return g.name + ";" + g.file
}

func (g *RuleTask) IsCronSchedule() bool {
    return g.schedule != ""
}

// Name returns the group name.
func (g *RuleTask) Type() TaskType { return TaskTypeCh }

@@ -95,56 +104,119 @@ func NewQueryOriginContext(ctx context.Context, data map[string]interface{}) con
func (g *RuleTask) Run(ctx context.Context) {
    defer close(g.terminated)

    // Wait an initial amount to have consistently slotted intervals.
    evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
    zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
    select {
    case <-time.After(time.Until(evalTimestamp)):
    case <-g.done:
        return
    }

    ctx = NewQueryOriginContext(ctx, map[string]interface{}{
        "ruleRuleTask": map[string]string{
            "name": g.Name(),
        },
    })

    iter := func() {
        if g.pause {
            // todo(amol): remove in memory active alerts
            // and last series state
    if g.IsCronSchedule() {
        schedule, err := rrule.StrToRRule("DTSTART=" + g.scheduleStartsAt.UTC().Format("20060102T150405Z") + "\nRRULE:" + g.schedule) // g.schedule holds the RRULE expression
        if err != nil {
            zap.L().Error("failed to parse rrule expression", zap.String("rrule", g.schedule), zap.Error(err))
            return
        }
        start := time.Now()
        g.Eval(ctx, evalTimestamp)
        timeSinceStart := time.Since(start)
        now := time.Now()
        nextRun := schedule.After(now, false)

        g.setEvaluationTime(timeSinceStart)
        g.setLastEvaluation(start)
    }

    // The assumption here is that since the ticker was started after having
    // waited for `evalTimestamp` to pass, the ticks will trigger soon
    // after each `evalTimestamp + N * g.frequency` occurrence.
    tick := time.NewTicker(g.frequency)
    defer tick.Stop()

    iter()

    // let the group iterate and run
    for {
        select {
        case <-time.After(time.Until(nextRun)):
        case <-g.done:
            return
        default:
        }

        ctx = NewQueryOriginContext(ctx, map[string]interface{}{
            "ruleRuleTask": map[string]string{
                "name": g.Name(),
            },
        })

        iter := func() {
            if g.pause {
                return
            }
            start := time.Now()
            g.Eval(ctx, start) // using current time instead of evalTimestamp
            timeSinceStart := time.Since(start)

            g.setEvaluationTime(timeSinceStart)
            g.setLastEvaluation(start)
        }

        iter()
        currentRun := nextRun

        for {
            // Calculate the next run time
            nextRun = schedule.After(currentRun, false)

            select {
            case <-g.done:
                return
            case <-tick.C:
                missed := (time.Since(evalTimestamp) / g.frequency) - 1
                evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
                iter()
            default:
                select {
                case <-g.done:
                    return
                case <-time.After(time.Until(nextRun)):
                    // Check if we missed any scheduled runs
                    now := time.Now()
                    if now.After(nextRun.Add(time.Minute)) { // Allow 1 minute tolerance
                        zap.L().Warn("missed scheduled run",
                            zap.Time("scheduled", nextRun),
                            zap.Time("actual", now))
                    }

                    currentRun = nextRun
                    iter()
                }
            }
        }
    } else {
        // Wait an initial amount to have consistently slotted intervals.
        evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
        zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
        select {
        case <-time.After(time.Until(evalTimestamp)):
        case <-g.done:
            return
        }

        ctx = NewQueryOriginContext(ctx, map[string]interface{}{
            "ruleRuleTask": map[string]string{
                "name": g.Name(),
            },
        })

        iter := func() {
            if g.pause {
                // todo(amol): remove in memory active alerts
                // and last series state
                return
            }
            start := time.Now()
            g.Eval(ctx, evalTimestamp)
            timeSinceStart := time.Since(start)

            g.setEvaluationTime(timeSinceStart)
            g.setLastEvaluation(start)
        }

        // The assumption here is that since the ticker was started after having
        // waited for `evalTimestamp` to pass, the ticks will trigger soon
        // after each `evalTimestamp + N * g.frequency` occurrence.
        tick := time.NewTicker(g.frequency)
        defer tick.Stop()

        iter()

        // let the group iterate and run
        for {
            select {
            case <-g.done:
                return
            default:
                select {
                case <-g.done:
                    return
                case <-tick.C:
                    missed := (time.Since(evalTimestamp) / g.frequency) - 1
                    evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
                    iter()
                }
            }
        }
    }
@@ -298,6 +370,13 @@ func (g *RuleTask) CopyState(fromTask Task) error {
    return nil
}

func (g *RuleTask) SetSchedule(schedule string, t time.Time) {
    g.mtx.Lock()
    defer g.mtx.Unlock()
    g.schedule = schedule
    g.scheduleStartsAt = t
}

// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {

@@ -379,3 +458,41 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
        }(i, rule)
    }
}
// Helper to convert ruletypes.Schedule/Recurrence to rrule.ROption
func recurrenceToROption(s *ruletypes.Schedule) rrule.ROption {
    // Only basic mapping for daily/weekly/monthly, can be extended
    opt := rrule.ROption{
        Dtstart: s.Recurrence.StartTime,
    }
    switch s.Recurrence.RepeatType {
    case ruletypes.RepeatTypeDaily:
        opt.Freq = rrule.DAILY
    case ruletypes.RepeatTypeWeekly:
        opt.Freq = rrule.WEEKLY
        for _, day := range s.Recurrence.RepeatOn {
            switch day {
            case ruletypes.RepeatOnSunday:
                opt.Byweekday = append(opt.Byweekday, rrule.SU)
            case ruletypes.RepeatOnMonday:
                opt.Byweekday = append(opt.Byweekday, rrule.MO)
            case ruletypes.RepeatOnTuesday:
                opt.Byweekday = append(opt.Byweekday, rrule.TU)
            case ruletypes.RepeatOnWednesday:
                opt.Byweekday = append(opt.Byweekday, rrule.WE)
            case ruletypes.RepeatOnThursday:
                opt.Byweekday = append(opt.Byweekday, rrule.TH)
            case ruletypes.RepeatOnFriday:
                opt.Byweekday = append(opt.Byweekday, rrule.FR)
            case ruletypes.RepeatOnSaturday:
                opt.Byweekday = append(opt.Byweekday, rrule.SA)
            }
        }
    case ruletypes.RepeatTypeMonthly:
        opt.Freq = rrule.MONTHLY
    }
    if s.Recurrence.EndTime != nil {
        opt.Until = *s.Recurrence.EndTime
    }
    return opt
}
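A quick usage sketch for the helper above: feed the resulting ROption to rrule.NewRRule and step forward with After. Only the fields the helper reads appear in this diff, so any Schedule construction around this is an assumption:

// nextScheduledRun computes the first occurrence after now for a schedule,
// e.g. one whose Recurrence says "weekly on Monday, starting at StartTime".
func nextScheduledRun(s *ruletypes.Schedule) (time.Time, error) {
    r, err := rrule.NewRRule(recurrenceToROption(s))
    if err != nil {
        // invalid combinations of recurrence options surface here
        return time.Time{}, err
    }
    return r.After(time.Now(), false), nil
}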
@@ -28,6 +28,8 @@ type Task interface {
    Rules() []Rule
    Stop()
    Pause(b bool)
    IsCronSchedule() bool
    SetSchedule(string, time.Time)
}

// newTask returns an appropriate group for
@@ -479,9 +479,11 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
    }

    for _, series := range queryResult.Series {
        smpl, shouldAlert := r.ShouldAlert(*series)
        if shouldAlert {
            resultVector = append(resultVector, smpl)
        for _, threshold := range r.Thresholds() {
            smpl, shouldAlert := threshold.ShouldAlert(*series)
            if shouldAlert {
                resultVector = append(resultVector, smpl)
            }
        }
    }

@@ -549,9 +551,11 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
    }

    for _, series := range queryResult.Series {
        smpl, shouldAlert := r.ShouldAlert(*series)
        if shouldAlert {
            resultVector = append(resultVector, smpl)
        for _, threshold := range r.Thresholds() {
            smpl, shouldAlert := threshold.ShouldAlert(*series)
            if shouldAlert {
                resultVector = append(resultVector, smpl)
            }
        }
    }
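The loops above delegate the comparison to each configured threshold, so one series can now produce one sample per breached threshold. The RuleThreshold interface itself is not reproduced in this diff; judging from the call sites, its contract is roughly the following sketch, where the exact sample type is an assumption:

// Hedged sketch of the contract the loops above depend on.
type RuleThreshold interface {
    // ShouldAlert reports whether the series breaches this threshold and,
    // if so, returns the sample to append to the result vector.
    ShouldAlert(series v3.Series) (smpl Sample, shouldAlert bool)
}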
@@ -592,6 +596,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
    }

    value := valueFormatter.Format(smpl.V, r.Unit())
    // todo(aniket): handle different threshold
    threshold := valueFormatter.Format(r.targetVal(), r.Unit())
    r.logger.DebugContext(ctx, "Alert template data for rule", "rule_name", r.Name(), "formatter", valueFormatter.Name(), "value", value, "threshold", threshold)
@@ -870,6 +870,10 @@ func TestPrepareLinksToLogs(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,

@@ -913,6 +917,10 @@ func TestPrepareLinksToLogsV5(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,

@@ -963,6 +971,10 @@ func TestPrepareLinksToTracesV5(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,

@@ -1013,6 +1025,10 @@ func TestPrepareLinksToTraces(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,

@@ -1141,6 +1157,10 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeClickHouseSQL,

@@ -1191,6 +1211,10 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeClickHouseSQL,

@@ -1248,6 +1272,10 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,

@@ -1351,6 +1379,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
        postableRule.RuleCondition.Target = &c.target
        postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
        postableRule.RuleCondition.TargetUnit = c.targetUnit
        postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold(postableRule.AlertName, &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit)}
        postableRule.Annotations = map[string]string{
            "description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
            "summary":     "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",

@@ -1398,6 +1427,10 @@ func TestThresholdRuleNoData(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,

@@ -1491,6 +1524,10 @@ func TestThresholdRuleTracesLink(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,

@@ -1556,6 +1593,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
        postableRule.RuleCondition.Target = &c.target
        postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
        postableRule.RuleCondition.TargetUnit = c.targetUnit
        postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold(postableRule.AlertName, &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit)}
        postableRule.Annotations = map[string]string{
            "description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
            "summary":     "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",

@@ -1602,6 +1640,10 @@ func TestThresholdRuleLogsLink(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,

@@ -1679,6 +1721,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
        postableRule.RuleCondition.Target = &c.target
        postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
        postableRule.RuleCondition.TargetUnit = c.targetUnit
        postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold(postableRule.AlertName, &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit)}
        postableRule.Annotations = map[string]string{
            "description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
            "summary":     "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",

@@ -1726,6 +1769,10 @@ func TestThresholdRuleShiftBy(t *testing.T) {
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,
@@ -1782,3 +1829,172 @@ func TestThresholdRuleShiftBy(t *testing.T) {

    assert.Equal(t, int64(10), params.CompositeQuery.BuilderQueries["A"].ShiftBy)
}

func TestMultipleThresholdRule(t *testing.T) {
    postableRule := ruletypes.PostableRule{
        AlertName:  "Multiple threshold test",
        AlertType:  ruletypes.AlertTypeMetric,
        RuleType:   ruletypes.RuleTypeThreshold,
        EvalWindow: ruletypes.Duration(5 * time.Minute),
        Frequency:  ruletypes.Duration(1 * time.Minute),
        Evaluation: &ruletypes.RollingWindow{
            EvalWindow: ruletypes.Duration(5 * time.Minute),
            Frequency:  ruletypes.Duration(1 * time.Minute),
        },
        RuleCondition: &ruletypes.RuleCondition{
            CompositeQuery: &v3.CompositeQuery{
                QueryType: v3.QueryTypeBuilder,
                BuilderQueries: map[string]*v3.BuilderQuery{
                    "A": {
                        QueryName:    "A",
                        StepInterval: 60,
                        AggregateAttribute: v3.AttributeKey{
                            Key: "signoz_calls_total",
                        },
                        AggregateOperator: v3.AggregateOperatorSumRate,
                        DataSource:        v3.DataSourceMetrics,
                        Expression:        "A",
                    },
                },
            },
        },
    }
    telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})

    cols := make([]cmock.ColumnType, 0)
    cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
    cols = append(cols, cmock.ColumnType{Name: "attr", Type: "String"})
    cols = append(cols, cmock.ColumnType{Name: "timestamp", Type: "String"})

    cases := []struct {
        targetUnit   string
        yAxisUnit    string
        values       [][]interface{}
        expectAlerts int
        compareOp    string
        matchType    string
        target       float64
        secondTarget float64
        summaryAny   []string
    }{
        {
            targetUnit: "s",
            yAxisUnit:  "ns",
            values: [][]interface{}{
                {float64(572588400), "attr", time.Now()},                              // 0.57 seconds
                {float64(572386400), "attr", time.Now().Add(1 * time.Second)},         // 0.57 seconds
                {float64(300947400), "attr", time.Now().Add(2 * time.Second)},         // 0.3 seconds
                {float64(299316000), "attr", time.Now().Add(3 * time.Second)},         // 0.3 seconds
                {float64(66640400.00000001), "attr", time.Now().Add(4 * time.Second)}, // 0.06 seconds
            },
            expectAlerts: 2,
            compareOp:    "1", // Above
            matchType:    "1", // Once
            target:       1,   // 1 second
            secondTarget: .5,
            summaryAny: []string{
                "observed metric value is 573 ms",
                "observed metric value is 572 ms",
            },
        },
        {
            targetUnit: "ms",
            yAxisUnit:  "ns",
            values: [][]interface{}{
                {float64(572588400), "attr", time.Now()},                              // 572.58 ms
                {float64(572386400), "attr", time.Now().Add(1 * time.Second)},         // 572.38 ms
                {float64(300947400), "attr", time.Now().Add(2 * time.Second)},         // 300.94 ms
                {float64(299316000), "attr", time.Now().Add(3 * time.Second)},         // 299.31 ms
                {float64(66640400.00000001), "attr", time.Now().Add(4 * time.Second)}, // 66.64 ms
            },
            expectAlerts: 6,
            compareOp:    "1", // Above
            matchType:    "1", // Once
            target:       200, // 200 ms
            secondTarget: 500,
            summaryAny: []string{
                "observed metric value is 299 ms",
                "the observed metric value is 573 ms",
                "the observed metric value is 572 ms",
                "the observed metric value is 301 ms",
            },
        },
        {
            targetUnit: "decgbytes",
            yAxisUnit:  "bytes",
            values: [][]interface{}{
                {float64(2863284053), "attr", time.Now()},                             // 2.86 GB
                {float64(2863388842), "attr", time.Now().Add(1 * time.Second)},        // 2.86 GB
                {float64(300947400), "attr", time.Now().Add(2 * time.Second)},         // 0.3 GB
                {float64(299316000), "attr", time.Now().Add(3 * time.Second)},         // 0.3 GB
                {float64(66640400.00000001), "attr", time.Now().Add(4 * time.Second)}, // 66.64 MB
            },
            expectAlerts: 2,
            compareOp:    "1", // Above
            matchType:    "1", // Once
            target:       200, // 200 GB
            secondTarget: 2,   // 2GB
            summaryAny: []string{
                "observed metric value is 2.7 GiB",
                "the observed metric value is 0.3 GB",
            },
        },
    }

    logger := instrumentationtest.New().Logger()

    for idx, c := range cases {
        rows := cmock.NewRows(cols, c.values)
        // We are testing the eval logic after the query is run
        // so we don't care about the query string here
        queryString := "SELECT any"
        telemetryStore.Mock().
            ExpectQuery(queryString).
            WillReturnRows(rows)
        postableRule.RuleCondition.CompareOp = ruletypes.CompareOp(c.compareOp)
        postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType)
        postableRule.RuleCondition.Target = &c.target
        postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
        postableRule.RuleCondition.TargetUnit = c.targetUnit
        postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{
            ruletypes.NewBasicRuleThreshold("first_threshold", &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit),
            ruletypes.NewBasicRuleThreshold("second_threshold", &c.secondTarget, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit),
        }
        postableRule.Annotations = map[string]string{
            "description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
            "summary":     "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
        }

        options := clickhouseReader.NewOptions("", "", "archiveNamespace")
        readerCache, err := cachetest.New(cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
        require.NoError(t, err)
        reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
        rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
        rule.TemporalityMap = map[string]map[v3.Temporality]bool{
            "signoz_calls_total": {
                v3.Delta: true,
            },
        }
        if err != nil {
            assert.NoError(t, err)
        }

        retVal, err := rule.Eval(context.Background(), time.Now())
        if err != nil {
            assert.NoError(t, err)
        }

        assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
        if c.expectAlerts != 0 {
            foundCount := 0
            for _, item := range rule.Active {
                for _, summary := range c.summaryAny {
                    if strings.Contains(item.Annotations.Get("summary"), summary) {
                        foundCount++
                        break
                    }
                }
            }
            assert.Equal(t, c.expectAlerts, foundCount, "case %d", idx)
        }
    }
}
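The expected summaries in the cases above follow from unit normalization: the mocked samples arrive in the y-axis unit ("ns" or "bytes") and are reformatted into a readable unit before templating, so 572588400 ns renders as roughly 572 ms. A quick sanity check of those constants:

package main

import (
    "fmt"
    "time"
)

func main() {
    v := 572588400 * time.Nanosecond
    fmt.Println(v.Seconds())                            // 0.5725884 (about 0.57 s)
    fmt.Println(float64(v) / float64(time.Millisecond)) // 572.5884 (about 573 ms after rounding)
}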
@@ -3,12 +3,10 @@ package querybuilder
import (
    "context"
    "fmt"
    "log/slog"
    "strings"

    chparser "github.com/AfterShip/clickhouse-sql-parser/parser"
    "github.com/SigNoz/signoz/pkg/errors"
    "github.com/SigNoz/signoz/pkg/factory"
    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
    "github.com/SigNoz/signoz/pkg/valuer"

@@ -16,7 +14,6 @@ import (
)

type aggExprRewriter struct {
    logger           *slog.Logger
    fullTextColumn   *telemetrytypes.TelemetryFieldKey
    fieldMapper      qbtypes.FieldMapper
    conditionBuilder qbtypes.ConditionBuilder

@@ -27,17 +24,13 @@ type aggExprRewriter struct {
var _ qbtypes.AggExprRewriter = (*aggExprRewriter)(nil)

func NewAggExprRewriter(
    settings factory.ProviderSettings,
    fullTextColumn *telemetrytypes.TelemetryFieldKey,
    fieldMapper qbtypes.FieldMapper,
    conditionBuilder qbtypes.ConditionBuilder,
    jsonBodyPrefix string,
    jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *aggExprRewriter {
    set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/agg_rewrite")

    return &aggExprRewriter{
        logger:           set.Logger(),
        fullTextColumn:   fullTextColumn,
        fieldMapper:      fieldMapper,
        conditionBuilder: conditionBuilder,

@@ -77,7 +70,7 @@ func (r *aggExprRewriter) Rewrite(
        return "", nil, errors.NewInternalf(errors.CodeInternal, "no SELECT items for %q", expr)
    }

    visitor := newExprVisitor(r.logger, keys,
    visitor := newExprVisitor(keys,
        r.fullTextColumn,
        r.fieldMapper,
        r.conditionBuilder,

@@ -124,7 +117,6 @@ func (r *aggExprRewriter) RewriteMulti(
// exprVisitor walks FunctionExpr nodes and applies the mappers.
type exprVisitor struct {
    chparser.DefaultASTVisitor
    logger           *slog.Logger
    fieldKeys        map[string][]*telemetrytypes.TelemetryFieldKey
    fullTextColumn   *telemetrytypes.TelemetryFieldKey
    fieldMapper      qbtypes.FieldMapper

@@ -137,7 +129,6 @@ type exprVisitor struct {
}

func newExprVisitor(
    logger *slog.Logger,
    fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey,
    fullTextColumn *telemetrytypes.TelemetryFieldKey,
    fieldMapper qbtypes.FieldMapper,

@@ -146,7 +137,6 @@ func newExprVisitor(
    jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *exprVisitor {
    return &exprVisitor{
        logger:           logger,
        fieldKeys:        fieldKeys,
        fullTextColumn:   fullTextColumn,
        fieldMapper:      fieldMapper,

@@ -193,7 +183,6 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
    whereClause, err := PrepareWhereClause(
        origPred,
        FilterExprVisitorOpts{
            Logger:           v.logger,
            FieldKeys:        v.fieldKeys,
            FieldMapper:      v.fieldMapper,
            ConditionBuilder: v.conditionBuilder,
@@ -2,11 +2,7 @@ package querybuilder

import (
    "context"
    "encoding/json"
    "fmt"
    "math"
    "reflect"
    "strconv"
    "strings"

    "github.com/SigNoz/signoz/pkg/errors"

@@ -120,58 +116,3 @@ func GroupByKeys(keys []qbtypes.GroupByKey) []string {
    }
    return k
}
func FormatValueForContains(value any) string {
    if value == nil {
        return ""
    }

    switch v := value.(type) {
    case string:
        return v
    case []byte:
        return string(v)

    case json.Number:
        return v.String()

    case float64:
        if v == math.Trunc(v) && v >= -1e15 && v <= 1e15 {
            return fmt.Sprintf("%.0f", v)
        }
        return strconv.FormatFloat(v, 'f', -1, 64)

    case float32:
        return strconv.FormatFloat(float64(v), 'f', -1, 32)

    case int, int8, int16, int32, int64:
        return fmt.Sprintf("%d", v)

    case uint, uint8, uint16, uint32, uint64:
        return fmt.Sprintf("%d", v)

    case bool:
        return strconv.FormatBool(v)

    case fmt.Stringer:
        return v.String()

    default:
        // fallback - try to convert through reflection
        rv := reflect.ValueOf(value)
        switch rv.Kind() {
        case reflect.Float32, reflect.Float64:
            f := rv.Float()
            if f == math.Trunc(f) && f >= -1e15 && f <= 1e15 {
                return fmt.Sprintf("%.0f", f)
            }
            return strconv.FormatFloat(f, 'f', -1, 64)
        case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
            return strconv.FormatInt(rv.Int(), 10)
        case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
            return strconv.FormatUint(rv.Uint(), 10)
        default:
            return fmt.Sprintf("%v", value)
        }
    }
}
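The float64 branch in the helper above exists because Go's default formatting switches large floats to scientific notation, which breaks substring matching for CONTAINS/LIKE conditions against values stored as text. An illustration:

package main

import (
    "fmt"
    "strconv"
)

func main() {
    v := float64(521509198310)
    fmt.Printf("%v\n", v)                            // 5.2150919831e+11, useless inside LIKE '%...%'
    fmt.Println(strconv.FormatFloat(v, 'f', -1, 64)) // 521509198310
}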
@@ -1,275 +0,0 @@
package querybuilder

import (
    "encoding/json"
    "fmt"
    "math"
    "testing"

    "github.com/stretchr/testify/assert"
)

type customStringer struct {
    value string
}

func (c customStringer) String() string {
    return c.value
}

type customInt int64
type customFloat float64
type customUint uint64

func TestFormatValueForContains(t *testing.T) {
    tests := []struct {
        name     string
        input    any
        expected string
    }{
        {
            name:     "nil value",
            input:    nil,
            expected: "",
        },
        {
            name:     "string value",
            input:    "hello world",
            expected: "hello world",
        },
        {
            name:     "empty string",
            input:    "",
            expected: "",
        },
        {
            name:     "string with special characters",
            input:    "test@#$%^&*()_+-=",
            expected: "test@#$%^&*()_+-=",
        },
        {
            name:     "byte slice",
            input:    []byte("byte slice test"),
            expected: "byte slice test",
        },
        {
            name:     "empty byte slice",
            input:    []byte{},
            expected: "",
        },
        {
            name:     "json.Number integer",
            input:    json.Number("521509198310"),
            expected: "521509198310",
        },
        {
            name:     "json.Number float",
            input:    json.Number("3.14159"),
            expected: "3.14159",
        },
        {
            name:     "json.Number scientific notation",
            input:    json.Number("1.23e+10"),
            expected: "1.23e+10",
        },
        {
            name:     "float64 whole number",
            input:    float64(42),
            expected: "42",
        },
        {
            name:     "float64 decimal",
            input:    float64(3.14159),
            expected: "3.14159",
        },
        {
            name:     "float64 large whole number",
            input:    float64(521509198310),
            expected: "521509198310",
        },
        {
            name:     "float64 at positive threshold",
            input:    float64(1e15),
            expected: "1000000000000000",
        },
        {
            name:     "float64 above positive threshold",
            input:    float64(1e16),
            expected: "10000000000000000",
        },
        {
            name:     "float64 at negative threshold",
            input:    float64(-1e15),
            expected: "-1000000000000000",
        },
        {
            name:     "float64 negative decimal",
            input:    float64(-123.456),
            expected: "-123.456",
        },
        {
            name:     "float64 zero",
            input:    float64(0),
            expected: "0",
        },
        {
            name:     "float32 whole number",
            input:    float32(42),
            expected: "42",
        },
        {
            name:     "float32 decimal",
            input:    float32(3.14),
            expected: "3.14",
        },
        {
            name:     "int",
            input:    int(123),
            expected: "123",
        },
        {
            name:     "int negative",
            input:    int(-456),
            expected: "-456",
        },
        {
            name:     "int8 max",
            input:    int8(127),
            expected: "127",
        },
        {
            name:     "int8 min",
            input:    int8(-128),
            expected: "-128",
        },
        {
            name:     "int16",
            input:    int16(32767),
            expected: "32767",
        },
        {
            name:     "int32",
            input:    int32(2147483647),
            expected: "2147483647",
        },
        {
            name:     "int64",
            input:    int64(9223372036854775807),
            expected: "9223372036854775807",
        },
        {
            name:     "uint",
            input:    uint(123),
            expected: "123",
        },
        {
            name:     "uint8 max",
            input:    uint8(255),
            expected: "255",
        },
        {
            name:     "uint16",
            input:    uint16(65535),
            expected: "65535",
        },
        {
            name:     "uint32",
            input:    uint32(4294967295),
            expected: "4294967295",
        },
        {
            name:     "uint64 large",
            input:    uint64(18446744073709551615),
            expected: "18446744073709551615",
        },
        {
            name:     "bool true",
            input:    true,
            expected: "true",
        },
        {
            name:     "bool false",
            input:    false,
            expected: "false",
        },
        {
            name:     "custom stringer",
            input:    customStringer{value: "custom string value"},
            expected: "custom string value",
        },
        {
            name:     "custom int type",
            input:    customInt(12345),
            expected: "12345",
        },
        {
            name:     "custom float type whole number",
            input:    customFloat(67890),
            expected: "67890",
        },
        {
            name:     "custom float type decimal",
            input:    customFloat(123.456),
            expected: "123.456",
        },
        {
            name:     "custom uint type",
            input:    customUint(99999),
            expected: "99999",
        },
        {
            name:     "struct fallback",
            input:    struct{ Name string }{Name: "test"},
            expected: "{test}",
        },
        {
            name:     "slice fallback",
            input:    []int{1, 2, 3},
            expected: "[1 2 3]",
        },
        {
            name:     "map fallback",
            input:    map[string]int{"a": 1, "b": 2},
            expected: fmt.Sprintf("%v", map[string]int{"a": 1, "b": 2}),
        },
        {
            name:     "float64 infinity",
            input:    math.Inf(1),
            expected: "+Inf",
        },
        {
            name:     "float64 negative infinity",
            input:    math.Inf(-1),
            expected: "-Inf",
        },
        {
            name:     "float64 NaN",
            input:    math.NaN(),
            expected: "NaN",
        },
        {
            name:     "float64 very small positive",
            input:    float64(0.000000123),
            expected: "0.000000123",
        },
        {
            name:     "float64 very small negative",
            input:    float64(-0.000000123),
            expected: "-0.000000123",
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            result := FormatValueForContains(tt.input)
            assert.Equal(t, tt.expected, result)
        })
    }
}

func TestFormatValueForContains_LargeNumberScientificNotation(t *testing.T) {
    largeNumber := float64(521509198310)
    result := FormatValueForContains(largeNumber)
    assert.Equal(t, "521509198310", result)
    assert.NotEqual(t, "5.2150919831e+11", result)
}
@@ -4,7 +4,6 @@ import (
    "context"
    "fmt"

    "github.com/SigNoz/signoz/pkg/querybuilder"
    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
    "github.com/huandu/go-sqlbuilder"

@@ -35,8 +34,7 @@ func valueForIndexFilter(op qbtypes.FilterOperator, key *telemetrytypes.Telemetr
        }
        return values
    }
    // resource table expects string value
    return fmt.Sprintf(`%%%v%%`, value)
    return value
}

func keyIndexFilter(key *telemetrytypes.TelemetryFieldKey) any {

@@ -55,16 +53,6 @@ func (b *defaultConditionBuilder) ConditionFor(
        return "true", nil
    }

    switch op {
    case qbtypes.FilterOperatorContains,
        qbtypes.FilterOperatorNotContains,
        qbtypes.FilterOperatorILike,
        qbtypes.FilterOperatorNotILike,
        qbtypes.FilterOperatorLike,
        qbtypes.FilterOperatorNotLike:
        value = querybuilder.FormatValueForContains(value)
    }

    column, err := b.fm.ColumnFor(ctx, key)
    if err != nil {
        return "", err
@@ -5,7 +5,6 @@ import (
    "testing"

    "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
    "github.com/huandu/go-sqlbuilder"
    "github.com/stretchr/testify/assert"

@@ -78,18 +77,6 @@ func TestConditionBuilder(t *testing.T) {
    expected:     "LOWER(simpleJSONExtractString(labels, 'k8s.namespace.name')) LIKE LOWER(?) AND labels LIKE ? AND LOWER(labels) LIKE LOWER(?)",
    expectedArgs: []any{"%banana%", "%k8s.namespace.name%", `%k8s.namespace.name%banana%`},
},
{
    name: "Contains operator - string attribute number value",
    key: &telemetrytypes.TelemetryFieldKey{
        Name:          "company.id",
        FieldContext:  telemetrytypes.FieldContextResource,
        FieldDataType: telemetrytypes.FieldDataTypeString,
    },
    op:           qbtypes.FilterOperatorContains,
    value:        521509198310,
    expected:     "LOWER(simpleJSONExtractString(labels, 'company.id')) LIKE LOWER(?) AND labels LIKE ? AND LOWER(labels) LIKE LOWER(?)",
    expectedArgs: []any{"%521509198310%", "%company.id%", `%company.id%521509198310%`},
},
{
    name: "string_not_contains",
    key: &telemetrytypes.TelemetryFieldKey{
@@ -3,10 +3,8 @@ package resourcefilter
import (
    "context"
    "fmt"
    "log/slog"

    "github.com/SigNoz/signoz/pkg/errors"
    "github.com/SigNoz/signoz/pkg/factory"
    "github.com/SigNoz/signoz/pkg/querybuilder"
    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"

@@ -36,7 +34,6 @@ var signalConfigs = map[telemetrytypes.Signal]signalConfig{

// Generic resource filter statement builder
type resourceFilterStatementBuilder[T any] struct {
    logger           *slog.Logger
    fieldMapper      qbtypes.FieldMapper
    conditionBuilder qbtypes.ConditionBuilder
    metadataStore    telemetrytypes.MetadataStore

@@ -55,14 +52,11 @@ var (

// Constructor functions
func NewTraceResourceFilterStatementBuilder(
    settings factory.ProviderSettings,
    fieldMapper qbtypes.FieldMapper,
    conditionBuilder qbtypes.ConditionBuilder,
    metadataStore telemetrytypes.MetadataStore,
) *resourceFilterStatementBuilder[qbtypes.TraceAggregation] {
    set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter")
    return &resourceFilterStatementBuilder[qbtypes.TraceAggregation]{
        logger:           set.Logger(),
        fieldMapper:      fieldMapper,
        conditionBuilder: conditionBuilder,
        metadataStore:    metadataStore,

@@ -71,7 +65,6 @@ func NewTraceResourceFilterStatementBuilder(
}

func NewLogResourceFilterStatementBuilder(
    settings factory.ProviderSettings,
    fieldMapper qbtypes.FieldMapper,
    conditionBuilder qbtypes.ConditionBuilder,
    metadataStore telemetrytypes.MetadataStore,

@@ -79,9 +72,7 @@ func NewLogResourceFilterStatementBuilder(
    jsonBodyPrefix string,
    jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *resourceFilterStatementBuilder[qbtypes.LogAggregation] {
    set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter")
    return &resourceFilterStatementBuilder[qbtypes.LogAggregation]{
        logger:           set.Logger(),
        fieldMapper:      fieldMapper,
        conditionBuilder: conditionBuilder,
        metadataStore:    metadataStore,

@@ -157,7 +148,6 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(

    // warnings would be encountered as part of the main condition already
    filterWhereClause, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
        Logger:           b.logger,
        FieldMapper:      b.fieldMapper,
        ConditionBuilder: b.conditionBuilder,
        FieldKeys:        keys,
@@ -3,7 +3,6 @@ package querybuilder
import (
	"context"
	"fmt"
	"log/slog"
	"strconv"
	"strings"

@@ -21,7 +20,6 @@ var searchTroubleshootingGuideURL = "https://signoz.io/docs/userguide/search-tro
// filterExpressionVisitor implements the FilterQueryVisitor interface
// to convert the parsed filter expressions into ClickHouse WHERE clause
type filterExpressionVisitor struct {
	logger           *slog.Logger
	fieldMapper      qbtypes.FieldMapper
	conditionBuilder qbtypes.ConditionBuilder
	warnings         []string
@@ -43,7 +41,6 @@ type filterExpressionVisitor struct {
}

type FilterExprVisitorOpts struct {
	Logger           *slog.Logger
	FieldMapper      qbtypes.FieldMapper
	ConditionBuilder qbtypes.ConditionBuilder
	FieldKeys        map[string][]*telemetrytypes.TelemetryFieldKey
@@ -61,7 +58,6 @@ type FilterExprVisitorOpts struct {

// newFilterExpressionVisitor creates a new filterExpressionVisitor
func newFilterExpressionVisitor(opts FilterExprVisitorOpts) *filterExpressionVisitor {
	return &filterExpressionVisitor{
		logger:           opts.Logger,
		fieldMapper:      opts.FieldMapper,
		conditionBuilder: opts.ConditionBuilder,
		fieldKeys:        opts.FieldKeys,
@@ -790,35 +786,15 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
	}

	if len(fieldKeysForName) > 1 && !v.keysWithWarnings[keyName] {
		warnMsg := fmt.Sprintf(
			"Key `%s` is ambiguous, found %d different combinations of field context / data type: %v.",
		v.mainWarnURL = "https://signoz.io/docs/userguide/field-context-data-types/"
		// this is a warning state; we must have an unambiguous key
		v.warnings = append(v.warnings, fmt.Sprintf(
			"key `%s` is ambiguous, found %d different combinations of field context / data type: %v",
			fieldKey.Name,
			len(fieldKeysForName),
			fieldKeysForName,
		)
		mixedFieldContext := map[string]bool{}
		for _, item := range fieldKeysForName {
			mixedFieldContext[item.FieldContext.StringValue()] = true
		}

		if mixedFieldContext[telemetrytypes.FieldContextResource.StringValue()] &&
			mixedFieldContext[telemetrytypes.FieldContextAttribute.StringValue()] {
			filteredKeys := []*telemetrytypes.TelemetryFieldKey{}
			for _, item := range fieldKeysForName {
				if item.FieldContext != telemetrytypes.FieldContextResource {
					continue
				}
				filteredKeys = append(filteredKeys, item)
			}
			fieldKeysForName = filteredKeys
			warnMsg += " " + "Using `resource` context by default. To query attributes explicitly, " +
				fmt.Sprintf("use the fully qualified name (e.g., 'attribute.%s')", fieldKey.Name)
		}
		v.mainWarnURL = "https://signoz.io/docs/userguide/field-context-data-types/"
		// this is a warning state; we must have an unambiguous key
		v.warnings = append(v.warnings, warnMsg)
		))
		v.keysWithWarnings[keyName] = true
		v.logger.Warn("ambiguous key", "field_key_name", fieldKey.Name) //nolint:sloglint
	}

	return fieldKeysForName

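For reference, the longer side of this hunk resolved ambiguity by preferring the resource context whenever a key name exists under both the resource and the attribute context. That preference rule, extracted into a standalone sketch (types as in the diff; an illustration, not the source):

// preferResourceContext narrows an ambiguous key to its resource-context
// entries when the same name exists under both resource and attribute
// contexts; otherwise it returns the keys unchanged.
func preferResourceContext(keys []*telemetrytypes.TelemetryFieldKey) []*telemetrytypes.TelemetryFieldKey {
	hasResource, hasAttribute := false, false
	for _, k := range keys {
		switch k.FieldContext {
		case telemetrytypes.FieldContextResource:
			hasResource = true
		case telemetrytypes.FieldContextAttribute:
			hasAttribute = true
		}
	}
	if !(hasResource && hasAttribute) {
		return keys
	}
	filtered := make([]*telemetrytypes.TelemetryFieldKey, 0, len(keys))
	for _, k := range keys {
		if k.FieldContext == telemetrytypes.FieldContextResource {
			filtered = append(filtered, k)
		}
	}
	return filtered
}
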
@@ -136,14 +136,7 @@ func NewSQLMigrationProviderFactories(

func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
	return factory.MustNewNamedMap(
		clickhousetelemetrystore.NewFactory(
			telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
				return telemetrystorehook.NewSettingsFactory(s)
			}),
			telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
				return telemetrystorehook.NewLoggingFactory()
			}),
		),
		clickhousetelemetrystore.NewFactory(telemetrystorehook.NewSettingsFactory(), telemetrystorehook.NewLoggingFactory()),
	)
}


@@ -7,7 +7,6 @@ import (
	"strings"

	schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"golang.org/x/exp/maps"
@@ -31,16 +30,6 @@ func (c *conditionBuilder) conditionFor(
	sb *sqlbuilder.SelectBuilder,
) (string, error) {

	switch operator {
	case qbtypes.FilterOperatorContains,
		qbtypes.FilterOperatorNotContains,
		qbtypes.FilterOperatorILike,
		qbtypes.FilterOperatorNotILike,
		qbtypes.FilterOperatorLike,
		qbtypes.FilterOperatorNotLike:
		value = querybuilder.FormatValueForContains(value)
	}

	column, err := c.fm.ColumnFor(ctx, key)
	if err != nil {
		return "", err

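Each condition builder in this changeset applies the same coercion before LIKE-family operators, which is how the numeric 521509198310 in the tests becomes the string pattern "%521509198310%". A hypothetical stand-in for querybuilder.FormatValueForContains (not its actual source), assuming only that non-strings must be stringified before pattern-wrapping:

// formatValueForContains coerces a raw filter value to a string so it
// can be wrapped in '%...%' for CONTAINS/LIKE/ILIKE conditions.
func formatValueForContains(v any) any {
	if s, ok := v.(string); ok {
		return s
	}
	return fmt.Sprintf("%v", v) // e.g. 521509198310 -> "521509198310"
}
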
@@ -111,30 +111,6 @@ func TestConditionFor(t *testing.T) {
			expectedArgs:  []any{"%admin%"},
			expectedError: nil,
		},
		{
			name: "Contains operator - string attribute number value",
			key: telemetrytypes.TelemetryFieldKey{
				Name:          "user.id",
				FieldContext:  telemetrytypes.FieldContextAttribute,
				FieldDataType: telemetrytypes.FieldDataTypeString,
			},
			operator:      qbtypes.FilterOperatorContains,
			value:         521509198310,
			expectedSQL:   "LOWER(attributes_string['user.id']) LIKE LOWER(?)",
			expectedArgs:  []any{"%521509198310%", true},
			expectedError: nil,
		},
		{
			name: "Contains operator - body",
			key: telemetrytypes.TelemetryFieldKey{
				Name: "body",
			},
			operator:      qbtypes.FilterOperatorContains,
			value:         521509198310,
			expectedSQL:   "LOWER(body) LIKE LOWER(?)",
			expectedArgs:  []any{"%521509198310%"},
			expectedError: nil,
		},
		{
			name: "Contains operator - string attribute",
			key: telemetrytypes.TelemetryFieldKey{

@@ -4,7 +4,6 @@ import (
	"fmt"
	"testing"

	"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/huandu/go-sqlbuilder"
@@ -20,7 +19,6 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
	keys := buildCompleteFieldKeyMap()

	opts := querybuilder.FilterExprVisitorOpts{
		Logger:           instrumentationtest.New().Logger(),
		FieldMapper:      fm,
		ConditionBuilder: cb,
		FieldKeys:        keys,

@@ -6,8 +6,8 @@ import (
	"testing"

	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/huandu/go-sqlbuilder"
	"github.com/stretchr/testify/require"
)
@@ -21,13 +21,14 @@ func TestFilterExprLogs(t *testing.T) {
	keys := buildCompleteFieldKeyMap()

	opts := querybuilder.FilterExprVisitorOpts{
		Logger:           instrumentationtest.New().Logger(),
		FieldMapper:      fm,
		ConditionBuilder: cb,
		FieldKeys:        keys,
		FullTextColumn:   DefaultFullTextColumn,
		JsonBodyPrefix:   BodyJSONStringSearchPrefix,
		JsonKeyToKey:     GetBodyJSONKey,
		FullTextColumn: &telemetrytypes.TelemetryFieldKey{
			Name: "body",
		},
		JsonBodyPrefix: "body",
		JsonKeyToKey:   GetBodyJSONKey,
	}

	testCases := []struct {
@@ -1405,14 +1406,6 @@ func TestFilterExprLogs(t *testing.T) {
			expectedArgs:          []any{"%error%", true},
			expectedErrorContains: "",
		},
		{
			category:              "number contains body",
			query:                 "body CONTAINS 521509198310",
			shouldPass:            true,
			expectedQuery:         "WHERE LOWER(body) LIKE LOWER(?)",
			expectedArgs:          []any{"%521509198310%"},
			expectedErrorContains: "",
		},
		{
			category: "CONTAINS operator",
			query:    "level CONTAINS \"critical\"",

@@ -553,7 +553,6 @@ func (b *logQueryStatementBuilder) addFilterCondition(
	if query.Filter != nil && query.Filter.Expression != "" {
		// add filter expression
		preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
			Logger:           b.logger,
			FieldMapper:      b.fm,
			ConditionBuilder: b.cb,
			FieldKeys:        keys,

@@ -27,7 +27,6 @@ func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation
	mockMetadataStore.KeysMap = keysMap

	return resourcefilter.NewLogResourceFilterStatementBuilder(
		instrumentationtest.New().ToProviderSettings(),
		fm,
		cb,
		mockMetadataStore,
@@ -120,7 +119,7 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()

	aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
	aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)

	resourceFilterStmtBuilder := resourceFilterStmtBuilder()

@@ -213,7 +212,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()

	aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
	aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)

	resourceFilterStmtBuilder := resourceFilterStmtBuilder()

@@ -322,7 +321,7 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()

	aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
	aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)

	resourceFilterStmtBuilder := resourceFilterStmtBuilder()


@@ -6,6 +6,4 @@ const (
	LogsV2LocalTableName          = "logs_v2"
	TagAttributesV2TableName      = "distributed_tag_attributes_v2"
	TagAttributesV2LocalTableName = "tag_attributes_v2"
	LogAttributeKeysTblName       = "distributed_logs_attribute_keys"
	LogResourceKeysTblName        = "distributed_logs_resource_keys"
)

@@ -27,13 +27,6 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
			FieldDataType: telemetrytypes.FieldDataTypeString,
		},
	},
	"body": {
		{
			Name:          "body",
			FieldContext:  telemetrytypes.FieldContextLog,
			FieldDataType: telemetrytypes.FieldDataTypeString,
		},
	},
	"http.status_code": {
		{
			Name: "http.status_code",

@@ -5,7 +5,6 @@ import (
	"fmt"

	schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/huandu/go-sqlbuilder"
@@ -26,17 +25,6 @@ func (c *conditionBuilder) ConditionFor(
	value any,
	sb *sqlbuilder.SelectBuilder,
) (string, error) {

	switch operator {
	case qbtypes.FilterOperatorContains,
		qbtypes.FilterOperatorNotContains,
		qbtypes.FilterOperatorILike,
		qbtypes.FilterOperatorNotILike,
		qbtypes.FilterOperatorLike,
		qbtypes.FilterOperatorNotLike:
		value = querybuilder.FormatValueForContains(value)
	}

	column, err := c.fm.ColumnFor(ctx, key)
	if err != nil {
		// if we don't have a column, we can't build a condition for related values

@@ -4,6 +4,7 @@ import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"slices"
	"strings"

@@ -31,23 +32,20 @@ var (
)

type telemetryMetaStore struct {
	logger                    *slog.Logger
	telemetrystore            telemetrystore.TelemetryStore
	tracesDBName              string
	tracesFieldsTblName       string
	spanAttributesKeysTblName string
	indexV3TblName            string
	metricsDBName             string
	metricsFieldsTblName      string
	meterDBName               string
	meterFieldsTblName        string
	logsDBName                string
	logsFieldsTblName         string
	logAttributeKeysTblName   string
	logResourceKeysTblName    string
	logsV2TblName             string
	relatedMetadataDBName     string
	relatedMetadataTblName    string
	logger                 *slog.Logger
	telemetrystore         telemetrystore.TelemetryStore
	tracesDBName           string
	tracesFieldsTblName    string
	indexV3TblName         string
	metricsDBName          string
	metricsFieldsTblName   string
	meterDBName            string
	meterFieldsTblName     string
	logsDBName             string
	logsFieldsTblName      string
	logsV2TblName          string
	relatedMetadataDBName  string
	relatedMetadataTblName string

	fm               qbtypes.FieldMapper
	conditionBuilder qbtypes.ConditionBuilder
@@ -62,7 +60,6 @@ func NewTelemetryMetaStore(
	telemetrystore telemetrystore.TelemetryStore,
	tracesDBName string,
	tracesFieldsTblName string,
	spanAttributesKeysTblName string,
	indexV3TblName string,
	metricsDBName string,
	metricsFieldsTblName string,
@@ -71,31 +68,26 @@ func NewTelemetryMetaStore(
	logsDBName string,
	logsV2TblName string,
	logsFieldsTblName string,
	logAttributeKeysTblName string,
	logResourceKeysTblName string,
	relatedMetadataDBName string,
	relatedMetadataTblName string,
) telemetrytypes.MetadataStore {
	metadataSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetadata")

	t := &telemetryMetaStore{
		logger:                    metadataSettings.Logger(),
		telemetrystore:            telemetrystore,
		tracesDBName:              tracesDBName,
		tracesFieldsTblName:       tracesFieldsTblName,
		spanAttributesKeysTblName: spanAttributesKeysTblName,
		indexV3TblName:            indexV3TblName,
		metricsDBName:             metricsDBName,
		metricsFieldsTblName:      metricsFieldsTblName,
		meterDBName:               meterDBName,
		meterFieldsTblName:        meterFieldsTblName,
		logsDBName:                logsDBName,
		logsV2TblName:             logsV2TblName,
		logsFieldsTblName:         logsFieldsTblName,
		logAttributeKeysTblName:   logAttributeKeysTblName,
		logResourceKeysTblName:    logResourceKeysTblName,
		relatedMetadataDBName:     relatedMetadataDBName,
		relatedMetadataTblName:    relatedMetadataTblName,
		logger:                 metadataSettings.Logger(),
		telemetrystore:         telemetrystore,
		tracesDBName:           tracesDBName,
		tracesFieldsTblName:    tracesFieldsTblName,
		indexV3TblName:         indexV3TblName,
		metricsDBName:          metricsDBName,
		metricsFieldsTblName:   metricsFieldsTblName,
		meterDBName:            meterDBName,
		meterFieldsTblName:     meterFieldsTblName,
		logsDBName:             logsDBName,
		logsV2TblName:          logsV2TblName,
		logsFieldsTblName:      logsFieldsTblName,
		relatedMetadataDBName:  relatedMetadataDBName,
		relatedMetadataTblName: relatedMetadataTblName,
	}

	fm := NewFieldMapper()
@@ -144,18 +136,14 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
		mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
	}

	sb := sqlbuilder.Select(
		"tagKey AS tag_key",
		"tagType AS tag_type",
		"dataType AS tag_data_type",
		`CASE
			// WHEN tagType = 'spanfield' THEN 1
			WHEN tagType = 'resource' THEN 2
			// WHEN tagType = 'scope' THEN 3
			WHEN tagType = 'tag' THEN 4
			ELSE 5
		END as priority`,
	).From(t.tracesDBName + "." + t.spanAttributesKeysTblName)
	sb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", `
		CASE
			WHEN tag_type = 'spanfield' THEN 1
			WHEN tag_type = 'resource' THEN 2
			WHEN tag_type = 'scope' THEN 3
			WHEN tag_type = 'tag' THEN 4
			ELSE 5
		END as priority`).From(t.tracesDBName + "." + t.tracesFieldsTblName)
	var limit int

	searchTexts := []string{}
@@ -164,20 +152,19 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
	conds := []string{}
	for _, fieldKeySelector := range fieldKeySelectors {

		// TODO(srikanthccv): support time filtering for span attribute keys
		// if fieldKeySelector.StartUnixMilli != 0 {
		// 	conds = append(conds, sb.GE("unix_milli", fieldKeySelector.StartUnixMilli))
		// }
		// if fieldKeySelector.EndUnixMilli != 0 {
		// 	conds = append(conds, sb.LE("unix_milli", fieldKeySelector.EndUnixMilli))
		// }
		if fieldKeySelector.StartUnixMilli != 0 {
			conds = append(conds, sb.GE("unix_milli", fieldKeySelector.StartUnixMilli))
		}
		if fieldKeySelector.EndUnixMilli != 0 {
			conds = append(conds, sb.LE("unix_milli", fieldKeySelector.EndUnixMilli))
		}

		// key part of the selector
		fieldKeyConds := []string{}
		if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
			fieldKeyConds = append(fieldKeyConds, sb.E("tagKey", fieldKeySelector.Name))
			fieldKeyConds = append(fieldKeyConds, sb.E("tag_key", fieldKeySelector.Name))
		} else {
			fieldKeyConds = append(fieldKeyConds, sb.ILike("tagKey", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
			fieldKeyConds = append(fieldKeyConds, sb.ILike("tag_key", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
		}

		searchTexts = append(searchTexts, fieldKeySelector.Name)
@@ -185,25 +172,29 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
			dataTypes = append(dataTypes, fieldKeySelector.FieldDataType)
		}
		// now look at the field context
		// we don't write most of intrinsic fields to keys table
		// for this reason we don't want to apply tagType if the field context
		// is not attribute or resource attribute
		// we don't write most of the intrinsic fields to the tag attributes table
		// for this reason we don't want to apply tag_type if the field context
		// is not attribute or resource attribute
		if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified &&
			(fieldKeySelector.FieldContext == telemetrytypes.FieldContextAttribute ||
				fieldKeySelector.FieldContext == telemetrytypes.FieldContextResource) {
			fieldKeyConds = append(fieldKeyConds, sb.E("tagType", fieldKeySelector.FieldContext.TagType()))
			fieldKeyConds = append(fieldKeyConds, sb.E("tag_type", fieldKeySelector.FieldContext.TagType()))
		}

		// now look at the field data type
		if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
			fieldKeyConds = append(fieldKeyConds, sb.E("dataType", fieldKeySelector.FieldDataType.TagDataType()))
			fieldKeyConds = append(fieldKeyConds, sb.E("tag_data_type", fieldKeySelector.FieldDataType.TagDataType()))
		}

		conds = append(conds, sb.And(fieldKeyConds...))
		limit += fieldKeySelector.Limit
		if strings.TrimSpace(fieldKeySelector.Name) == "" {
			sb.Limit(200)
		}
	}
	sb.Where(sb.Or(conds...))
	sb.GroupBy("tagKey", "tagType", "dataType")
	sb.GroupBy("tag_key", "tag_type", "tag_data_type")

	if limit == 0 {
		limit = 1000
	}
@@ -356,145 +347,89 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
		mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
	}

	// queries for both attribute and resource keys tables
	var queries []string
	var allArgs []any

	// tables to query based on field selectors
	queryAttributeTable := false
	queryResourceTable := false

	for _, selector := range fieldKeySelectors {
		if selector.FieldContext == telemetrytypes.FieldContextUnspecified {
			// unspecified context, query both tables
			queryAttributeTable = true
			queryResourceTable = true
			break
		} else if selector.FieldContext == telemetrytypes.FieldContextAttribute {
			queryAttributeTable = true
		} else if selector.FieldContext == telemetrytypes.FieldContextResource {
			queryResourceTable = true
		}
	tblName := t.logsFieldsTblName
	if os.Getenv("LOGS_TAG_ATTRS_KEYS_TABLE") != "" {
		tblName = os.Getenv("LOGS_TAG_ATTRS_KEYS_TABLE")
	}

	tablesToQuery := []struct {
		fieldContext telemetrytypes.FieldContext
		shouldQuery  bool
	}{
		{telemetrytypes.FieldContextAttribute, queryAttributeTable},
		{telemetrytypes.FieldContextResource, queryResourceTable},
	}

	for _, table := range tablesToQuery {
		if !table.shouldQuery {
			continue
		}

		fieldContext := table.fieldContext

		// table name based on field context
		var tblName string
		if fieldContext == telemetrytypes.FieldContextAttribute {
			tblName = t.logsDBName + "." + t.logAttributeKeysTblName
		} else {
			tblName = t.logsDBName + "." + t.logResourceKeysTblName
		}

		sb := sqlbuilder.Select(
			"name AS tag_key",
			fmt.Sprintf("'%s' AS tag_type", fieldContext.TagType()),
			"datatype AS tag_data_type",
			fmt.Sprintf(`%d AS priority`, getPriorityForContext(fieldContext)),
		).From(tblName)

		var limit int
		conds := []string{}

		for _, fieldKeySelector := range fieldKeySelectors {
			// Include this selector if:
			// 1. It has unspecified context (matches all tables)
			// 2. Its context matches the current table's context
			if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified &&
				fieldKeySelector.FieldContext != fieldContext {
				continue
			}

			// key part of the selector
			fieldKeyConds := []string{}
			if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
				fieldKeyConds = append(fieldKeyConds, sb.E("name", fieldKeySelector.Name))
			} else {
				fieldKeyConds = append(fieldKeyConds, sb.ILike("name", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
			}

			// now look at the field data type
			if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
				fieldKeyConds = append(fieldKeyConds, sb.E("datatype", fieldKeySelector.FieldDataType.TagDataType()))
			}

			if len(fieldKeyConds) > 0 {
				conds = append(conds, sb.And(fieldKeyConds...))
			}
			limit += fieldKeySelector.Limit
		}

		if len(conds) > 0 {
			sb.Where(sb.Or(conds...))
		}

		sb.GroupBy("name", "datatype")
		if limit == 0 {
			limit = 1000
		}

		query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
		queries = append(queries, query)
		allArgs = append(allArgs, args...)
	}

	if len(queries) == 0 {
		// No matching contexts, return empty result
		return []*telemetrytypes.TelemetryFieldKey{}, true, nil
	}

	// Combine queries with UNION ALL
	sb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", `
		CASE
			WHEN tag_type = 'logfield' THEN 1
			WHEN tag_type = 'resource' THEN 2
			WHEN tag_type = 'scope' THEN 3
			WHEN tag_type = 'tag' THEN 4
			ELSE 5
		END as priority`).From(t.logsDBName + "." + tblName)
	var limit int
	for _, fieldKeySelector := range fieldKeySelectors {
		limit += fieldKeySelector.Limit
	}
	if limit == 0 {
		limit = 1000
	}

	mainQuery := fmt.Sprintf(`
		SELECT tag_key, tag_type, tag_data_type, max(priority) as priority
		FROM (
			%s
		) AS combined_results
		GROUP BY tag_key, tag_type, tag_data_type
		ORDER BY priority
		LIMIT %d
	`, strings.Join(queries, " UNION ALL "), limit+1)

	rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, mainQuery, allArgs...)
	if err != nil {
		return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
	}
	defer rows.Close()

	keys := []*telemetrytypes.TelemetryFieldKey{}
	rowCount := 0
	conds := []string{}
	searchTexts := []string{}
	dataTypes := []telemetrytypes.FieldDataType{}

	// Collect search texts and data types for static field matching
	for _, fieldKeySelector := range fieldKeySelectors {

		if fieldKeySelector.StartUnixMilli != 0 {
			conds = append(conds, sb.GE("unix_milli", fieldKeySelector.StartUnixMilli))
		}
		if fieldKeySelector.EndUnixMilli != 0 {
			conds = append(conds, sb.LE("unix_milli", fieldKeySelector.EndUnixMilli))
		}

		// key part of the selector
		fieldKeyConds := []string{}
		if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
			fieldKeyConds = append(fieldKeyConds, sb.E("tag_key", fieldKeySelector.Name))
		} else {
			fieldKeyConds = append(fieldKeyConds, sb.ILike("tag_key", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
		}
		searchTexts = append(searchTexts, fieldKeySelector.Name)
		if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
			dataTypes = append(dataTypes, fieldKeySelector.FieldDataType)
		}

		// now look at the field context
		// we don't write most of the intrinsic fields to the tag attributes table
		// for this reason we don't want to apply tag_type if the field context
		// is not attribute or resource attribute
		if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified &&
			(fieldKeySelector.FieldContext == telemetrytypes.FieldContextAttribute ||
				fieldKeySelector.FieldContext == telemetrytypes.FieldContextResource) {
			fieldKeyConds = append(fieldKeyConds, sb.E("tag_type", fieldKeySelector.FieldContext.TagType()))
		}

		// now look at the field data type
		if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
			fieldKeyConds = append(fieldKeyConds, sb.E("tag_data_type", fieldKeySelector.FieldDataType.TagDataType()))
		}

		conds = append(conds, sb.And(fieldKeyConds...))
		limit += fieldKeySelector.Limit
		if strings.TrimSpace(fieldKeySelector.Name) == "" {
			sb.Limit(200)
		}
	}
	sb.Where(sb.Or(conds...))
	sb.GroupBy("tag_key", "tag_type", "tag_data_type")
	if limit == 0 {
		limit = 1000
	}

	mainSb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", "max(priority) as priority")
	mainSb.From(mainSb.BuilderAs(sb, "sub_query"))
	mainSb.GroupBy("tag_key", "tag_type", "tag_data_type")
	mainSb.OrderBy("priority")
	// query one extra to check if we hit the limit
	mainSb.Limit(limit + 1)

	query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse)

	rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
	if err != nil {
		return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
	}
	defer rows.Close()
	keys := []*telemetrytypes.TelemetryFieldKey{}
	rowCount := 0
	for rows.Next() {
		rowCount++
		// reached the limit, we know there are more results
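Both sides of this hunk share the limit+1 probe: ask for one row more than requested, and if that extra row arrives, report the key listing as incomplete. The idiom in isolation (a sketch with a plain slice standing in for the database cursor):

// probeComplete returns at most limit items and whether the source was
// exhausted; seeing the extra row proves more data exists beyond the page.
func probeComplete[T any](rows []T, limit int) (page []T, complete bool) {
	if len(rows) > limit {
		return rows[:limit], false
	}
	return rows, true
}
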
@@ -575,21 +510,6 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
	return keys, complete, nil
}

func getPriorityForContext(ctx telemetrytypes.FieldContext) int {
	switch ctx {
	case telemetrytypes.FieldContextLog:
		return 1
	case telemetrytypes.FieldContextResource:
		return 2
	case telemetrytypes.FieldContextScope:
		return 3
	case telemetrytypes.FieldContextAttribute:
		return 4
	default:
		return 5
	}
}

// getMetricsKeys returns the keys from the metrics that match the field selection criteria
func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
	if len(fieldKeySelectors) == 0 {
@@ -936,7 +856,6 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
	}

	whereClause, err := querybuilder.PrepareWhereClause(fieldValueSelector.ExistingQuery, querybuilder.FilterExprVisitorOpts{
		Logger:           t.logger,
		FieldMapper:      t.fm,
		ConditionBuilder: t.conditionBuilder,
		FieldKeys:        keys,

@@ -40,7 +40,6 @@ func TestGetKeys(t *testing.T) {
		mockTelemetryStore,
		telemetrytraces.DBName,
		telemetrytraces.TagAttributesV2TableName,
		telemetrytraces.SpanAttributesKeysTblName,
		telemetrytraces.SpanIndexV3TableName,
		telemetrymetrics.DBName,
		telemetrymetrics.AttributesMetadataTableName,
@@ -49,8 +48,6 @@ func TestGetKeys(t *testing.T) {
		telemetrylogs.DBName,
		telemetrylogs.LogsV2TableName,
		telemetrylogs.TagAttributesV2TableName,
		telemetrylogs.LogAttributeKeysTblName,
		telemetrylogs.LogResourceKeysTblName,
		DBName,
		AttributesMetadataLocalTableName,
	)

@@ -141,7 +141,6 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
	)
	if query.Filter != nil && query.Filter.Expression != "" {
		filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
			Logger:           b.logger,
			FieldMapper:      b.fm,
			ConditionBuilder: b.cb,
			FieldKeys:        keys,
@@ -224,7 +223,6 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(

	if query.Filter != nil && query.Filter.Expression != "" {
		filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
			Logger:           b.logger,
			FieldMapper:      b.fm,
			ConditionBuilder: b.cb,
			FieldKeys:        keys,
@@ -288,7 +286,6 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
	)
	if query.Filter != nil && query.Filter.Expression != "" {
		filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
			Logger:           b.logger,
			FieldMapper:      b.fm,
			ConditionBuilder: b.cb,
			FieldKeys:        keys,

@@ -6,7 +6,6 @@ import (
	"slices"

	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"

@@ -29,16 +28,6 @@ func (c *conditionBuilder) conditionFor(
	sb *sqlbuilder.SelectBuilder,
) (string, error) {

	switch operator {
	case qbtypes.FilterOperatorContains,
		qbtypes.FilterOperatorNotContains,
		qbtypes.FilterOperatorILike,
		qbtypes.FilterOperatorNotILike,
		qbtypes.FilterOperatorLike,
		qbtypes.FilterOperatorNotLike:
		value = querybuilder.FormatValueForContains(value)
	}

	tblFieldName, err := c.fm.FieldFor(ctx, key)
	if err != nil {
		return "", err

@@ -133,19 +133,6 @@ func TestConditionFor(t *testing.T) {
			expectedSQL:   "",
			expectedError: qbtypes.ErrInValues,
		},
		{
			name: "Contains operator - string attribute",
			key: telemetrytypes.TelemetryFieldKey{
				Name:          "user.id",
				FieldContext:  telemetrytypes.FieldContextAttribute,
				FieldDataType: telemetrytypes.FieldDataTypeString,
			},
			operator:      qbtypes.FilterOperatorContains,
			value:         521509198310,
			expectedSQL:   "LOWER(JSONExtractString(labels, 'user.id')) LIKE LOWER(?)",
			expectedArgs:  []any{"%521509198310%"},
			expectedError: nil,
		},
		{
			name: "Not In operator - metric_name",
			key: telemetrytypes.TelemetryFieldKey{

@@ -68,9 +68,6 @@ func GetKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation])
	for idx := range keySelectors {
		keySelectors[idx].Signal = telemetrytypes.SignalMetrics
		keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
		keySelectors[idx].MetricContext = &telemetrytypes.MetricContext{
			MetricName: query.Aggregations[0].MetricName,
		}
	}
	return keySelectors
}
@@ -298,7 +295,6 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(

	if query.Filter != nil && query.Filter.Expression != "" {
		preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
			Logger:           b.logger,
			FieldMapper:      b.fm,
			ConditionBuilder: b.cb,
			FieldKeys:        keys,

@@ -16,13 +16,22 @@ type provider struct {
	hooks []telemetrystore.TelemetryStoreHook
}

func NewFactory(hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
	return factory.NewProviderFactory(factory.MustNewName("clickhouse"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStore, error) {
		return New(ctx, providerSettings, config, hookFactories...)
		// we want to fail fast so we have hook registration errors before creating the telemetry store
		hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
		for i, hookFactory := range hookFactories {
			hook, err := hookFactory.New(ctx, providerSettings, config)
			if err != nil {
				return nil, err
			}
			hooks[i] = hook
		}
		return New(ctx, providerSettings, config, hooks...)
	})
}

func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) (telemetrystore.TelemetryStore, error) {
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
	settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")

	options, err := clickhouse.ParseDSN(config.Clickhouse.DSN)
@@ -38,20 +47,6 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
		return nil, err
	}

	var version string
	if err := chConn.QueryRow(ctx, "SELECT version()").Scan(&version); err != nil {
		return nil, err
	}

	hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
	for i, hookFactory := range hookFactories {
		hook, err := hookFactory(version).New(ctx, providerSettings, config)
		if err != nil {
			return nil, err
		}
		hooks[i] = hook
	}

	return &provider{
		settings:       settings,
		clickHouseConn: chConn,

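One side of this hunk builds hooks inside New, after fetching the ClickHouse server version; the other builds them inside the factory closure so that, per the comment, hook registration errors surface before any telemetry store exists. That fail-fast loop in isolation (a sketch assuming the factory and telemetrystore packages as used above; not a drop-in for either side):

// buildHooksFailFast constructs every hook up front and aborts on the
// first error, so a broken hook prevents store creation entirely.
func buildHooksFailFast(
	ctx context.Context,
	providerSettings factory.ProviderSettings,
	config telemetrystore.Config,
	hookFactories []factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config],
) ([]telemetrystore.TelemetryStoreHook, error) {
	hooks := make([]telemetrystore.TelemetryStoreHook, 0, len(hookFactories))
	for _, hf := range hookFactories {
		hook, err := hf.New(ctx, providerSettings, config)
		if err != nil {
			return nil, err
		}
		hooks = append(hooks, hook)
	}
	return hooks, nil
}
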
@@ -46,7 +46,6 @@ type QuerySettings struct {
	MaxBytesToRead                      int    `mapstructure:"max_bytes_to_read"`
	MaxResultRows                       int    `mapstructure:"max_result_rows"`
	IgnoreDataSkippingIndices           string `mapstructure:"ignore_data_skipping_indices"`
	SecondaryIndicesEnableBulkFiltering bool   `mapstructure:"secondary_indices_enable_bulk_filtering"`
}

func NewConfigFactory() factory.ConfigFactory {

@@ -4,7 +4,6 @@ import (
	"context"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/SigNoz/signoz/pkg/factory"
)

type TelemetryStore interface {
@@ -20,8 +19,6 @@ type TelemetryStoreHook interface {
	AfterQuery(ctx context.Context, event *QueryEvent)
}

type TelemetryStoreHookFactoryFunc func(string) factory.ProviderFactory[TelemetryStoreHook, Config]

func WrapBeforeQuery(hooks []TelemetryStoreHook, ctx context.Context, event *QueryEvent) context.Context {
	for _, hook := range hooks {
		ctx = hook.BeforeQuery(ctx, event)

@@ -3,7 +3,6 @@ package telemetrystorehook
import (
	"context"
	"encoding/json"
	"strings"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/SigNoz/signoz/pkg/factory"
@@ -12,20 +11,16 @@ import (
)

type provider struct {
	clickHouseVersion string
	settings          telemetrystore.QuerySettings
	settings telemetrystore.QuerySettings
}

func NewSettingsFactory(version string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
	return factory.NewProviderFactory(factory.MustNewName("settings"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
		return NewSettings(ctx, providerSettings, config, version)
	})
func NewSettingsFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
	return factory.NewProviderFactory(factory.MustNewName("settings"), NewSettings)
}

func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, version string) (telemetrystore.TelemetryStoreHook, error) {
func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
	return &provider{
		clickHouseVersion: version,
		settings:          config.Clickhouse.QuerySettings,
		settings: config.Clickhouse.QuerySettings,
	}, nil
}

@@ -80,13 +75,6 @@ func (h *provider) BeforeQuery(ctx context.Context, _ *telemetrystore.QueryEvent
		settings["result_overflow_mode"] = ctx.Value("result_overflow_mode")
	}

	// ClickHouse version check is added since this setting is not supported on versions below 25.5
	if strings.HasPrefix(h.clickHouseVersion, "25") && !h.settings.SecondaryIndicesEnableBulkFiltering {
		// TODO(srikanthccv): enable it when the "Cannot read all data" issue is fixed
		// https://github.com/ClickHouse/ClickHouse/issues/82283
		settings["secondary_indices_enable_bulk_filtering"] = false
	}

	ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
	return ctx
}

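The block removed above gated a ClickHouse query setting on the server version fetched at startup. In isolation, the gate looks like this (a sketch mirroring the deleted code; a sturdier version would parse the full semantic version instead of prefix-matching on "25"):

// applyBulkFilteringGate disables secondary_indices_enable_bulk_filtering
// on ClickHouse 25.x unless explicitly enabled, pending
// https://github.com/ClickHouse/ClickHouse/issues/82283.
func applyBulkFilteringGate(settings map[string]any, clickHouseVersion string, enableBulkFiltering bool) {
	if strings.HasPrefix(clickHouseVersion, "25") && !enableBulkFiltering {
		settings["secondary_indices_enable_bulk_filtering"] = false
	}
}
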
@@ -10,7 +10,6 @@ import (

	schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/huandu/go-sqlbuilder"
@@ -34,17 +33,6 @@ func (c *conditionBuilder) conditionFor(
	value any,
	sb *sqlbuilder.SelectBuilder,
) (string, error) {

	switch operator {
	case qbtypes.FilterOperatorContains,
		qbtypes.FilterOperatorNotContains,
		qbtypes.FilterOperatorILike,
		qbtypes.FilterOperatorNotILike,
		qbtypes.FilterOperatorLike,
		qbtypes.FilterOperatorNotLike:
		value = querybuilder.FormatValueForContains(value)
	}

	// first, locate the raw column type (so we can choose the right EXISTS logic)
	column, err := c.fm.ColumnFor(ctx, key)
	if err != nil {

@@ -111,32 +111,6 @@ func TestConditionFor(t *testing.T) {
			expectedArgs:  []any{"%admin%", true},
			expectedError: nil,
		},
		{
			name: "Contains operator - string attribute",
			key: telemetrytypes.TelemetryFieldKey{
				Name:          "user.id",
				FieldContext:  telemetrytypes.FieldContextAttribute,
				FieldDataType: telemetrytypes.FieldDataTypeString,
			},
			operator:      qbtypes.FilterOperatorContains,
			value:         521509198310,
			expectedSQL:   "LOWER(attributes_string['user.id']) LIKE LOWER(?)",
			expectedArgs:  []any{"%521509198310%", true},
			expectedError: nil,
		},
		{
			name: "LIKE operator - string attribute",
			key: telemetrytypes.TelemetryFieldKey{
				Name:          "user.id",
				FieldContext:  telemetrytypes.FieldContextAttribute,
				FieldDataType: telemetrytypes.FieldDataTypeString,
			},
			operator:      qbtypes.FilterOperatorLike,
			value:         521509198310,
			expectedSQL:   "attributes_string['user.id'] LIKE ?",
			expectedArgs:  []any{"521509198310", true},
			expectedError: nil,
		},
		{
			name: "Between operator - timestamp",
			key: telemetrytypes.TelemetryFieldKey{

@@ -3,7 +3,6 @@ package telemetrytraces
import (
	"testing"

	"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/huandu/go-sqlbuilder"
@@ -65,7 +64,6 @@ func TestSpanScopeFilterExpression(t *testing.T) {
	}}

	whereClause, err := querybuilder.PrepareWhereClause(tt.expression, querybuilder.FilterExprVisitorOpts{
		Logger:           instrumentationtest.New().Logger(),
		FieldMapper:      fm,
		ConditionBuilder: cb,
		FieldKeys:        fieldKeys,
@@ -132,7 +130,6 @@ func TestSpanScopeWithResourceFilter(t *testing.T) {
	}}

	_, err := querybuilder.PrepareWhereClause(tt.expression, querybuilder.FilterExprVisitorOpts{
		Logger:           instrumentationtest.New().Logger(),
		FieldMapper:      fm,
		ConditionBuilder: cb,
		FieldKeys:        fieldKeys,

@@ -735,7 +735,6 @@ func (b *traceQueryStatementBuilder) addFilterCondition(
	if query.Filter != nil && query.Filter.Expression != "" {
		// add filter expression
		preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
			Logger:           b.logger,
			FieldMapper:      b.fm,
			ConditionBuilder: b.cb,
			FieldKeys:        keys,

@@ -21,7 +21,6 @@ func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.TraceAggregati
	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()

	return resourcefilter.NewTraceResourceFilterStatementBuilder(
		instrumentationtest.New().ToProviderSettings(),
		fm,
		cb,
		mockMetadataStore,
@@ -328,7 +327,7 @@ func TestStatementBuilder(t *testing.T) {
	cb := NewConditionBuilder(fm)
	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
	aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
	aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)

	resourceFilterStmtBuilder := resourceFilterStmtBuilder()

@@ -496,7 +495,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
	cb := NewConditionBuilder(fm)
	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
	aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
	aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)

	resourceFilterStmtBuilder := resourceFilterStmtBuilder()

@@ -558,7 +557,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
	cb := NewConditionBuilder(fm)
	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
	aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
	aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)

	resourceFilterStmtBuilder := resourceFilterStmtBuilder()


@@ -8,5 +8,4 @@ const (
	TagAttributesV2LocalTableName = "tag_attributes_v2"
	TopLevelOperationsTableName   = "distributed_top_level_operations"
	TraceSummaryTableName         = "distributed_trace_summary"
	SpanAttributesKeysTblName     = "distributed_span_attributes_keys"
)

@@ -38,13 +38,12 @@ func TestTraceTimeRangeOptimization(t *testing.T) {
	resourceFilterFM := resourcefilter.NewFieldMapper()
	resourceFilterCB := resourcefilter.NewConditionBuilder(resourceFilterFM)
	resourceFilterStmtBuilder := resourcefilter.NewTraceResourceFilterStatementBuilder(
		instrumentationtest.New().ToProviderSettings(),
		resourceFilterFM,
		resourceFilterCB,
		mockMetadataStore,
	)

	aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
	aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)

	statementBuilder := NewTraceQueryStatementBuilder(
		instrumentationtest.New().ToProviderSettings(),

@@ -3,6 +3,8 @@ package ruletypes
import (
	"encoding/json"
	"fmt"
	"github.com/SigNoz/signoz/pkg/query-service/converter"
	"math"
	"net/url"
	"sort"
	"strings"
@@ -11,6 +13,7 @@ import (
	"github.com/SigNoz/signoz/pkg/query-service/model"
	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
	"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
	qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
)

// this file contains common structs and methods used by
@@ -103,6 +106,294 @@ const (
	Last MatchType = "5"
)

type RuleThreshold interface {
	Name() string
	Target() float64
	RecoveryTarget() float64

	MatchType() MatchType
	CompareOp() CompareOp

	SelectedQuery() string
	ShouldAlert(series v3.Series) (Sample, bool)
}

type BasicRuleThreshold struct {
	name           string
	target         *float64
	targetUnit     string
	ruleUnit       string
	recoveryTarget *float64
	matchType      MatchType
	compareOp      CompareOp
	selectedQuery  string
}

func NewBasicRuleThreshold(name string, target *float64, recoveryTarget *float64, matchType MatchType, op CompareOp, selectedQuery string, targetUnit string, ruleUnit string) *BasicRuleThreshold {
	return &BasicRuleThreshold{name: name, target: target, recoveryTarget: recoveryTarget, matchType: matchType, selectedQuery: selectedQuery, compareOp: op, targetUnit: targetUnit, ruleUnit: ruleUnit}
}

func (b BasicRuleThreshold) Name() string {
	return b.name
}

func (b BasicRuleThreshold) Target() float64 {
	unitConverter := converter.FromUnit(converter.Unit(b.targetUnit))
	// convert the target value to the y-axis unit
	value := unitConverter.Convert(converter.Value{
		F: *b.target,
		U: converter.Unit(b.targetUnit),
	}, converter.Unit(b.ruleUnit))
	return value.F
}
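Target() converts the user-entered threshold from its own unit into the rule's y-axis unit before any comparison. Illustratively, and assuming the converter registers a seconds-to-milliseconds conversion (the unit names here are assumptions, not taken from the diff):

	c := converter.FromUnit(converter.Unit("s"))
	v := c.Convert(converter.Value{F: 2, U: converter.Unit("s")}, converter.Unit("ms"))
	// v.F == 2000: a 2s threshold compared against millisecond data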

func (b BasicRuleThreshold) RecoveryTarget() float64 {
	return *b.recoveryTarget
}

func (b BasicRuleThreshold) MatchType() MatchType {
	return b.matchType
}

func (b BasicRuleThreshold) CompareOp() CompareOp {
	return b.compareOp
}

func (b BasicRuleThreshold) SelectedQuery() string {
	return b.selectedQuery
}

func removeGroupinSetPoints(series v3.Series) []v3.Point {
	var result []v3.Point
	for _, s := range series.Points {
		if s.Timestamp >= 0 && !math.IsNaN(s.Value) && !math.IsInf(s.Value, 0) {
			result = append(result, s)
		}
	}
	return result
}

func (b BasicRuleThreshold) ShouldAlert(series v3.Series) (Sample, bool) {
	var shouldAlert bool
	var alertSmpl Sample
	var lbls qslabels.Labels

	for name, value := range series.Labels {
		lbls = append(lbls, qslabels.Label{Name: name, Value: value})
	}

	lbls = append(lbls, qslabels.Label{Name: "threshold", Value: b.name})

	series.Points = removeGroupinSetPoints(series)

	// nothing to evaluate
	if len(series.Points) == 0 {
		return alertSmpl, false
	}

	switch b.MatchType() {
	case AtleastOnce:
		// If any sample matches the condition, the rule is firing.
		if b.CompareOp() == ValueIsAbove {
			for _, smpl := range series.Points {
				if smpl.Value > b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		} else if b.CompareOp() == ValueIsBelow {
			for _, smpl := range series.Points {
				if smpl.Value < b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		} else if b.CompareOp() == ValueIsEq {
			for _, smpl := range series.Points {
				if smpl.Value == b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		} else if b.CompareOp() == ValueIsNotEq {
			for _, smpl := range series.Points {
				if smpl.Value != b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		} else if b.CompareOp() == ValueOutsideBounds {
			for _, smpl := range series.Points {
				if math.Abs(smpl.Value) >= b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		}
	case AllTheTimes:
		// If all samples match the condition, the rule is firing.
		shouldAlert = true
		alertSmpl = Sample{Point: Point{V: b.Target()}, Metric: lbls}
		if b.CompareOp() == ValueIsAbove {
			for _, smpl := range series.Points {
				if smpl.Value <= b.Target() {
					shouldAlert = false
					break
				}
			}
			// use min value from the series
			if shouldAlert {
				var minValue float64 = math.Inf(1)
				for _, smpl := range series.Points {
					if smpl.Value < minValue {
						minValue = smpl.Value
					}
				}
				alertSmpl = Sample{Point: Point{V: minValue}, Metric: lbls}
			}
		} else if b.CompareOp() == ValueIsBelow {
			for _, smpl := range series.Points {
				if smpl.Value >= b.Target() {
					shouldAlert = false
					break
				}
			}
			if shouldAlert {
				var maxValue float64 = math.Inf(-1)
				for _, smpl := range series.Points {
					if smpl.Value > maxValue {
						maxValue = smpl.Value
					}
				}
				alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lbls}
			}
		} else if b.CompareOp() == ValueIsEq {
			for _, smpl := range series.Points {
				if smpl.Value != b.Target() {
					shouldAlert = false
					break
				}
			}
		} else if b.CompareOp() == ValueIsNotEq {
			for _, smpl := range series.Points {
				if smpl.Value == b.Target() {
					shouldAlert = false
					break
				}
			}
			// use any non-inf or nan value from the series
			if shouldAlert {
				for _, smpl := range series.Points {
					if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) {
						alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
						break
					}
				}
			}
		} else if b.CompareOp() == ValueOutsideBounds {
			for _, smpl := range series.Points {
				if math.Abs(smpl.Value) < b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = false
					break
				}
			}
		}
	case OnAverage:
		// If the average of all samples matches the condition, the rule is firing.
		var sum, count float64
		for _, smpl := range series.Points {
			if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
				continue
			}
			sum += smpl.Value
			count++
		}
		avg := sum / count
		alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls}
		if b.CompareOp() == ValueIsAbove {
			if avg > b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsBelow {
			if avg < b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsEq {
			if avg == b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsNotEq {
			if avg != b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueOutsideBounds {
			if math.Abs(avg) >= b.Target() {
				shouldAlert = true
			}
		}
	case InTotal:
		// If the sum of all samples matches the condition, the rule is firing.
		var sum float64

		for _, smpl := range series.Points {
			if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
				continue
			}
			sum += smpl.Value
		}
		alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls}
		if b.CompareOp() == ValueIsAbove {
			if sum > b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsBelow {
			if sum < b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsEq {
			if sum == b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsNotEq {
			if sum != b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueOutsideBounds {
			if math.Abs(sum) >= b.Target() {
				shouldAlert = true
			}
		}
	case Last:
		// If the last sample matches the condition, the rule is firing.
		shouldAlert = false
		alertSmpl = Sample{Point: Point{V: series.Points[len(series.Points)-1].Value}, Metric: lbls}
		if b.CompareOp() == ValueIsAbove {
			if series.Points[len(series.Points)-1].Value > b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsBelow {
			if series.Points[len(series.Points)-1].Value < b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsEq {
			if series.Points[len(series.Points)-1].Value == b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsNotEq {
			if series.Points[len(series.Points)-1].Value != b.Target() {
				shouldAlert = true
			}
		}
	}
	return alertSmpl, shouldAlert
}
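Putting the threshold evaluation together, a hedged usage sketch (the v3.Series/v3.Point shapes, the exported constructor, and the match/compare constants are taken from the diff; the wiring around them is illustrative):

	target := 100.0
	th := NewBasicRuleThreshold(
		"high-latency", &target, nil,
		AllTheTimes, ValueIsAbove,
		"A", "ms", "ms", // selected query and units; example values
	)
	series := v3.Series{
		Labels: map[string]string{"service.name": "checkout"},
		Points: []v3.Point{{Timestamp: 1, Value: 150}, {Timestamp: 2, Value: 120}},
	}
	// every point is above 100, so AllTheTimes fires; per the branch above,
	// the reported sample is the series minimum (120)
	if smpl, firing := th.ShouldAlert(series); firing {
		fmt.Println(smpl.Point.V) // 120
	}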
|
||||

type RuleCondition struct {
	CompositeQuery *v3.CompositeQuery `json:"compositeQuery,omitempty" yaml:"compositeQuery,omitempty"`
	CompareOp      CompareOp          `yaml:"op,omitempty" json:"op,omitempty"`
@@ -116,6 +407,7 @@ type RuleCondition struct {
	SelectedQuery     string          `json:"selectedQueryName,omitempty"`
	RequireMinPoints  bool            `yaml:"requireMinPoints,omitempty" json:"requireMinPoints,omitempty"`
	RequiredNumPoints int             `yaml:"requiredNumPoints,omitempty" json:"requiredNumPoints,omitempty"`
	Thresholds        []RuleThreshold `yaml:"thresholds,omitempty" json:"thresholds,omitempty"`
}
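A quick sanity check of the new json tags on RuleCondition; the payload below is invented for illustration and only touches fields whose types appear in this hunk.

// Hypothetical round-trip of the new fields; not from the diff.
var cond RuleCondition
payload := []byte(`{
	"selectedQueryName": "A",
	"requireMinPoints": true,
	"requiredNumPoints": 4
}`)
if err := json.Unmarshal(payload, &cond); err != nil { // needs "encoding/json"
	panic(err)
}
// cond.SelectedQuery == "A", cond.RequireMinPoints == true, cond.RequiredNumPoints == 4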

func (rc *RuleCondition) GetSelectedQueryName() string {
@@ -65,8 +65,12 @@ type PostableRule struct {
	Version string `json:"version,omitempty"`

	// legacy
	Expr    string `yaml:"expr,omitempty" json:"expr,omitempty"`
	OldYaml string `json:"yaml,omitempty"`

	EvalType   string     `yaml:"evalType,omitempty" json:"evalType,omitempty"`
	Evaluation Evaluation `yaml:"evaluation,omitempty" json:"evaluation,omitempty"`
	StartsAt   int64      `yaml:"startsAt,omitempty" json:"startsAt,omitempty"`
	Schedule   string     `json:"schedule,omitempty"`
}
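Because Evaluation is an interface, the new fields are easiest to exercise from Go rather than raw JSON. A sketch, assuming Duration converts from time.Duration (as its use with time.Duration(...) in the new file below suggests); the schedule value is invented, since its format is not shown in this diff.

// Illustrative only; not from the diff.
rule := PostableRule{
	EvalType: "cumulative",
	StartsAt: time.Now().Add(-24 * time.Hour).UnixMilli(),
	Schedule: "0 * * * *", // assumption: cron-style schedule string
}
rule.Evaluation = NewEvaluation(rule.EvalType, CumulativeWindow{
	StartsAt:   time.UnixMilli(rule.StartsAt),
	EvalWindow: Duration(time.Hour),
})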

func ParsePostableRule(content []byte) (*PostableRule, error) {
@@ -140,6 +144,15 @@ func ParseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*P
		return nil, err
	}

	// added alerts v2 fields
	rule.RuleCondition.Thresholds = append(rule.RuleCondition.Thresholds,
		NewBasicRuleThreshold(rule.AlertName, rule.RuleCondition.Target, nil, rule.RuleCondition.MatchType, rule.RuleCondition.CompareOp, rule.RuleCondition.SelectedQuery, rule.RuleCondition.TargetUnit, rule.RuleCondition.CompositeQuery.Unit))
	if rule.EvalType == "" || rule.EvalType == "rolling" {
		rule.EvalType = "rolling"
		rule.Evaluation = NewEvaluation(rule.EvalType, RollingWindow{EvalWindow: rule.EvalWindow, Frequency: rule.Frequency, RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints})
	} else if rule.EvalType == "cumulative" {
		rule.Evaluation = NewEvaluation(rule.EvalType, CumulativeWindow{EvalWindow: rule.EvalWindow, StartsAt: time.UnixMilli(rule.StartsAt), RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints})
	}
	return rule, nil
}

pkg/types/ruletypes/evaluation.go (new file, 67 lines)
@@ -0,0 +1,67 @@
package ruletypes

import (
	"time"

	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
)

type Evaluation interface {
	EvaluationTime(curr time.Time) (time.Time, time.Time)
}

type RollingWindow struct {
	EvalWindow           Duration          `json:"evalWindow"`
	Frequency            Duration          `json:"frequency"`
	RequireMinPoints     bool              `json:"requireMinPoints"`
	RequiredNumPoints    int               `json:"requiredNumPoints"`
	SkipEvalForNewGroups []v3.AttributeKey `json:"skipEvalForNewGroups"`
}

func (rollingWindow *RollingWindow) EvaluationTime(curr time.Time) (time.Time, time.Time) {
	return curr.Add(time.Duration(-rollingWindow.EvalWindow)), curr
}
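A worked example for the rolling case: the evaluated range always ends at the current tick and starts EvalWindow earlier. Assuming, as above, that Duration converts from time.Duration:

// Illustrative only; not from the diff.
rw := RollingWindow{EvalWindow: Duration(5 * time.Minute)}
curr := time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC)
start, end := rw.EvaluationTime(curr)
// start = 09:55:00, end = 10:00:00: a fixed-width window sliding with curr
fmt.Println(start, end)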

type CumulativeWindow struct {
	StartsAt             time.Time         `json:"startsAt"`
	EvalWindow           Duration          `json:"evalWindow"`
	RequireMinPoints     bool              `json:"requireMinPoints"`
	RequiredNumPoints    int               `json:"requiredNumPoints"`
	SkipEvalForNewGroups []v3.AttributeKey `json:"skipEvalForNewGroups"`
}

func (cumulativeWindow *CumulativeWindow) EvaluationTime(curr time.Time) (time.Time, time.Time) {
	if curr.Before(cumulativeWindow.StartsAt) {
		return curr, curr
	}

	dur := time.Duration(cumulativeWindow.EvalWindow)
	if dur <= 0 {
		return curr, curr
	}

	// Calculate the number of complete windows since StartsAt
	elapsed := curr.Sub(cumulativeWindow.StartsAt)
	windows := int64(elapsed / dur)
	windowStart := cumulativeWindow.StartsAt.Add(time.Duration(windows) * dur)
	return windowStart, curr
}
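The cumulative window restarts on every EvalWindow boundary measured from StartsAt, and the range always ends at the current tick. For example, with StartsAt at midnight and a one-hour window, a tick at 02:30 gives elapsed = 2h30m, i.e. two complete windows, so the range is 02:00 to 02:30. In code (same Duration assumption as above):

// Illustrative only; not from the diff.
cw := CumulativeWindow{
	StartsAt:   time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
	EvalWindow: Duration(time.Hour),
}
start, end := cw.EvaluationTime(time.Date(2025, 1, 1, 2, 30, 0, 0, time.UTC))
// elapsed = 2h30m, complete windows = 2, so start = 02:00 and end = 02:30
fmt.Println(start, end)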

func NewEvaluation(evalType string, params interface{}) Evaluation {
	switch evalType {
	case "rolling":
		p, ok := params.(RollingWindow)
		if !ok {
			return nil
		}
		return &p
	case "cumulative":
		p, ok := params.(CumulativeWindow)
		if !ok {
			return nil
		}
		return &p
	default:
		return nil
	}
}
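Usage sketch for the factory. Note that it type-asserts on the value types, so passing a pointer, or any unrecognized evalType, falls through to nil, which callers must check:

// Illustrative only; not from the diff.
eval := NewEvaluation("rolling", RollingWindow{
	EvalWindow: Duration(5 * time.Minute),
	Frequency:  Duration(time.Minute),
})
if eval == nil {
	// unknown evalType, or params was not the expected value type
}
start, end := eval.EvaluationTime(time.Now())
_, _ = start, end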

tests/integration/poetry.lock (generated, 10 lines changed)
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.

[[package]]
name = "astroid"
@@ -390,7 +390,7 @@ files = [
    {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
    {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
markers = {main = "sys_platform == \"win32\"", dev = "platform_system == \"Windows\" or sys_platform == \"win32\""}
markers = {main = "sys_platform == \"win32\"", dev = "sys_platform == \"win32\" or platform_system == \"Windows\""}

[[package]]
name = "dill"
@@ -983,14 +983,14 @@ files = [

[[package]]
name = "urllib3"
version = "2.5.0"
version = "2.4.0"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
    {file = "urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc"},
    {file = "urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760"},
    {file = "urllib3-2.4.0-py3-none-any.whl", hash = "sha256:4e16665048960a0900c702d4a66415956a584919c03361cac9f1df5c5dd7e813"},
    {file = "urllib3-2.4.0.tar.gz", hash = "sha256:414bc6535b787febd7567804cc015fee39daab8ad86268f1310a9250697de466"},
]

[package.extras]