Mirror of https://github.com/SigNoz/signoz.git (synced 2026-02-07 18:32:12 +00:00)

Compare commits: v0.110.2...replace-pr (2 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 892cd04475 | |
| | a2aa7d7342 | |
.vscode/settings.json (vendored): 3 changed lines
@@ -11,8 +11,5 @@
    "[go]": {
        "editor.formatOnSave": true,
        "editor.defaultFormatter": "golang.go"
    },
    "[sql]": {
        "editor.defaultFormatter": "adpyke.vscode-sql-formatter"
    }
}
README.md: 11 changed lines
@@ -66,17 +66,6 @@ Read [more](https://signoz.io/metrics-and-dashboards/).

### LLM Observability

Monitor and debug your LLM applications with comprehensive observability. Track LLM calls, analyze token usage, monitor performance, and gain insights into your AI application's behavior in production.

SigNoz LLM observability helps you understand how your language models are performing, identify issues with prompts and responses, track token usage and costs, and optimize your AI applications for better performance and reliability.

[Get started with LLM Observability →](https://signoz.io/docs/llm-observability/)

### Alerts

Use alerts in SigNoz to get notified when anything unusual happens in your application. You can set alerts on any type of telemetry signal (logs, metrics, traces), create thresholds and set up a notification channel to get notified. Advanced features like alert history and anomaly detection can help you create smarter alerts.
@@ -176,7 +176,7 @@ services:
      # - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
  signoz:
    !!merge <<: *db-depend
    image: signoz/signoz:v0.110.1
    image: signoz/signoz:v0.110.0
    command:
      - --config=/root/config/prometheus.yml
    ports:

@@ -117,7 +117,7 @@ services:
      # - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
  signoz:
    !!merge <<: *db-depend
    image: signoz/signoz:v0.110.1
    image: signoz/signoz:v0.110.0
    command:
      - --config=/root/config/prometheus.yml
    ports:

@@ -179,7 +179,7 @@ services:
      # - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
  signoz:
    !!merge <<: *db-depend
    image: signoz/signoz:${VERSION:-v0.110.1}
    image: signoz/signoz:${VERSION:-v0.110.0}
    container_name: signoz
    command:
      - --config=/root/config/prometheus.yml

@@ -111,7 +111,7 @@ services:
      # - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
  signoz:
    !!merge <<: *db-depend
    image: signoz/signoz:${VERSION:-v0.110.1}
    image: signoz/signoz:${VERSION:-v0.110.0}
    container_name: signoz
    command:
      - --config=/root/config/prometheus.yml
@@ -105,7 +105,6 @@
    "i18next": "^21.6.12",
    "i18next-browser-languagedetector": "^6.1.3",
    "i18next-http-backend": "^1.3.2",
    "immer": "11.1.3",
    "jest": "^27.5.1",
    "js-base64": "^3.7.2",
    "less": "^4.1.2",
@@ -15,7 +15,6 @@ import logEvent from 'api/common/logEvent';
import LaunchChatSupport from 'components/LaunchChatSupport/LaunchChatSupport';
import { DOCS_BASE_URL } from 'constants/app';
import ROUTES from 'constants/routes';
import { useGetGlobalConfig } from 'hooks/globalConfig/useGetGlobalConfig';
import useDebouncedFn from 'hooks/useDebouncedFunction';
import history from 'lib/history';
import { isEmpty } from 'lodash-es';

@@ -149,8 +148,6 @@ function OnboardingAddDataSource(): JSX.Element {

    const { org } = useAppContext();

    const { data: globalConfig } = useGetGlobalConfig();

    const [setupStepItems, setSetupStepItems] = useState(setupStepItemsBase);

    const [searchQuery, setSearchQuery] = useState<string>('');

@@ -236,16 +233,6 @@ function OnboardingAddDataSource(): JSX.Element {
            urlObj.searchParams.set('environment', selectedEnvironment);
        }

        const ingestionUrl = globalConfig?.data?.ingestion_url;

        if (ingestionUrl) {
            const parts = ingestionUrl.split('.');
            if (parts?.length > 1 && parts[0]?.includes('ingest')) {
                const region = parts[1];
                urlObj.searchParams.set('region', region);
            }
        }

        // Step 3: Return the updated URL as a string
        const updatedUrl = urlObj.toString();
@@ -1,17 +1,7 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { screen, within } from '@testing-library/react';
import { ENVIRONMENT } from 'constants/env';
import { server } from 'mocks-server/server';
import { rest } from 'msw';
import { screen } from '@testing-library/react';
import { PreferenceContextProvider } from 'providers/preferences/context/PreferenceContextProvider';
import {
    findByText,
    fireEvent,
    render,
    userEvent,
    waitFor,
} from 'tests/test-utils';
import { DataTypes } from 'types/api/queryBuilder/queryAutocompleteResponse';
import { findByText, fireEvent, render, waitFor } from 'tests/test-utils';

import { pipelineApiResponseMockData } from '../mocks/pipeline';
import PipelineListsView from '../PipelineListsView';

@@ -85,20 +75,7 @@ jest.mock('providers/preferences/sync/usePreferenceSync', () => ({
    }),
}));

const BASE_URL = ENVIRONMENT.baseURL;
const attributeKeysURL = `${BASE_URL}/api/v3/autocomplete/attribute_keys`;

describe('PipelinePage container test', () => {
    beforeAll(() => {
        server.listen();
    });
    afterEach(() => {
        server.resetHandlers();
        jest.clearAllMocks();
    });
    afterAll(() => {
        server.close();
    });
    it('should render PipelineListsView section', () => {
        const { getByText, container } = render(
            <PreferenceContextProvider>

@@ -295,7 +272,6 @@ describe('PipelinePage container test', () => {
    });

    it('should have populated form fields when edit pipeline is clicked', async () => {
        const user = userEvent.setup({ pointerEventsCheck: 0 });
        render(
            <PreferenceContextProvider>
                <PipelineListsView

@@ -325,52 +301,5 @@ describe('PipelinePage container test', () => {

        // to have length 2
        expect(screen.queryAllByText('source = nginx').length).toBe(2);

        server.use(
            rest.get(attributeKeysURL, (_req, res, ctx) =>
                res(
                    ctx.status(200),
                    ctx.json({
                        status: 'success',
                        data: {
                            attributeKeys: [
                                {
                                    key: 'otelServiceName',
                                    dataType: DataTypes.String,
                                    type: 'tag',
                                },
                                {
                                    key: 'service.instance.id',
                                    dataType: DataTypes.String,
                                    type: 'resource',
                                },
                                {
                                    key: 'service.name',
                                    dataType: DataTypes.String,
                                    type: 'resource',
                                },
                                {
                                    key: 'service.name',
                                    dataType: DataTypes.String,
                                    type: 'tag',
                                },
                            ],
                        },
                    }),
                ),
            ),
        );

        // Open Filter input and type to trigger suggestions
        const filterSelect = screen.getByTestId('qb-search-select');
        const input = within(filterSelect).getByRole('combobox') as HTMLInputElement;

        await user.click(input);
        await waitFor(() =>
            expect(screen.getByText('otelServiceName')).toBeInTheDocument(),
        );

        const serviceNameOccurences = await screen.findAllByText('service.name');
        expect(serviceNameOccurences.length).toBeGreaterThanOrEqual(2);
    });
});
@@ -356,10 +356,7 @@ function QueryBuilderSearch({

    // conditional changes here to use a seperate component to render the example queries based on the option group label
    const customRendererForLogsExplorer = options.map((option) => (
        <Select.Option
            key={`${option.label}-${option.type || ''}-${option.dataType || ''}`}
            value={option.value}
        >
        <Select.Option key={option.label} value={option.value}>
            <OptionRendererForLogs
                label={option.label}
                value={option.value}

@@ -374,7 +371,6 @@ function QueryBuilderSearch({
    return (
        <div className="query-builder-search-container">
            <Select
                data-testid={'qb-search-select'}
                ref={selectRef}
                getPopupContainer={popupContainer}
                transitionName=""

@@ -492,10 +488,7 @@ function QueryBuilderSearch({
                {isLogsExplorerPage
                    ? customRendererForLogsExplorer
                    : options.map((option) => (
                            <Select.Option
                                key={`${option.label}-${option.type || ''}-${option.dataType || ''}`}
                                value={option.value}
                            >
                            <Select.Option key={option.label} value={option.value}>
                                <OptionRenderer
                                    label={option.label}
                                    value={option.value}
@@ -1,19 +0,0 @@
|
||||
import { useSyncExternalStore } from 'react';
|
||||
|
||||
import {
|
||||
dashboardVariablesStore,
|
||||
IDashboardVariables,
|
||||
} from '../../providers/Dashboard/store/dashboardVariablesStore';
|
||||
|
||||
export const useDashboardVariables = (): {
|
||||
dashboardVariables: IDashboardVariables;
|
||||
} => {
|
||||
const dashboardVariables = useSyncExternalStore(
|
||||
dashboardVariablesStore.subscribe,
|
||||
dashboardVariablesStore.getSnapshot,
|
||||
);
|
||||
|
||||
return {
|
||||
dashboardVariables,
|
||||
};
|
||||
};
|
||||
@@ -1,30 +0,0 @@
import { useMemo } from 'react';
import {
    IDashboardVariable,
    TVariableQueryType,
} from 'types/api/dashboard/getAll';

import { useDashboardVariables } from './useDashboardVariables';

export function useDashboardVariablesByType(
    variableType: TVariableQueryType,
    returnType: 'values',
): IDashboardVariable[];
export function useDashboardVariablesByType(
    variableType: TVariableQueryType,
    returnType?: 'entries',
): [string, IDashboardVariable][];
export function useDashboardVariablesByType(
    variableType: TVariableQueryType,
    returnType?: 'values' | 'entries',
): IDashboardVariable[] | [string, IDashboardVariable][] {
    const { dashboardVariables } = useDashboardVariables();

    return useMemo(() => {
        const entries = Object.entries(dashboardVariables || {}).filter(
            (entry): entry is [string, IDashboardVariable] =>
                Boolean(entry[1].name) && entry[1].type === variableType,
        );
        return returnType === 'values' ? entries.map(([, value]) => value) : entries;
    }, [dashboardVariables, variableType, returnType]);
}
@@ -1,28 +0,0 @@
import { useMemo } from 'react';
import { PANEL_GROUP_TYPES } from 'constants/queryBuilder';
import { createDynamicVariableToWidgetsMap } from 'hooks/dashboard/utils';
import { useDashboard } from 'providers/Dashboard/Dashboard';
import { Widgets } from 'types/api/dashboard/getAll';

import { useDashboardVariablesByType } from './useDashboardVariablesByType';

/**
 * Hook to get a map of dynamic variable IDs to widget IDs that use them.
 * This is useful for determining which widgets need to be refreshed when a dynamic variable changes.
 */
export function useWidgetsByDynamicVariableId(): Record<string, string[]> {
    const dynamicVariables = useDashboardVariablesByType('DYNAMIC', 'values');
    const { selectedDashboard } = useDashboard();

    return useMemo(() => {
        const widgets =
            selectedDashboard?.data?.widgets?.filter(
                (widget) => widget.panelTypes !== PANEL_GROUP_TYPES.ROW,
            ) || [];

        return createDynamicVariableToWidgetsMap(
            dynamicVariables,
            widgets as Widgets[],
        );
    }, [selectedDashboard, dynamicVariables]);
}
@@ -193,11 +193,7 @@ export const useOptions = (
            (option, index, self) =>
                index ===
                self.findIndex(
                    (o) =>
                        o.label === option.label &&
                        o.value === option.value &&
                        (o.type || '') === (option.type || '') &&
                        (o.dataType || '') === (option.dataType || ''), // keep entries with same key but different type/dataType
                    (o) => o.label === option.label && o.value === option.value, // to remove duplicate & empty options from list
                ) && option.value !== '',
        ) || []
    ).map((option) => {
@@ -16,19 +16,13 @@ export function useResizeObserver<T extends HTMLElement>(
    });

    useEffect(() => {
        const handleResize = debounce(
            (entries: ResizeObserverEntry[]) => {
                const entry = entries[0];
                if (entry) {
                    const { width, height } = entry.contentRect;
                    setSize({ width, height });
                }
            },
            debounceTime,
            {
                leading: true,
            },
        );
        const handleResize = debounce((entries: ResizeObserverEntry[]) => {
            const entry = entries[0];
            if (entry) {
                const { width, height } = entry.contentRect;
                setSize({ width, height });
            }
        }, debounceTime);

        const ro = new ResizeObserver(handleResize);
        const referenceNode = ref.current;
@@ -45,8 +45,6 @@ import APIError from 'types/api/error';
import { GlobalReducer } from 'types/reducer/globalTime';
import { v4 as generateUUID } from 'uuid';

import { useDashboardVariables } from '../../hooks/dashboard/useDashboardVariables';
import { updateDashboardVariablesStore } from './store/dashboardVariablesStore';
import {
    DashboardSortOrder,
    IDashboardContext,

@@ -198,16 +196,6 @@ export function DashboardProvider({
        : isDashboardWidgetPage?.params.dashboardId) || '';

    const [selectedDashboard, setSelectedDashboard] = useState<Dashboard>();
    const dashboardVariables = useDashboardVariables();

    useEffect(() => {
        const existingVariables = dashboardVariables;
        const updatedVariables = selectedDashboard?.data.variables || {};

        if (!isEqual(existingVariables, updatedVariables)) {
            updateDashboardVariablesStore(updatedVariables);
        }
    }, [selectedDashboard]);

    const {
        currentDashboard,
@@ -8,7 +8,6 @@ import ROUTES from 'constants/routes';
import { DashboardProvider, useDashboard } from 'providers/Dashboard/Dashboard';
import { IDashboardVariable } from 'types/api/dashboard/getAll';

import { useDashboardVariables } from '../../../hooks/dashboard/useDashboardVariables';
import { initializeDefaultVariables } from '../initializeDefaultVariables';
import { normalizeUrlValueForVariable } from '../normalizeUrlValue';

@@ -56,7 +55,6 @@ jest.mock('uuid', () => ({ v4: jest.fn(() => 'mock-uuid') }));

function TestComponent(): JSX.Element {
    const { dashboardResponse, dashboardId, selectedDashboard } = useDashboard();
    const { dashboardVariables } = useDashboardVariables();

    return (
        <div>

@@ -67,7 +65,9 @@ function TestComponent(): JSX.Element {
                {dashboardResponse.isFetching.toString()}
            </div>
            <div data-testid="dashboard-variables">
                {dashboardVariables ? JSON.stringify(dashboardVariables) : 'null'}
                {selectedDashboard?.data?.variables
                    ? JSON.stringify(selectedDashboard.data.variables)
                    : 'null'}
            </div>
            <div data-testid="dashboard-data">
                {selectedDashboard?.data?.title || 'No Title'}
@@ -1,17 +0,0 @@
import { IDashboardVariable } from 'types/api/dashboard/getAll';

import createStore from './store';

// export type IDashboardVariables = DashboardData['variables'];
export type IDashboardVariables = Record<string, IDashboardVariable>;

export const dashboardVariablesStore = createStore<IDashboardVariables>({});

export function updateDashboardVariablesStore(
    variables: Partial<IDashboardVariables>,
): void {
    dashboardVariablesStore.update((currentVariables) => ({
        ...currentVariables,
        ...variables,
    }));
}
@@ -1,44 +0,0 @@
import { produce } from 'immer';

type ListenerFn = () => void;

export default function createStore<T>(
    init: T,
): {
    set: (setter: any) => void;
    update: (updater: (draft: T) => void) => void;
    subscribe: (listener: ListenerFn) => () => void;
    getSnapshot: () => T;
} {
    let listeners: ListenerFn[] = [];
    let state = init;

    function emitChange(): void {
        for (const listener of listeners) {
            listener();
        }
    }

    function set(setter: any): void {
        state = produce(state, setter);
        emitChange();
    }

    function update(updater: (draft: T) => void): void {
        state = produce(state, updater);
        emitChange();
    }

    return {
        set,
        update,
        subscribe(listener: ListenerFn): () => void {
            listeners = [...listeners, listener];
            return (): void => {
                listeners = listeners.filter((l) => l !== listener);
            };
        },
        getSnapshot(): T {
            return state;
        },
    };
}
@@ -12030,11 +12030,6 @@ immediate@~3.0.5:
  resolved "https://registry.yarnpkg.com/immediate/-/immediate-3.0.6.tgz#9db1dbd0faf8de6fbe0f5dd5e56bb606280de69b"
  integrity sha512-XXOFtyqDjNDAQxVfYxuF7g9Il/IbWmmlQg2MYKOH8ExIT1qg6xc4zyS3HaEEATgs1btfzxq15ciUiY7gjSXRGQ==

immer@11.1.3:
  version "11.1.3"
  resolved "https://registry.yarnpkg.com/immer/-/immer-11.1.3.tgz#78681e1deb6cec39753acf04eb16d7576c04f4d6"
  integrity sha512-6jQTc5z0KJFtr1UgFpIL3N9XSC3saRaI9PwWtzM2pSqkNGtiNkYY2OSwkOGDK2XcTRcLb1pi/aNkKZz0nxVH4Q==

immer@^9.0.6:
  version "9.0.21"
  resolved "https://registry.yarnpkg.com/immer/-/immer-9.0.21.tgz#1e025ea31a40f24fb064f1fef23e931496330176"
@@ -87,7 +87,7 @@ func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetype
}

func (m *module) listPromotedPaths(ctx context.Context) ([]string, error) {
    paths, err := m.metadataStore.ListPromotedPaths(ctx)
    paths, err := m.metadataStore.GetPromotedPaths(ctx)
    if err != nil {
        return nil, err
    }

@@ -142,7 +142,7 @@ func (m *module) PromoteAndIndexPaths(
        pathsStr = append(pathsStr, path.Path)
    }

    existingPromotedPaths, err := m.metadataStore.ListPromotedPaths(ctx, pathsStr...)
    existingPromotedPaths, err := m.metadataStore.GetPromotedPaths(ctx, pathsStr...)
    if err != nil {
        return err
    }
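Context for the hunks above and below: the metadata-store method is renamed from `ListPromotedPaths` to `GetPromotedPaths`, and its return type changes from a set (`map[string]struct{}` / `*utils.ConcurrentSet[string]`) to `map[string]bool`. A minimal sketch of how a caller consumes the new shape; the `metadataStore` interface stub and path names here are illustrative, not the real SigNoz types:

```go
package promotedpathsdemo

import (
	"context"
	"fmt"
)

// Illustrative stand-in for the relevant slice of the MetadataStore interface.
type metadataStore interface {
	GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error)
}

// markMaterialized mirrors the pattern in buildBodyJSONPaths: with a
// map[string]bool, a missing key reads as false, so no Contains() helper
// is needed before setting Materialized.
func markMaterialized(ctx context.Context, store metadataStore, names []string) error {
	promoted, err := store.GetPromotedPaths(ctx, names...)
	if err != nil {
		return err
	}
	for _, name := range names {
		fmt.Printf("%s materialized=%v\n", name, promoted[name])
	}
	return nil
}
```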
@@ -10,7 +10,6 @@ import (
    "github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
    schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
    "github.com/SigNoz/signoz-otel-collector/constants"
    "github.com/SigNoz/signoz-otel-collector/utils"
    "github.com/SigNoz/signoz/pkg/errors"
    "github.com/SigNoz/signoz/pkg/querybuilder"
    "github.com/SigNoz/signoz/pkg/telemetrylogs"

@@ -112,7 +111,7 @@ func (t *telemetryMetaStore) buildBodyJSONPaths(ctx context.Context,
    }

    for _, fieldKey := range fieldKeys {
        fieldKey.Materialized = promoted.Contains(fieldKey.Name)
        fieldKey.Materialized = promoted[fieldKey.Name]
        fieldKey.Indexes = indexes[fieldKey.Name]
    }

@@ -294,33 +293,6 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
    return indexes, nil
}

func (t *telemetryMetaStore) ListPromotedPaths(ctx context.Context, paths ...string) (map[string]struct{}, error) {
    sb := sqlbuilder.Select("path").From(fmt.Sprintf("%s.%s", DBName, PromotedPathsTableName))
    pathConditions := []string{}
    for _, path := range paths {
        pathConditions = append(pathConditions, sb.Equal("path", path))
    }
    sb.Where(sb.Or(pathConditions...))
    query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)

    rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
    if err != nil {
        return nil, errors.WrapInternalf(err, CodeFailLoadPromotedPaths, "failed to load promoted paths")
    }
    defer rows.Close()

    next := make(map[string]struct{})
    for rows.Next() {
        var path string
        if err := rows.Scan(&path); err != nil {
            return nil, errors.WrapInternalf(err, CodeFailLoadPromotedPaths, "failed to scan promoted path")
        }
        next[path] = struct{}{}
    }

    return next, nil
}

// TODO(Piyush): Remove this if not used in future
func (t *telemetryMetaStore) ListJSONValues(ctx context.Context, path string, limit int) (*telemetrytypes.TelemetryFieldValues, bool, error) {
    path = CleanPathPrefixes(path)

@@ -483,11 +455,12 @@ func derefValue(v any) any {
    return val.Interface()
}

// IsPathPromoted checks if a specific path is promoted
// IsPathPromoted checks if a specific path is promoted (Column Evolution table: field_name for logs body).
func (t *telemetryMetaStore) IsPathPromoted(ctx context.Context, path string) (bool, error) {
    split := strings.Split(path, telemetrytypes.ArraySep)
    query := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE path = ? LIMIT 1", DBName, PromotedPathsTableName)
    rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, split[0])
    pathSegment := split[0]
    query := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE signal = ? AND column_name = ? AND field_context = ? AND field_name = ? LIMIT 1", DBName, PromotedPathsTableName)
    rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, telemetrytypes.SignalLogs, telemetrylogs.LogsV2BodyPromotedColumn, telemetrytypes.FieldContextBody, pathSegment)
    if err != nil {
        return false, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to check if path %s is promoted", path)
    }

@@ -496,14 +469,23 @@ func (t *telemetryMetaStore) IsPathPromoted(ctx context.Context, path string) (b
    return rows.Next(), nil
}

// GetPromotedPaths checks if a specific path is promoted
func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...string) (*utils.ConcurrentSet[string], error) {
    sb := sqlbuilder.Select("path").From(fmt.Sprintf("%s.%s", DBName, PromotedPathsTableName))
    pathConditions := []string{}
    for _, path := range paths {
        pathConditions = append(pathConditions, sb.Equal("path", path))
// GetPromotedPaths returns promoted paths from the Column Evolution table (field_name for logs body).
func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error) {
    sb := sqlbuilder.Select("field_name").From(fmt.Sprintf("%s.%s", DBName, PromotedPathsTableName))
    conditions := []string{
        sb.Equal("signal", telemetrytypes.SignalLogs),
        sb.Equal("column_name", telemetrylogs.LogsV2BodyPromotedColumn),
        sb.Equal("field_context", telemetrytypes.FieldContextBody),
        sb.NotEqual("field_name", "__all__"),
    }
    sb.Where(sb.Or(pathConditions...))
    if len(paths) > 0 {
        pathArgs := make([]interface{}, len(paths))
        for i, p := range paths {
            pathArgs[i] = p
        }
        conditions = append(conditions, sb.In("field_name", pathArgs))
    }
    sb.Where(sb.And(conditions...))

    query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
    rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)

@@ -512,13 +494,13 @@ func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...stri
    }
    defer rows.Close()

    promotedPaths := utils.NewConcurrentSet[string]()
    promotedPaths := make(map[string]bool)
    for rows.Next() {
        var path string
        if err := rows.Scan(&path); err != nil {
        var fieldName string
        if err := rows.Scan(&fieldName); err != nil {
            return nil, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to scan promoted path")
        }
        promotedPaths.Insert(path)
        promotedPaths[fieldName] = true
    }

    return promotedPaths, nil

@@ -532,21 +514,22 @@ func CleanPathPrefixes(path string) string {
    return path
}

// PromotePaths inserts promoted paths into the Column Evolution table (same schema as signoz-otel-collector metadata_migrations).
func (t *telemetryMetaStore) PromotePaths(ctx context.Context, paths ...string) error {
    batch, err := t.telemetrystore.ClickhouseDB().PrepareBatch(ctx,
        fmt.Sprintf("INSERT INTO %s.%s (path, created_at) VALUES", DBName,
        fmt.Sprintf("INSERT INTO %s.%s (signal, column_name, column_type, field_context, field_name, version, release_time) VALUES", DBName,
            PromotedPathsTableName))
    if err != nil {
        return errors.WrapInternalf(err, CodeFailedToPrepareBatch, "failed to prepare batch")
    }

    nowMs := uint64(time.Now().UnixMilli())
    releaseTime := time.Now().UnixNano()
    for _, p := range paths {
        trimmed := strings.TrimSpace(p)
        if trimmed == "" {
            continue
        }
        if err := batch.Append(trimmed, nowMs); err != nil {
        if err := batch.Append(telemetrytypes.SignalLogs, telemetrylogs.LogsV2BodyPromotedColumn, "JSON()", telemetrytypes.FieldContextBody, trimmed, 0, releaseTime); err != nil {
            _ = batch.Abort()
            return errors.WrapInternalf(err, CodeFailedToAppendPath, "failed to append path")
        }
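To make the schema change above concrete: instead of a single-column promoted-paths table keyed by `path`, lookups now go against the Column Evolution table, where a promoted logs-body path is identified by the tuple (signal, column_name, field_context, field_name). A rough sketch of the membership-query shape; the database name and literal argument values below are placeholders, not the real constants from `telemetrytypes`/`telemetrylogs`:

```go
package columnevolutiondemo

import "fmt"

// promotedLookupQuery sketches the four-part key used by IsPathPromoted.
// dbName and the arg values are assumptions for illustration only; the table
// name matches the consts hunk that follows.
func promotedLookupQuery(fieldName string) (string, []any) {
	const dbName = "signoz_metadata" // assumed database name
	const table = "distributed_column_evolution_metadata"
	query := fmt.Sprintf(
		"SELECT 1 FROM %s.%s WHERE signal = ? AND column_name = ? AND field_context = ? AND field_name = ? LIMIT 1",
		dbName, table,
	)
	// "logs" / "body_promoted" / "body" stand in for SignalLogs,
	// LogsV2BodyPromotedColumn, and FieldContextBody.
	return query, []any{"logs", "body_promoted", "body", fieldName}
}
```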
@@ -7,6 +7,7 @@ const (
    AttributesMetadataTableName      = "distributed_attributes_metadata"
    AttributesMetadataLocalTableName = "attributes_metadata"
    PathTypesTableName               = otelcollectorconst.DistributedPathTypesTable
    PromotedPathsTableName           = otelcollectorconst.DistributedPromotedPathsTable
    // Column Evolution table stores promoted paths as (signal, column_name, field_context, field_name); see signoz-otel-collector metadata_migrations.
    PromotedPathsTableName = "distributed_column_evolution_metadata"
    SkipIndexTableName     = "system.data_skipping_indices"
)
@@ -134,12 +134,6 @@ var (
            FieldContext:  telemetrytypes.FieldContextSpan,
            FieldDataType: telemetrytypes.FieldDataTypeNumber,
        },
        "timestamp": {
            Name:          "timestamp",
            Signal:        telemetrytypes.SignalTraces,
            FieldContext:  telemetrytypes.FieldContextSpan,
            FieldDataType: telemetrytypes.FieldDataTypeNumber,
        },
    }

    CalculatedFields = map[string]telemetrytypes.TelemetryFieldKey{
@@ -188,11 +188,6 @@ func (m *defaultFieldMapper) getColumn(
            return indexV3Columns["attributes_bool"], nil
        }
    case telemetrytypes.FieldContextSpan, telemetrytypes.FieldContextUnspecified:
        /*
            TODO: This is incorrect, we cannot assume all unspecified context fields are span context.
            User could be referring to attributes, but we cannot fix this until we fix where_clause vistior
            https://github.com/SigNoz/signoz/pull/10102
        */
        // Check if this is a span scope field
        if strings.ToLower(key.Name) == SpanSearchScopeRoot || strings.ToLower(key.Name) == SpanSearchScopeEntryPoint {
            // The actual SQL will be generated in the condition builder

@@ -201,28 +196,20 @@ func (m *defaultFieldMapper) getColumn(

        // TODO(srikanthccv): remove this when it's safe to remove
        // issue with CH aliasing

        /*
            NOTE: There are fields which are deprecated for only to not show up as user suggestion and is possible that
            they don't have a mapping in oldToNew map. So we need to look up in indexV3Columns directly for those fields.
            For example: kind, timestamp etc.
        */
        if _, ok := CalculatedFieldsDeprecated[key.Name]; ok {
            // Check if we have a mapping for the deprecated calculated field
            if col, ok := indexV3Columns[oldToNew[key.Name]]; ok {
                return col, nil
            }
            return indexV3Columns[oldToNew[key.Name]], nil
        }
        if _, ok := IntrinsicFieldsDeprecated[key.Name]; ok {
            // Check if we have a mapping for the deprecated intrinsic field
            if col, ok := indexV3Columns[oldToNew[key.Name]]; ok {
                return col, nil
            if _, ok := indexV3Columns[oldToNew[key.Name]]; ok {
                return indexV3Columns[oldToNew[key.Name]], nil
            }
        }

        if col, ok := indexV3Columns[key.Name]; ok {
            return col, nil
        }
        return nil, qbtypes.ErrColumnNotFound
    }
    return nil, qbtypes.ErrColumnNotFound
}
@@ -74,41 +74,6 @@ func (b *traceQueryStatementBuilder) Build(
        return nil, err
    }

    /*
        Adding a tech debt note here:
        This piece of code is a hot fix and should be removed once we close issue: engineering-pod/issues/3622
    */
    /*
        -------------------------------- Start of tech debt ----------------------------
    */
    if requestType == qbtypes.RequestTypeRaw {

        selectedFields := query.SelectFields

        if len(selectedFields) == 0 {
            sortedKeys := maps.Keys(DefaultFields)
            slices.Sort(sortedKeys)
            for _, key := range sortedKeys {
                selectedFields = append(selectedFields, DefaultFields[key])
            }
            query.SelectFields = selectedFields
        }

        selectFieldKeys := []string{}
        for _, field := range selectedFields {
            selectFieldKeys = append(selectFieldKeys, field.Name)
        }

        for _, x := range []string{"timestamp", "span_id", "trace_id"} {
            if !slices.Contains(selectFieldKeys, x) {
                query.SelectFields = append(query.SelectFields, DefaultFields[x])
            }
        }
    }
    /*
        -------------------------------- End of tech debt ----------------------------
    */

    query = b.adjustKeys(ctx, keys, query, requestType)

    // Check if filter contains trace_id(s) and optimize time range if needed

@@ -202,13 +167,19 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
    // 1. to not fail filter expression that use deprecated cols
    // 2. this could have been moved to metadata fetching itself, however, that
    //    would mean, they also show up in suggestions we we don't want to do
    // 3. reason for not doing a simple append is to keep intrinsic/calculated field first so that it gets
    //    priority in multi_if sql expression
    for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
        keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
        if _, ok := keys[fieldKeyName]; !ok {
            keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
        } else {
            keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
        }
    }
    for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
        keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
        if _, ok := keys[fieldKeyName]; !ok {
            keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
        } else {
            keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
        }
    }

    // Adjust keys for alias expressions in aggregations

@@ -311,8 +282,29 @@ func (b *traceQueryStatementBuilder) buildListQuery(
        cteArgs = append(cteArgs, args)
    }

    selectedFields := query.SelectFields

    if len(selectedFields) == 0 {
        sortedKeys := maps.Keys(DefaultFields)
        slices.Sort(sortedKeys)
        for _, key := range sortedKeys {
            selectedFields = append(selectedFields, DefaultFields[key])
        }
    }

    selectFieldKeys := []string{}
    for _, field := range selectedFields {
        selectFieldKeys = append(selectFieldKeys, field.Name)
    }

    for _, x := range []string{"timestamp", "span_id", "trace_id"} {
        if !slices.Contains(selectFieldKeys, x) {
            selectedFields = append(selectedFields, DefaultFields[x])
        }
    }

    // TODO: should we deprecate `SelectFields` and return everything from a span like we do for logs?
    for _, field := range query.SelectFields {
    for _, field := range selectedFields {
        colExpr, err := b.fm.ColumnExpressionFor(ctx, &field, keys)
        if err != nil {
            return nil, err
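The hunks above move the default-select-fields hot fix out of Build and into buildListQuery. The logic is worth spelling out: when a raw list query arrives with no select fields, all DefaultFields are added in sorted order, and timestamp/span_id/trace_id are always forced in. A self-contained sketch of that fill logic, where the fieldKey type and defaultFields map are stand-ins for the package's real types:

```go
package tracelistdemo

import (
	"slices"

	"golang.org/x/exp/maps" // matches the maps.Keys + slices.Sort pattern in the diff
)

type fieldKey struct{ Name string }

// Stand-in for the package-level DefaultFields map.
var defaultFields = map[string]fieldKey{
	"timestamp": {Name: "timestamp"},
	"span_id":   {Name: "span_id"},
	"trace_id":  {Name: "trace_id"},
	"name":      {Name: "name"},
}

// fillSelectFields mirrors the moved logic: sorted defaults when nothing is
// selected, then force the three identifying columns to be present.
func fillSelectFields(selected []fieldKey) []fieldKey {
	if len(selected) == 0 {
		keys := maps.Keys(defaultFields)
		slices.Sort(keys)
		for _, k := range keys {
			selected = append(selected, defaultFields[k])
		}
	}
	names := make([]string, 0, len(selected))
	for _, f := range selected {
		names = append(names, f.Name)
	}
	for _, required := range []string{"timestamp", "span_id", "trace_id"} {
		if !slices.Contains(names, required) {
			selected = append(selected, defaultFields[required])
		}
	}
	return selected
}
```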
@@ -483,7 +483,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
                Limit: 10,
            },
            expected: qbtypes.Statement{
                Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
                Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
                Args:  []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
            },
            expectedErr: nil,
@@ -698,113 +698,6 @@ func TestStatementBuilderListQuery(t *testing.T) {
    }
}

func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
    cases := []struct {
        name        string
        requestType qbtypes.RequestType
        query       qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
        keysMap     map[string][]*telemetrytypes.TelemetryFieldKey
        expected    qbtypes.Statement
        expectedErr error
    }{
        {
            name:        "List query with empty select fields",
            requestType: qbtypes.RequestTypeRaw,
            keysMap: map[string][]*telemetrytypes.TelemetryFieldKey{
                "timestamp": {
                    {
                        Name:          "timestamp",
                        Signal:        telemetrytypes.SignalTraces,
                        FieldContext:  telemetrytypes.FieldContextAttribute,
                        FieldDataType: telemetrytypes.FieldDataTypeString,
                    },
                },
            },
            query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
                Signal:       telemetrytypes.SignalTraces,
                StepInterval: qbtypes.Step{Duration: 30 * time.Second},
                Filter:       &qbtypes.Filter{},
                Limit:        10,
            },
            expected: qbtypes.Statement{
                Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
                Args:  []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
            },
            expectedErr: nil,
        },
        {
            name:        "List query with empty select fields and order by timestamp",
            requestType: qbtypes.RequestTypeRaw,
            keysMap: map[string][]*telemetrytypes.TelemetryFieldKey{
                "timestamp": {
                    {
                        Name:          "timestamp",
                        Signal:        telemetrytypes.SignalTraces,
                        FieldContext:  telemetrytypes.FieldContextAttribute,
                        FieldDataType: telemetrytypes.FieldDataTypeString,
                    },
                },
            },
            query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
                Signal:       telemetrytypes.SignalTraces,
                StepInterval: qbtypes.Step{Duration: 30 * time.Second},
                Filter:       &qbtypes.Filter{},
                Limit:        10,
                Order: []qbtypes.OrderBy{{
                    Key: qbtypes.OrderByKey{
                        TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
                            Name: "timestamp",
                        },
                    },
                    Direction: qbtypes.OrderDirectionAsc,
                }},
            },
            expected: qbtypes.Statement{
                Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
                Args:  []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
            },
            expectedErr: nil,
        },
    }

    for _, c := range cases {
        t.Run(c.name, func(t *testing.T) {
            fm := NewFieldMapper()
            cb := NewConditionBuilder(fm)
            mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
            mockMetadataStore.KeysMap = c.keysMap
            if mockMetadataStore.KeysMap == nil {
                mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
            }
            aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)

            resourceFilterStmtBuilder := resourceFilterStmtBuilder()

            statementBuilder := NewTraceQueryStatementBuilder(
                instrumentationtest.New().ToProviderSettings(),
                mockMetadataStore,
                fm,
                cb,
                resourceFilterStmtBuilder,
                aggExprRewriter,
                nil,
            )

            q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)

            if c.expectedErr != nil {
                require.Error(t, err)
                require.Contains(t, err.Error(), c.expectedErr.Error())
            } else {
                require.NoError(t, err)
                require.Equal(t, c.expected.Query, q.Query)
                require.Equal(t, c.expected.Args, q.Args)
                require.Equal(t, c.expected.Warnings, q.Warnings)
            }
        })
    }
}

func TestStatementBuilderTraceQuery(t *testing.T) {
    cases := []struct {
        name string
@@ -36,7 +36,7 @@ type MetadataStore interface {
    ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error)

    // ListPromotedPaths lists the promoted paths.
    ListPromotedPaths(ctx context.Context, paths ...string) (map[string]struct{}, error)
    GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error)

    // PromotePaths promotes the paths.
    PromotePaths(ctx context.Context, paths ...string) error
@@ -16,7 +16,7 @@ type MockMetadataStore struct {
    RelatedValuesMap   map[string][]string
    AllValuesMap       map[string]*telemetrytypes.TelemetryFieldValues
    TemporalityMap     map[string]metrictypes.Temporality
    PromotedPathsMap   map[string]struct{}
    PromotedPathsMap   map[string]bool
    LogsJSONIndexesMap map[string][]schemamigrator.Index
    LookupKeysMap      map[telemetrytypes.MetricMetadataLookupKey]int64
}

@@ -28,7 +28,7 @@ func NewMockMetadataStore() *MockMetadataStore {
        RelatedValuesMap:   make(map[string][]string),
        AllValuesMap:       make(map[string]*telemetrytypes.TelemetryFieldValues),
        TemporalityMap:     make(map[string]metrictypes.Temporality),
        PromotedPathsMap:   make(map[string]struct{}),
        PromotedPathsMap:   make(map[string]bool),
        LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
        LookupKeysMap:      make(map[telemetrytypes.MetricMetadataLookupKey]int64),
    }

@@ -295,13 +295,13 @@ func (m *MockMetadataStore) SetTemporality(metricName string, temporality metric
// PromotePaths promotes the paths.
func (m *MockMetadataStore) PromotePaths(ctx context.Context, paths ...string) error {
    for _, path := range paths {
        m.PromotedPathsMap[path] = struct{}{}
        m.PromotedPathsMap[path] = true
    }
    return nil
}

// ListPromotedPaths lists the promoted paths.
func (m *MockMetadataStore) ListPromotedPaths(ctx context.Context, paths ...string) (map[string]struct{}, error) {
// GetPromotedPaths returns the promoted paths.
func (m *MockMetadataStore) GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error) {
    return m.PromotedPathsMap, nil
}
@@ -1,6 +1,6 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Any, Callable, Dict, List
from typing import Callable, Dict, List

import pytest
import requests

@@ -14,11 +14,7 @@ from fixtures.querier import (
    make_query_request,
)
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
from src.querier.util import (
    assert_identical_query_response,
    format_timestamp,
    generate_traces_with_corrupt_metadata,
)
from src.querier.util import assert_identical_query_response


def test_traces_list(

@@ -479,234 +475,6 @@ def test_traces_list(
    assert set(values) == set(["topic-service", "http-service"])


@pytest.mark.parametrize(
    "payload,status_code,results",
    [
        # Case 1: order by timestamp field which there in attributes as well
        pytest.param(
            {
                "type": "builder_query",
                "spec": {
                    "name": "A",
                    "signal": "traces",
                    "disabled": False,
                    "order": [{"key": {"name": "timestamp"}, "direction": "desc"}],
                    "limit": 1,
                },
            },
            HTTPStatus.OK,
            lambda x: [
                x[3].duration_nano,
                x[3].name,
                x[3].response_status_code,
                x[3].service_name,
                x[3].span_id,
                format_timestamp(x[3].timestamp),
                x[3].trace_id,
            ],  # type: Callable[[List[Traces]], List[Any]]
        ),
        # Case 2: order by attribute timestamp field which is there in attributes as well
        # This should break but it doesn't because attribute.timestamp gets adjusted to timestamp
        # because of default trace.timestamp gets added by default and bug in field mapper picks
        # instrinsic field
        pytest.param(
            {
                "type": "builder_query",
                "spec": {
                    "name": "A",
                    "signal": "traces",
                    "disabled": False,
                    "order": [
                        {"key": {"name": "attribute.timestamp"}, "direction": "desc"}
                    ],
                    "limit": 1,
                },
            },
            HTTPStatus.OK,
            lambda x: [
                x[3].duration_nano,
                x[3].name,
                x[3].response_status_code,
                x[3].service_name,
                x[3].span_id,
                format_timestamp(x[3].timestamp),
                x[3].trace_id,
            ],  # type: Callable[[List[Traces]], List[Any]]
        ),
        # Case 3: select timestamp with empty order by
        pytest.param(
            {
                "type": "builder_query",
                "spec": {
                    "name": "A",
                    "signal": "traces",
                    "disabled": False,
                    "selectFields": [{"name": "timestamp"}],
                    "limit": 1,
                },
            },
            HTTPStatus.OK,
            lambda x: [
                x[2].span_id,
                format_timestamp(x[2].timestamp),
                x[2].trace_id,
            ],  # type: Callable[[List[Traces]], List[Any]]
        ),
        # Case 4: select attribute.timestamp with empty order by
        # This doesn't return any data because of where_clause using aliased timestamp
        pytest.param(
            {
                "type": "builder_query",
                "spec": {
                    "name": "A",
                    "signal": "traces",
                    "filter": {"expression": "attribute.timestamp exists"},
                    "disabled": False,
                    "selectFields": [{"name": "attribute.timestamp"}],
                    "limit": 1,
                },
            },
            HTTPStatus.OK,
            lambda x: [],  # type: Callable[[List[Traces]], List[Any]]
        ),
        # Case 5: select timestamp with timestamp order by
        pytest.param(
            {
                "type": "builder_query",
                "spec": {
                    "name": "A",
                    "signal": "traces",
                    "disabled": False,
                    "selectFields": [{"name": "timestamp"}],
                    "limit": 1,
                    "order": [{"key": {"name": "timestamp"}, "direction": "asc"}],
                },
            },
            HTTPStatus.OK,
            lambda x: [
                x[0].span_id,
                format_timestamp(x[0].timestamp),
                x[0].trace_id,
            ],  # type: Callable[[List[Traces]], List[Any]]
        ),
        # Case 6: select duration_nano with duration order by
        pytest.param(
            {
                "type": "builder_query",
                "spec": {
                    "name": "A",
                    "signal": "traces",
                    "disabled": False,
                    "selectFields": [{"name": "duration_nano"}],
                    "limit": 1,
                    "order": [{"key": {"name": "duration_nano"}, "direction": "desc"}],
                },
            },
            HTTPStatus.OK,
            lambda x: [
                x[1].duration_nano,
                x[1].span_id,
                format_timestamp(x[1].timestamp),
                x[1].trace_id,
            ],  # type: Callable[[List[Traces]], List[Any]]
        ),
        # Case 7: select attribute.duration_nano with attribute.duration_nano order by
        pytest.param(
            {
                "type": "builder_query",
                "spec": {
                    "name": "A",
                    "signal": "traces",
                    "disabled": False,
                    "selectFields": [{"name": "attribute.duration_nano"}],
                    "filter": {"expression": "attribute.duration_nano exists"},
                    "limit": 1,
                    "order": [
                        {
                            "key": {"name": "attribute.duration_nano"},
                            "direction": "desc",
                        }
                    ],
                },
            },
            HTTPStatus.OK,
            lambda x: [
                "corrupt_data",
                x[3].span_id,
                format_timestamp(x[3].timestamp),
                x[3].trace_id,
            ],  # type: Callable[[List[Traces]], List[Any]]
        ),
        # Case 8: select attribute.duration_nano with duration order by
        pytest.param(
            {
                "type": "builder_query",
                "spec": {
                    "name": "A",
                    "signal": "traces",
                    "disabled": False,
                    "selectFields": [{"name": "attribute.duration_nano"}],
                    "limit": 1,
                    "order": [{"key": {"name": "duration_nano"}, "direction": "desc"}],
                },
            },
            HTTPStatus.OK,
            lambda x: [
                x[1].duration_nano,
                x[1].span_id,
                format_timestamp(x[1].timestamp),
                x[1].trace_id,
            ],  # type: Callable[[List[Traces]], List[Any]]
        ),
    ],
)
def test_traces_list_with_corrupt_data(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
    payload: Dict[str, Any],
    status_code: HTTPStatus,
    results: Callable[[List[Traces]], List[Any]],
) -> None:
    """
    Setup:
        Insert 4 traces with corrupt attributes.
    Tests:
    """

    traces = generate_traces_with_corrupt_metadata()
    insert_traces(traces)
    # 4 Traces with corrupt metadata inserted
    # traces[i] occured before traces[j] where i < j

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    response = make_query_request(
        signoz,
        token,
        start_ms=int(
            (datetime.now(tz=timezone.utc) - timedelta(minutes=5)).timestamp() * 1000
        ),
        end_ms=int(datetime.now(tz=timezone.utc).timestamp() * 1000),
        request_type="raw",
        queries=[payload],
    )

    assert response.status_code == status_code

    if response.status_code == HTTPStatus.OK:

        if not results(traces):
            # No results expected
            assert response.json()["data"]["data"]["results"][0]["rows"] is None
        else:
            data = response.json()["data"]["data"]["results"][0]["rows"][0]["data"]
            # Cannot compare values as they are randomly generated
            for key, value in zip(list(data.keys()), results(traces)):
                assert data[key] == value


@pytest.mark.parametrize(
    "order_by,aggregation_alias,expected_status",
    [
@@ -1,25 +1,7 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import List

import requests

from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode


def format_timestamp(dt: datetime) -> str:
    """
    Format a datetime object to match the API's timestamp format.
    The API returns timestamps with minimal fractional seconds precision.
    Example: 2026-02-03T20:54:56.5Z for 500000 microseconds
    """
    base_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
    if dt.microsecond:
        # Convert microseconds to fractional seconds and strip trailing zeros
        fractional = f"{dt.microsecond / 1000000:.6f}"[2:].rstrip("0")
        return f"{base_str}.{fractional}Z"
    return f"{base_str}Z"


def assert_identical_query_response(
    response1: requests.Response, response2: requests.Response

@@ -36,125 +18,3 @@ def assert_identical_query_response(
        response1.json()["data"]["data"]["results"]
        == response2.json()["data"]["data"]["results"]
    ), "Response data do not match"


def generate_traces_with_corrupt_metadata() -> List[Traces]:
    """
    Specifically, entries with 'id', 'timestamp', 'trace_id' and 'duration_nano' fields in metadata
    """
    http_service_trace_id = TraceIdGenerator.trace_id()
    http_service_span_id = TraceIdGenerator.span_id()
    http_service_db_span_id = TraceIdGenerator.span_id()
    http_service_patch_span_id = TraceIdGenerator.span_id()
    topic_service_trace_id = TraceIdGenerator.trace_id()
    topic_service_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    return [
        Traces(
            timestamp=now - timedelta(seconds=4),
            duration=timedelta(seconds=3),
            trace_id=http_service_trace_id,
            span_id=http_service_span_id,
            parent_span_id="",
            name="POST /integration",
            kind=TracesKind.SPAN_KIND_SERVER,
            status_code=TracesStatusCode.STATUS_CODE_OK,
            status_message="",
            resources={
                "deployment.environment": "production",
                "service.name": "http-service",
                "os.type": "linux",
                "host.name": "linux-000",
                "cloud.provider": "integration",
                "cloud.account.id": "000",
                "trace_id": "corrupt_data",
            },
            attributes={
                "net.transport": "IP.TCP",
                "http.scheme": "http",
                "http.user_agent": "Integration Test",
                "http.request.method": "POST",
                "http.response.status_code": "200",
                "timestamp": "corrupt_data",
            },
        ),
        Traces(
            timestamp=now - timedelta(seconds=3.5),
            duration=timedelta(seconds=5),
            trace_id=http_service_trace_id,
            span_id=http_service_db_span_id,
            parent_span_id=http_service_span_id,
            name="SELECT",
            kind=TracesKind.SPAN_KIND_CLIENT,
            status_code=TracesStatusCode.STATUS_CODE_OK,
            status_message="",
            resources={
                "deployment.environment": "production",
                "service.name": "http-service",
                "os.type": "linux",
                "host.name": "linux-000",
                "cloud.provider": "integration",
                "cloud.account.id": "000",
                "timestamp": "corrupt_data",
            },
            attributes={
                "db.name": "integration",
                "db.operation": "SELECT",
                "db.statement": "SELECT * FROM integration",
                "trace_d": "corrupt_data",
            },
        ),
        Traces(
            timestamp=now - timedelta(seconds=3),
            duration=timedelta(seconds=1),
            trace_id=http_service_trace_id,
            span_id=http_service_patch_span_id,
            parent_span_id=http_service_span_id,
            name="HTTP PATCH",
            kind=TracesKind.SPAN_KIND_CLIENT,
            status_code=TracesStatusCode.STATUS_CODE_OK,
            status_message="",
            resources={
                "deployment.environment": "production",
                "service.name": "http-service",
                "os.type": "linux",
                "host.name": "linux-000",
                "cloud.provider": "integration",
                "cloud.account.id": "000",
                "duration_nano": "corrupt_data",
            },
            attributes={
                "http.request.method": "PATCH",
                "http.status_code": "404",
                "id": "1",
            },
        ),
        Traces(
            timestamp=now - timedelta(seconds=1),
            duration=timedelta(seconds=4),
            trace_id=topic_service_trace_id,
            span_id=topic_service_span_id,
            parent_span_id="",
            name="topic publish",
            kind=TracesKind.SPAN_KIND_PRODUCER,
            status_code=TracesStatusCode.STATUS_CODE_OK,
            status_message="",
            resources={
                "deployment.environment": "production",
                "service.name": "topic-service",
                "os.type": "linux",
                "host.name": "linux-001",
                "cloud.provider": "integration",
                "cloud.account.id": "001",
            },
            attributes={
                "message.type": "SENT",
                "messaging.operation": "publish",
                "messaging.message.id": "001",
                "duration_nano": "corrupt_data",
                "id": 1,
            },
        ),
    ]