Compare commits

..

17 Commits

Author SHA1 Message Date
Abhi Kumar
a17b617424 fix: highlighting first series 2026-04-24 19:50:21 +05:30
Abhi Kumar
810f4005bc fix: fixed other tooltips closing when clicked on top of root tooltip 2026-04-24 18:40:25 +05:30
Abhi Kumar
5edf86acae feat: added changes for syncing tooltip 2026-04-24 18:36:49 +05:30
Abhi Kumar
a2479382c6 Merge branch 'main' of https://github.com/SigNoz/signoz into feat/crosshair-series-highlight 2026-04-24 16:06:16 +05:30
Abhi Kumar
681809d8c1 chore: minor changes 2026-04-22 22:50:43 +05:30
Abhi Kumar
a8822ae2d4 chore: handled other cases of groupby 2026-04-22 22:44:34 +05:30
Abhi Kumar
c2e9ca7c68 Merge branch 'main' of https://github.com/SigNoz/signoz into feat/crosshair-series-highlight 2026-04-22 22:16:12 +05:30
Abhi Kumar
032a2dc458 Merge branch 'main' of https://github.com/SigNoz/signoz into feat/crosshair-series-highlight 2026-04-22 11:14:24 +05:30
Abhi Kumar
16cc7b8ab9 chore: pr review fixes 2026-04-22 11:13:43 +05:30
Abhi Kumar
95e57d90a5 Merge branch 'main' of https://github.com/SigNoz/signoz into feat/crosshair-series-highlight 2026-04-21 15:30:53 +05:30
Abhi Kumar
b9bca0f9af feat: added changes for sereis highlighting on crosshair sync 2026-04-20 17:46:07 +05:30
Abhi Kumar
ee87a70a4c chore: minor cleanup 2026-04-20 15:54:30 +05:30
Abhi Kumar
d5f4f50e26 chore: updated the types 2026-04-20 13:18:04 +05:30
Abhi Kumar
f8240f4d20 chore: updated the core structure 2026-04-19 23:33:56 +05:30
Abhi Kumar
241d70ca69 Merge branch 'main' of https://github.com/SigNoz/signoz into chore/tooltip-sync-fix 2026-04-19 19:17:34 +05:30
Abhi Kumar
8e1916daa6 chore: minor cleanup 2026-04-16 00:53:40 +05:30
Abhi Kumar
7eb8806c0f chore: added changes for crosshair sync for tooltip 2026-04-16 00:50:16 +05:30
41 changed files with 428 additions and 2503 deletions

View File

@@ -51,7 +51,6 @@ jobs:
- role
- rootuser
- serviceaccount
- querier_json_body
- ttl
sqlstore-provider:
- postgres
@@ -62,7 +61,7 @@ jobs:
- 25.5.6
- 25.12.5
schema-migrator-version:
- v0.144.3
- v0.142.0
postgres-version:
- 15
if: |

View File

@@ -2926,8 +2926,8 @@ components:
type: object
PromotetypesWrappedIndex:
properties:
fieldDataType:
$ref: '#/components/schemas/TelemetrytypesFieldDataType'
column_type:
type: string
granularity:
type: integer
type:

View File

@@ -3696,7 +3696,10 @@ export interface PromotetypesPromotePathDTO {
}
export interface PromotetypesWrappedIndexDTO {
fieldDataType?: TelemetrytypesFieldDataTypeDTO;
/**
* @type string
*/
column_type?: string;
/**
* @type integer
*/

View File

@@ -33,6 +33,7 @@ export default function ChartWrapper({
children,
layoutChildren,
yAxisUnit,
groupBy,
customTooltip,
pinnedTooltipElement,
'data-testid': testId,
@@ -68,8 +69,9 @@ export default function ChartWrapper({
const syncMetadata = useMemo(
() => ({
yAxisUnit,
groupBy,
}),
[yAxisUnit],
[yAxisUnit, groupBy],
);
return (

View File

@@ -6,6 +6,7 @@ import {
DashboardCursorSync,
TooltipClickData,
} from 'lib/uPlotV2/plugins/TooltipPlugin/types';
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
interface BaseChartProps {
width: number;
@@ -38,6 +39,7 @@ interface UPlotBasedChartProps {
interface UPlotChartDataProps {
yAxisUnit?: string;
decimalPrecision?: PrecisionOption;
groupBy?: BaseAutocompleteData[];
}
export interface TimeSeriesChartProps

View File

@@ -113,6 +113,10 @@ function BarPanel(props: PanelWrapperProps): JSX.Element {
uPlotRef.current = plot;
}, []);
// Group-by keys of the panel's first builder query, forwarded to the chart so
// synced panels can compare dimensions. Optional chaining keeps the render
// from throwing when the widget has no builder query data (empty `queryData`);
// in that case `groupBy` is simply undefined.
const groupBy = useMemo(
	() => widget.query?.builder?.queryData?.[0]?.groupBy,
	[widget.query],
);
return (
<div className="panel-container" ref={graphRef}>
{containerDimensions.width > 0 && containerDimensions.height > 0 && (
@@ -128,6 +132,7 @@ function BarPanel(props: PanelWrapperProps): JSX.Element {
width={containerDimensions.width}
height={containerDimensions.height}
layoutChildren={layoutChildren}
groupBy={groupBy}
isStackedBarChart={widget.stackedBarChart ?? false}
yAxisUnit={widget.yAxisUnit}
decimalPrecision={widget.decimalPrecision}

View File

@@ -105,6 +105,7 @@ export function prepareBarPanelConfig({
colorMapping: widget.customLegendColors ?? {},
isDarkMode,
stepInterval: currentStepInterval,
metric: series.metric,
});
});

View File

@@ -104,6 +104,10 @@ function TimeSeriesPanel(props: PanelWrapperProps): JSX.Element {
widget.decimalPrecision,
]);
// Group-by keys of the panel's first builder query, forwarded to the chart so
// synced panels can compare dimensions. Optional chaining keeps the render
// from throwing when the widget has no builder query data (empty `queryData`);
// in that case `groupBy` is simply undefined.
const groupBy = useMemo(
	() => widget.query?.builder?.queryData?.[0]?.groupBy,
	[widget.query],
);
return (
<div className="panel-container" ref={graphRef}>
{containerDimensions.width > 0 && containerDimensions.height > 0 && (
@@ -117,6 +121,7 @@ function TimeSeriesPanel(props: PanelWrapperProps): JSX.Element {
yAxisUnit={widget.yAxisUnit}
decimalPrecision={widget.decimalPrecision}
data={chartData as uPlot.AlignedData}
groupBy={groupBy}
width={containerDimensions.width}
height={containerDimensions.height}
layoutChildren={layoutChildren}

View File

@@ -131,6 +131,7 @@ export const prepareUPlotConfig = ({
pointSize: 5,
fillMode: widget.fillMode || FillMode.None,
isDarkMode,
metric: series.metric,
});
});

View File

@@ -16,6 +16,7 @@ export default function BarChartTooltip(props: BarTooltipProps): JSX.Element {
yAxisUnit: props.yAxisUnit ?? '',
decimalPrecision: props.decimalPrecision,
isStackedBarChart: props.isStackedBarChart,
syncedSeriesIndexes: props.syncedSeriesIndexes,
}),
[
props.uPlotInstance,
@@ -24,6 +25,7 @@ export default function BarChartTooltip(props: BarTooltipProps): JSX.Element {
props.yAxisUnit,
props.decimalPrecision,
props.isStackedBarChart,
props.syncedSeriesIndexes,
],
);

View File

@@ -17,6 +17,7 @@ export default function HistogramTooltip(
uPlotInstance: props.uPlotInstance,
yAxisUnit: props.yAxisUnit ?? '',
decimalPrecision: props.decimalPrecision,
syncedSeriesIndexes: props.syncedSeriesIndexes,
}),
[
props.uPlotInstance,
@@ -24,6 +25,7 @@ export default function HistogramTooltip(
props.dataIndexes,
props.yAxisUnit,
props.decimalPrecision,
props.syncedSeriesIndexes,
],
);

View File

@@ -17,6 +17,7 @@ export default function TimeSeriesTooltip(
uPlotInstance: props.uPlotInstance,
yAxisUnit: props.yAxisUnit ?? '',
decimalPrecision: props.decimalPrecision,
syncedSeriesIndexes: props.syncedSeriesIndexes,
}),
[
props.uPlotInstance,
@@ -24,6 +25,7 @@ export default function TimeSeriesTooltip(
props.dataIndexes,
props.yAxisUnit,
props.decimalPrecision,
props.syncedSeriesIndexes,
],
);

View File

@@ -62,6 +62,7 @@ export function buildTooltipContent({
yAxisUnit,
decimalPrecision,
isStackedBarChart,
syncedSeriesIndexes,
}: {
data: AlignedData;
series: Series[];
@@ -71,18 +72,34 @@ export function buildTooltipContent({
yAxisUnit: string;
decimalPrecision?: PrecisionOption;
isStackedBarChart?: boolean;
syncedSeriesIndexes?: number[] | null;
}): TooltipContentItem[] {
const items: TooltipContentItem[] = [];
const allowedIndexes =
syncedSeriesIndexes != null ? new Set(syncedSeriesIndexes) : null;
for (let seriesIndex = 1; seriesIndex < series.length; seriesIndex += 1) {
const seriesItem = series[seriesIndex];
if (!seriesItem?.show) {
continue;
}
if (allowedIndexes != null && !allowedIndexes.has(seriesIndex)) {
continue;
}
const dataIndex = dataIndexes[seriesIndex];
// Skip series with no data at the current cursor position
const isSync = allowedIndexes != null;
if (dataIndex === null) {
if (isSync) {
items.push({
label: String(seriesItem.label ?? ''),
value: 0,
tooltipValue: 'No Data',
color: resolveSeriesColor(seriesItem.stroke, uPlotInstance, seriesIndex),
isActive: false,
});
}
continue;
}
@@ -102,6 +119,14 @@ export function buildTooltipContent({
color: resolveSeriesColor(seriesItem.stroke, uPlotInstance, seriesIndex),
isActive: seriesIndex === activeSeriesIndex,
});
} else if (isSync) {
items.push({
label: String(seriesItem.label ?? ''),
value: 0,
tooltipValue: 'No Data',
color: resolveSeriesColor(seriesItem.stroke, uPlotInstance, seriesIndex),
isActive: false,
});
}
}

View File

@@ -58,6 +58,9 @@ export interface TooltipRenderArgs {
isPinned: boolean;
dismiss: () => void;
viaSync: boolean;
/** In Tooltip sync mode, limits which series are rendered in the receiver tooltip.
* null = no filtering; [] = no matches (tooltip hidden upstream); [...] = allowed indexes */
syncedSeriesIndexes?: number[] | null;
}
export interface BaseTooltipProps {

View File

@@ -9,6 +9,7 @@ import {
BarAlignment,
ConfigBuilder,
DrawStyle,
ExtendedSeries,
FillMode,
LineInterpolation,
LineStyle,
@@ -27,7 +28,10 @@ let builders: PathBuilders | null = null;
const DEFAULT_LINE_WIDTH = 2;
export const POINT_SIZE_FACTOR = 2.5;
export class UPlotSeriesBuilder extends ConfigBuilder<SeriesProps, Series> {
export class UPlotSeriesBuilder extends ConfigBuilder<
SeriesProps,
ExtendedSeries
> {
constructor(props: SeriesProps) {
super(props);
const pathBuilders = uPlot.paths;
@@ -205,8 +209,8 @@ export class UPlotSeriesBuilder extends ConfigBuilder<SeriesProps, Series> {
);
}
getConfig(): Series {
const { scaleKey, label, spanGaps, show = true } = this.props;
getConfig(): ExtendedSeries {
const { scaleKey, label, spanGaps, show = true, metric } = this.props;
const resolvedLineColor = this.getLineColor();
@@ -233,6 +237,7 @@ export class UPlotSeriesBuilder extends ConfigBuilder<SeriesProps, Series> {
...lineConfig,
...pathConfig,
points: Object.keys(pointsConfig).length > 0 ? pointsConfig : undefined,
metric,
};
}
}

View File

@@ -171,6 +171,10 @@ export enum FillMode {
None = 'none',
}
/**
 * A `Series` config that may additionally carry a `metric` map
 * (string key → string value) describing the series.
 */
export type ExtendedSeries = Series & {
// Optional key/value pairs attached to the series; absent when not provided.
metric?: { [key: string]: string };
};
export interface SeriesProps extends LineConfig, PointsConfig, BarConfig {
scaleKey: string;
label?: string;
@@ -194,6 +198,7 @@ export interface SeriesProps extends LineConfig, PointsConfig, BarConfig {
fillMode?: FillMode;
isDarkMode?: boolean;
stepInterval?: number;
metric?: { [key: string]: string };
}
export interface LegendItem {

View File

@@ -3,7 +3,7 @@ import { createPortal } from 'react-dom';
import cx from 'classnames';
import uPlot from 'uplot';
import { syncCursorRegistry } from './syncCursorRegistry';
import { createSyncDisplayHook } from './syncDisplayHook';
import {
createInitialControllerState,
createSetCursorHandler,
@@ -104,32 +104,16 @@ export default function TooltipPlugin({
// Enable uPlot's built-in cursor sync when requested so that
// crosshair / tooltip can follow the dashboard-wide cursor.
let removeSyncDisplayHook: (() => void) | null = null;
if (syncMode !== DashboardCursorSync.None && config.scales[0]?.props.time) {
config.setCursor({
sync: { key: syncKey, scales: ['x', 'y'] },
});
// Show the horizontal crosshair only when the receiving panel shares
// the same y-axis unit as the source panel. When this panel is the
// source (cursor.event != null) the line is always shown and this
// panel's metadata is written to the registry so receivers can read it.
config.addHook('setCursor', (u: uPlot): void => {
const yCursorEl = u.root.querySelector<HTMLElement>('.u-cursor-y');
if (!yCursorEl) {
return;
}
if (u.cursor.event != null) {
// This panel is the source — publish metadata and always show line.
syncCursorRegistry.setMetadata(syncKey, syncMetadata);
yCursorEl.style.display = '';
} else {
// This panel is receiving sync — show only if units match.
const sourceMeta = syncCursorRegistry.getMetadata(syncKey);
yCursorEl.style.display =
sourceMeta?.yAxisUnit === syncMetadata?.yAxisUnit ? '' : 'none';
}
});
removeSyncDisplayHook = config.addHook(
'setCursor',
createSyncDisplayHook(syncKey, syncMetadata, controller),
);
}
// Dismiss the tooltip when the user clicks / presses a key
@@ -137,7 +121,12 @@ export default function TooltipPlugin({
const onOutsideInteraction = (event: Event): void => {
const target = event.target as Node;
if (!containerRef.current?.contains(target)) {
dismissTooltip();
// Don't dismiss if the click landed inside any other pinned tooltip.
const isInsideAnyPinnedTooltip =
(target as Element).closest?.('[data-pinned="true"]') != null;
if (!isInsideAnyPinnedTooltip) {
dismissTooltip();
}
}
};
@@ -156,7 +145,7 @@ export default function TooltipPlugin({
function updateCursorLock(): void {
const plot = getPlot(controller);
if (plot) {
// @ts-ignore uPlot cursor lock is not working as expected
// @ts-expect-error uPlot cursor lock is not working as expected
plot.cursor._lock = controller.pinned;
}
}
@@ -203,6 +192,16 @@ export default function TooltipPlugin({
if (!controller.hoverActive || !plot) {
return null;
}
// In Tooltip sync mode, suppress the receiver tooltip entirely when
// no receiver series match the source panel's focused series.
if (
syncTooltipWithDashboard &&
controller.cursorDrivenBySync &&
Array.isArray(controller.syncedSeriesIndexes) &&
controller.syncedSeriesIndexes.length === 0
) {
return null;
}
return renderRef.current({
uPlotInstance: plot,
dataIndexes: controller.seriesIndexes,
@@ -210,6 +209,7 @@ export default function TooltipPlugin({
isPinned: controller.pinned,
dismiss: dismissTooltip,
viaSync: controller.cursorDrivenBySync,
syncedSeriesIndexes: controller.syncedSeriesIndexes,
});
}
@@ -431,6 +431,7 @@ export default function TooltipPlugin({
removeSetSeriesHook();
removeSetLegendHook();
removeSetCursorHook();
removeSyncDisplayHook?.();
if (overClickHandler) {
const plot = getPlot(controller);
plot?.over.removeEventListener('click', overClickHandler);
@@ -493,7 +494,7 @@ export default function TooltipPlugin({
isHovering,
contents,
]);
const isTooltipVisible = isHovering || tooltipBody != null;
const isTooltipVisible = tooltipBody != null;
if (!hasPlot) {
return null;

View File

@@ -9,9 +9,13 @@ import type { TooltipSyncMetadata } from './types';
*
* Receivers use this to make decisions such as:
* - Whether to show the horizontal crosshair line (matching yAxisUnit)
* - Future: what to render inside the tooltip (matching groupBy, etc.)
* - Which series to highlight when panels share the same groupBy
*/
const metadataBySyncKey = new Map<string, TooltipSyncMetadata | undefined>();
const activeSeriesMetricBySyncKey = new Map<
string,
Record<string, string> | null
>();
export const syncCursorRegistry = {
setMetadata(syncKey: string, metadata: TooltipSyncMetadata | undefined): void {
@@ -21,4 +25,15 @@ export const syncCursorRegistry = {
getMetadata(syncKey: string): TooltipSyncMetadata | undefined {
return metadataBySyncKey.get(syncKey);
},
/** Stores `metric` (or null) as the active series metric for `syncKey`. */
setActiveSeriesMetric(
	syncKey: string,
	metric: Record<string, string> | null,
): void {
	activeSeriesMetricBySyncKey.set(syncKey, metric);
},
/**
 * Returns the active series metric stored for `syncKey`, or null when
 * nothing (or null) has been stored for that key.
 */
getActiveSeriesMetric(syncKey: string): Record<string, string> | null {
	const stored = activeSeriesMetricBySyncKey.get(syncKey);
	return stored == null ? null : stored;
},
};

View File

@@ -0,0 +1,175 @@
import uPlot from 'uplot';
import type { ExtendedSeries } from '../../config/types';
import { syncCursorRegistry } from './syncCursorRegistry';
import type { TooltipControllerState, TooltipSyncMetadata } from './types';
/**
 * Intersects two groupBy lists and returns the shared dimension keys,
 * in the order they appear in `a`.
 *
 * Returns an empty array when either list is missing or empty, or when the
 * lists have no key in common.
 *
 *   [A, B] vs [A, B] → [A, B]
 *   [A]    vs [A, B] → [A]
 *   [A, B] vs [A]    → [A]
 *   [A, B] vs [B, C] → [B]
 */
function getCommonGroupByKeys(
	a: TooltipSyncMetadata['groupBy'],
	b: TooltipSyncMetadata['groupBy'],
): string[] {
	if (!Array.isArray(a) || !Array.isArray(b)) {
		return [];
	}
	if (a.length === 0 || b.length === 0) {
		return [];
	}

	const keysInB = new Set<string>();
	for (const entry of b) {
		keysInB.add(entry.key);
	}

	const shared: string[] = [];
	for (const entry of a) {
		if (keysInB.has(entry.key)) {
			shared.push(entry.key);
		}
	}
	return shared;
}
/**
 * Collects the indexes of all series whose `metric` agrees with
 * `sourceMetric` on every key in `commonKeys`.
 *
 * Index 0 is always skipped, and series without a `metric` never match.
 * Indexes are returned in ascending order.
 */
function findMatchingSeriesIndexes(
	series: uPlot.Series[],
	sourceMetric: Record<string, string>,
	commonKeys: string[],
): number[] {
	const matches: number[] = [];

	series.forEach((entry, idx) => {
		if (idx === 0) {
			return;
		}
		const candidate = (entry as ExtendedSeries).metric;
		if (candidate == null) {
			return;
		}
		const disagrees = commonKeys.some(
			(key) => candidate[key] !== sourceMetric[key],
		);
		if (!disagrees) {
			matches.push(idx);
		}
	});

	return matches;
}
/**
 * Source-panel side of cursor sync: writes this panel's sync metadata and
 * the `metric` of its currently focused series (or null when no series is
 * focused or the series has no metric) into the shared registry under
 * `syncKey`.
 */
function applySourceSync({
	uPlotInstance,
	syncKey,
	syncMetadata,
	focusedSeriesIndex,
}: {
	uPlotInstance: uPlot;
	syncKey: string;
	syncMetadata: TooltipSyncMetadata | undefined;
	focusedSeriesIndex: number | null;
}): void {
	syncCursorRegistry.setMetadata(syncKey, syncMetadata);

	let activeMetric: Record<string, string> | null = null;
	if (focusedSeriesIndex != null) {
		const focused = uPlotInstance.series[focusedSeriesIndex] as ExtendedSeries;
		activeMetric = focused?.metric ?? null;
	}

	syncCursorRegistry.setActiveSeriesMetric(syncKey, activeMetric);
}
/**
 * Receiver-panel side of cursor sync.
 *
 * Always toggles the horizontal crosshair: visible only when the source
 * and receiver report the same `yAxisUnit`. Then decides which receiver
 * series correspond to the source panel's active series.
 *
 * Returns:
 *   null      no common groupBy keys, or the cursor is off-chart
 *             (no tooltip filtering)
 *   []        common keys exist but the source has no active metric or no
 *             receiver series matches it (series focus is cleared)
 *   number[]  indexes of the matching receiver series; the first one is
 *             given focus
 */
function applyReceiverSync({
	uPlotInstance,
	yCrosshairEl,
	syncKey,
	syncMetadata,
	sourceMetadata,
	commonKeys,
}: {
	uPlotInstance: uPlot;
	yCrosshairEl: HTMLElement;
	syncKey: string;
	syncMetadata: TooltipSyncMetadata | undefined;
	sourceMetadata: TooltipSyncMetadata | undefined;
	commonKeys: string[];
}): number[] | null {
	const unitsMatch = sourceMetadata?.yAxisUnit === syncMetadata?.yAxisUnit;
	yCrosshairEl.style.display = unitsMatch ? '' : 'none';

	if (commonKeys.length === 0) {
		return null;
	}

	const clearFocus = (): void => {
		uPlotInstance.setSeries(null, { focus: false });
	};

	const cursorLeft = uPlotInstance.cursor.left ?? -1;
	if (cursorLeft < 0) {
		clearFocus();
		return null;
	}

	const sourceSeriesMetric = syncCursorRegistry.getActiveSeriesMetric(syncKey);
	const matchingIdxs =
		sourceSeriesMetric == null
			? []
			: findMatchingSeriesIndexes(
					uPlotInstance.series,
					sourceSeriesMetric,
					commonKeys,
			  );

	if (matchingIdxs.length === 0) {
		clearFocus();
		return [];
	}

	uPlotInstance.setSeries(matchingIdxs[0], { focus: true });
	return matchingIdxs;
}
/**
 * Builds the `setCursor` hook that drives crosshair and series sync for one
 * panel.
 *
 * On each invocation:
 * - If the cursor move originated in this panel (`u.cursor.event != null`),
 *   the panel acts as the source: receiver filtering on the controller is
 *   reset to null, this panel's metadata and focused-series metric are
 *   published via `applySourceSync`, and the horizontal crosshair is shown.
 * - Otherwise the panel acts as a receiver: the source metadata is read from
 *   the registry, the common groupBy keys are (re)computed when the source's
 *   `groupBy` reference changed, and the result of `applyReceiverSync` is
 *   stored on `controller.syncedSeriesIndexes`.
 *
 * Nothing happens when the plot has no `.u-cursor-y` element.
 *
 * @param syncKey      Registry key shared by the synced panels.
 * @param syncMetadata This panel's own sync metadata.
 * @param controller   Tooltip controller state; `focusedSeriesIndex` is read
 *                     and `syncedSeriesIndexes` is written.
 * @returns The hook to register for uPlot's `setCursor` event.
 */
export function createSyncDisplayHook(
	syncKey: string,
	syncMetadata: TooltipSyncMetadata | undefined,
	controller: TooltipControllerState,
): (u: uPlot) => void {
	// Cached to avoid a DOM query on every cursor move; revalidated against
	// the current plot root below.
	let yCrosshairEl: HTMLElement | null = null;
	// Recompute the groupBy intersection only when the source panel's groupBy
	// reference changes (this panel's own groupBy is fixed for the hook's life).
	let lastSourceGroupBy: TooltipSyncMetadata['groupBy'];
	let cachedCommonKeys: string[] = [];

	return (u: uPlot): void => {
		// Re-query when there is no cached element or the cached one does not
		// belong to this plot's root (e.g. the hook is invoked for a rebuilt
		// plot) — otherwise a detached node would be toggled instead.
		if (yCrosshairEl == null || !u.root.contains(yCrosshairEl)) {
			yCrosshairEl = u.root.querySelector<HTMLElement>('.u-cursor-y');
		}
		if (!yCrosshairEl) {
			return;
		}

		if (u.cursor.event != null) {
			// Source panel: no receiver-side filtering applies here.
			controller.syncedSeriesIndexes = null;
			applySourceSync({
				uPlotInstance: u,
				syncKey,
				syncMetadata,
				focusedSeriesIndex: controller.focusedSeriesIndex,
			});
			yCrosshairEl.style.display = '';
			return;
		}

		// Receiver panel: read metadata once and pass it down so
		// applyReceiverSync does not need a second registry lookup.
		const sourceMetadata = syncCursorRegistry.getMetadata(syncKey);
		if (sourceMetadata?.groupBy !== lastSourceGroupBy) {
			lastSourceGroupBy = sourceMetadata?.groupBy;
			cachedCommonKeys = getCommonGroupByKeys(
				sourceMetadata?.groupBy,
				syncMetadata?.groupBy,
			);
		}

		controller.syncedSeriesIndexes = applyReceiverSync({
			uPlotInstance: u,
			yCrosshairEl,
			syncKey,
			syncMetadata,
			sourceMetadata,
			commonKeys: cachedCommonKeys,
		});
	};
}

View File

@@ -27,6 +27,7 @@ export function createInitialControllerState(): TooltipControllerState {
verticalOffset: 0,
seriesIndexes: [],
focusedSeriesIndex: null,
syncedSeriesIndexes: null,
cursorDrivenBySync: false,
plotWithinViewport: false,
windowWidth: window.innerWidth - WINDOW_OFFSET,
@@ -184,7 +185,7 @@ export function createSetLegendHandler(
return;
}
const newSeriesIndexes = plot.cursor.idxs.slice();
const newSeriesIndexes = [...plot.cursor.idxs];
const isAnySeriesActive = newSeriesIndexes.some((v, i) => i > 0 && v != null);
const previousCursorDrivenBySync = controller.cursorDrivenBySync;

View File

@@ -4,6 +4,7 @@ import type {
ReactNode,
RefObject,
} from 'react';
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
import type uPlot from 'uplot';
import type { TooltipRenderArgs } from '../../components/types';
@@ -39,6 +40,7 @@ export interface TooltipLayoutInfo {
export interface TooltipSyncMetadata {
yAxisUnit?: string;
groupBy?: BaseAutocompleteData[];
}
export interface TooltipPluginProps {
@@ -95,6 +97,11 @@ export interface TooltipControllerState {
verticalOffset: number;
seriesIndexes: Array<number | null>;
focusedSeriesIndex: number | null;
/** Receiver-side series filtering for Tooltip sync mode.
* null = no filtering (source panel or no groupBy configured)
* [] = no matching series found → hide the synced tooltip
* [...] = only these 1-based series indexes should appear in the synced tooltip */
syncedSeriesIndexes: number[] | null;
cursorDrivenBySync: boolean;
plotWithinViewport: boolean;
windowWidth: number;

2
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
github.com/SigNoz/signoz-otel-collector v0.144.3
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/cespare/xxhash/v2 v2.3.0

4
go.sum
View File

@@ -108,8 +108,8 @@ github.com/SigNoz/expr v1.17.7-beta h1:FyZkleM5dTQ0O6muQfwGpoH5A2ohmN/XTasRCO72g
github.com/SigNoz/expr v1.17.7-beta/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
github.com/SigNoz/signoz-otel-collector v0.144.3 h1:/7PPIqIpPsaWtrgnfHam2XVYP41ZlgEKLHzQO8oVxcA=
github.com/SigNoz/signoz-otel-collector v0.144.3/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4 h1:EskJkEMfuuIyArWhV8SleDV/fuKxiaEGTXrCZIFqDT4=
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
github.com/Yiling-J/theine-go v0.6.2/go.mod h1:08QpMa5JZ2pKN+UJCRrCasWYO1IKCdl54Xa836rpmDU=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=

View File

@@ -32,17 +32,27 @@ func NewModule(metadataStore telemetrytypes.MetadataStore, telemetrystore teleme
}
func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetypes.PromotePath, error) {
indexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
logsIndexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
if err != nil {
return nil, err
}
// Flatten the map values (which are slices) into a single slice
indexes := slices.Concat(slices.Collect(maps.Values(logsIndexes))...)
aggr := map[string][]promotetypes.WrappedIndex{}
for _, index := range indexes {
aggr[index.Name] = append(aggr[index.Name], promotetypes.WrappedIndex{
FieldDataType: index.FieldDataType,
Type: index.IndexType,
Granularity: index.Granularity,
path, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
if err != nil {
return nil, err
}
// clean backticks from the path
path = strings.ReplaceAll(path, "`", "")
aggr[path] = append(aggr[path], promotetypes.WrappedIndex{
ColumnType: columnType,
Type: index.Type,
Granularity: index.Granularity,
})
}
promotedPaths, err := m.listPromotedPaths(ctx)

View File

@@ -204,7 +204,7 @@ func AdjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemet
// Downstream query builder should handle multiple matching keys with their own metadata
// and not rely on this function to do so.
materialized := true
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
indexes := []telemetrytypes.JSONDataTypeIndex{}
fieldContextsSeen := map[telemetrytypes.FieldContext]bool{}
dataTypesSeen := map[telemetrytypes.FieldDataType]bool{}
for _, matchingKey := range matchingKeys {

View File

@@ -195,8 +195,8 @@ func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetryty
// the field genuinely holds the empty/zero value.
//
// Note: indexing is also skipped for Array Nested fields because they cannot be indexed.
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.TelemetryFieldKeySkipIndex) bool {
return telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType] == node.TerminalConfig.ElemType
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
return index.Type == node.TerminalConfig.ElemType
})
isExistsCheck := operator == qbtypes.FilterOperatorExists || operator == qbtypes.FilterOperatorNotExists
if node.TerminalConfig.ElemType.IndexSupported && indexed && !isExistsCheck {

View File

@@ -1127,12 +1127,9 @@ func buildTestTelemetryMetadataStore(t *testing.T, addIndexes bool) *telemetryty
return entry.Path == path && entry.Type == jsonType
})
if idx >= 0 {
key.Indexes = append(key.Indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
Name: path,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: fdt,
BaseColumn: LogsV2BodyV2Column,
IndexExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
key.Indexes = append(key.Indexes, telemetrytypes.JSONDataTypeIndex{
Type: jsonType,
ColumnExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
})
}
}

View File

@@ -106,7 +106,7 @@ func (t *telemetryMetaStore) buildJSONPlans(keys []*telemetrytypes.TelemetryFiel
return nil
}
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.JSONDataTypeIndex, error) {
filteredPaths := []string{}
for _, path := range paths {
// skip array paths; since they don't have any indexes
@@ -116,22 +116,47 @@ func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...st
filteredPaths = append(filteredPaths, path)
}
if len(filteredPaths) == 0 {
return make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex), nil
return make(map[string][]telemetrytypes.JSONDataTypeIndex), nil
}
// list indexes for the paths
indexes, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
indexesMap, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to list JSON path indexes")
}
// build a set of indexes
fieldPathToIndexes := make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex)
for _, index := range indexes {
fieldPathToIndexes[index.Name] = append(fieldPathToIndexes[index.Name], index)
cleanIndexes := make(map[string][]telemetrytypes.JSONDataTypeIndex)
for path, indexes := range indexesMap {
for _, index := range indexes {
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", index.Expression)
}
jsonDataType, found := telemetrytypes.MappingStringToJSONDataType[columnType]
if !found {
t.logger.ErrorContext(ctx, "failed to map column type to JSON data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
continue
}
if jsonDataType == telemetrytypes.String {
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
Type: telemetrytypes.String,
ColumnExpression: columnExpr,
IndexExpression: index.Expression,
})
} else if strings.HasPrefix(index.Type, "minmax") {
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
Type: jsonDataType,
ColumnExpression: columnExpr,
IndexExpression: index.Expression,
})
}
}
}
return fieldPathToIndexes, nil
return cleanIndexes, nil
}
func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, []any) {
@@ -148,15 +173,14 @@ func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, [
filterExprs := []string{}
for _, filter := range filters {
// Remove backticks from actual expr cuz paths from metadata doesn't have backticks
filterExprs = append(filterExprs, sb.ILike("replaceAll(expr, '`', '')", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(filter))))
filterExprs = append(filterExprs, sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(filter))))
}
sb.Where(sb.Or(filterExprs...))
return sb.BuildWithFlavor(sqlbuilder.ClickHouse)
}
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
ctx = withTelemetryContext(ctx, "ListLogsJSONIndexes")
query, args := buildListLogsJSONIndexesQuery(t.telemetrystore.Cluster(), filters...)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
@@ -165,7 +189,7 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
}
defer rows.Close()
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
indexes := make(map[string][]schemamigrator.Index)
for rows.Next() {
var name string
var typeFull string
@@ -174,39 +198,11 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
if err := rows.Scan(&name, &typeFull, &expr, &granularity); err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to scan string indexed column")
}
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(expr)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", expr)
}
fdt, found := telemetrytypes.MappingJSONDataTypeToFieldDataType[columnType]
if !found {
t.logger.ErrorContext(ctx, "failed to map JSON data type to field data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
continue
}
baseColumn := ""
fieldName := ""
switch {
case strings.HasPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix):
baseColumn = telemetrylogs.BodyV2ColumnPrefix
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix)
case strings.HasPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix):
baseColumn = telemetrylogs.BodyPromotedColumnPrefix
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix)
}
fieldName = strings.ReplaceAll(fieldName, "`", "")
indexes = append(indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
Name: fieldName,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: fdt,
BaseColumn: baseColumn,
IndexName: name,
IndexType: typeFull,
IndexExpression: expr,
Granularity: int(granularity),
indexes[name] = append(indexes[name], schemamigrator.Index{
Name: name,
Type: typeFull,
Expression: expr,
Granularity: int(granularity),
})
}

View File

@@ -36,7 +36,7 @@ func TestBuildListLogsJSONIndexesQuery(t *testing.T) {
cluster: "test-cluster",
filters: []string{"foo", "bar"},
expectedSQL: "SELECT name, type_full, expr, granularity FROM clusterAllReplicas('test-cluster', system.data_skipping_indices) " +
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?)) AND (LOWER(replaceAll(expr, '`', '')) LIKE LOWER(?) OR LOWER(replaceAll(expr, '`', '')) LIKE LOWER(?))",
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?)) AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?))",
expectedArgs: []any{
telemetrylogs.DBName,
telemetrylogs.LogsV2LocalTableName,

View File

@@ -10,10 +10,10 @@ import (
)
type WrappedIndex struct {
JSONDataType telemetrytypes.JSONDataType `json:"-"`
FieldDataType telemetrytypes.FieldDataType `json:"fieldDataType"`
Type string `json:"type"`
Granularity int `json:"granularity"`
JSONDataType telemetrytypes.JSONDataType `json:"-"`
ColumnType string `json:"column_type"`
Type string `json:"type"`
Granularity int `json:"granularity"`
}
type PromotePath struct {
@@ -60,12 +60,12 @@ func (i *PromotePath) ValidateAndSetDefaults() error {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index granularity must be greater than 0")
}
jsonDataType, ok := telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType]
jsonDataType, ok := telemetrytypes.MappingStringToJSONDataType[index.ColumnType]
if !ok {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid column type: %s", index.FieldDataType)
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid column type: %s", index.ColumnType)
}
if !jsonDataType.IndexSupported {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index is not supported for column type: %s", index.FieldDataType)
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index is not supported for column type: %s", index.ColumnType)
}
i.Indexes[idx].JSONDataType = jsonDataType

View File

@@ -37,9 +37,9 @@ type TelemetryFieldKey struct {
FieldContext FieldContext `json:"fieldContext,omitzero"`
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
JSONPlan JSONAccessPlan `json:"-"`
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
JSONPlan JSONAccessPlan `json:"-"`
Indexes []JSONDataTypeIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
Evolutions []*EvolutionEntry `json:"-"`
}
@@ -102,7 +102,7 @@ func (f TelemetryFieldKey) String() string {
if i > 0 {
sb.WriteString("; ")
}
fmt.Fprintf(&sb, "{type=%s, indexExpr=%s}", MappingFieldDataTypeToJSONDataType[index.FieldDataType].StringValue(), index.IndexExpression)
fmt.Fprintf(&sb, "{type=%s, columnExpr=%s, indexExpr=%s}", index.Type.StringValue(), index.ColumnExpression, index.IndexExpression)
}
sb.WriteString("]")
}
@@ -400,14 +400,3 @@ func NewFieldValueSelectorFromPostableFieldValueParams(params PostableFieldValue
return fieldValueSelector
}
// TelemetryFieldKeySkipIndex describes one data-skipping index together with the
// telemetry field it belongs to.
//
// In the visible ListLogsJSONIndexes code the index fields are filled from the
// name, type_full, expr and granularity columns of system.data_skipping_indices.
type TelemetryFieldKeySkipIndex struct {
Name string `json:"name"` // Name is TelemetryFieldKey.Name not IndexName from ClickHouse
// FieldContext is the context of the indexed field (the visible producer sets FieldContextBody).
FieldContext FieldContext `json:"fieldContext,omitzero"`
// FieldDataType is the field data type mapped from the indexed column's JSON type.
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
// BaseColumn is the column prefix the indexed column expression starts with;
// the visible producer leaves it empty when no known prefix matches.
BaseColumn string `json:"baseColumn"`
// IndexName is the name of the index as reported by ClickHouse.
IndexName string `json:"indexName"`
// IndexType is the full index type string (type_full).
IndexType string `json:"indexType"`
// IndexExpression is the expression the index is built on.
IndexExpression string `json:"indexExpression"`
// Granularity is the index granularity.
Granularity int `json:"granularity"`
}

View File

@@ -1,5 +1,11 @@
package telemetrytypes
// JSONDataTypeIndex pairs a JSON data type with the column expression and index
// expression of an index defined on a JSON field.
// NOTE(review): the exact format of both expressions is produced elsewhere — confirm
// against the code that builds these values.
type JSONDataTypeIndex struct {
// Type is the JSON data type the index applies to.
Type JSONDataType
// ColumnExpression is the expression identifying the indexed column.
ColumnExpression string
// IndexExpression is the expression of the index itself.
IndexExpression string
}
type JSONDataType struct {
str string // Store the correct case for ClickHouse
IsArray bool
@@ -26,17 +32,18 @@ var (
ArrayJSON = JSONDataType{"Array(JSON)", true, "JSON", false}
)
var MappingJSONDataTypeToFieldDataType = map[string]FieldDataType{
"String": FieldDataTypeString,
"Int64": FieldDataTypeInt64,
"Float64": FieldDataTypeFloat64,
"Bool": FieldDataTypeBool,
"Array(Nullable(String))": FieldDataTypeArrayString,
"Array(Nullable(Int64))": FieldDataTypeArrayInt64,
"Array(Nullable(Float64))": FieldDataTypeArrayFloat64,
"Array(Nullable(Bool))": FieldDataTypeArrayBool,
"Array(Dynamic)": FieldDataTypeArrayDynamic,
"Array(JSON)": FieldDataTypeArrayJSON,
var MappingStringToJSONDataType = map[string]JSONDataType{
"String": String,
"Int64": Int64,
"Float64": Float64,
"Bool": Bool,
"Dynamic": Dynamic,
"Array(Nullable(String))": ArrayString,
"Array(Nullable(Int64))": ArrayInt64,
"Array(Nullable(Float64))": ArrayFloat64,
"Array(Nullable(Bool))": ArrayBool,
"Array(Dynamic)": ArrayDynamic,
"Array(JSON)": ArrayJSON,
}
var MappingFieldDataTypeToJSONDataType = map[FieldDataType]JSONDataType{

View File

@@ -3,6 +3,7 @@ package telemetrytypes
import (
"context"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
@@ -34,7 +35,7 @@ type MetadataStore interface {
FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error)
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]TelemetryFieldKeySkipIndex, error)
ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error)
// ListPromotedPaths lists the promoted paths.
GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error)

View File

@@ -4,6 +4,7 @@ import (
"context"
"strings"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -17,7 +18,7 @@ type MockMetadataStore struct {
TemporalityMap map[string]metrictypes.Temporality
TypeMap map[string]metrictypes.Type
PromotedPathsMap map[string]bool
LogsJSONIndexes []telemetrytypes.TelemetryFieldKeySkipIndex
LogsJSONIndexesMap map[string][]schemamigrator.Index
ColumnEvolutionMetadataMap map[string][]*telemetrytypes.EvolutionEntry
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
// StaticFields holds signal-specific intrinsic field definitions (e.g. telemetrylogs.IntrinsicFields).
@@ -33,7 +34,7 @@ func NewMockMetadataStore() *MockMetadataStore {
TemporalityMap: make(map[string]metrictypes.Temporality),
TypeMap: make(map[string]metrictypes.Type),
PromotedPathsMap: make(map[string]bool),
LogsJSONIndexes: []telemetrytypes.TelemetryFieldKeySkipIndex{},
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
ColumnEvolutionMetadataMap: make(map[string][]*telemetrytypes.EvolutionEntry),
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
StaticFields: make(map[string]telemetrytypes.TelemetryFieldKey),
@@ -368,8 +369,8 @@ func (m *MockMetadataStore) GetPromotedPaths(ctx context.Context, paths ...strin
}
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
return m.LogsJSONIndexes, nil
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
return m.LogsJSONIndexesMap, nil
}
func (m *MockMetadataStore) updateColumnEvolutionMetadataForKeys(_ context.Context, keysToUpdate []*telemetrytypes.TelemetryFieldKey) map[string][]*telemetrytypes.EvolutionEntry {

View File

@@ -23,7 +23,6 @@ pytest_plugins = [
"fixtures.notification_channel",
"fixtures.alerts",
"fixtures.cloudintegrations",
"fixtures.jsontypeexporter",
"fixtures.seeder",
]
@@ -80,6 +79,6 @@ def pytest_addoption(parser: pytest.Parser):
parser.addoption(
"--schema-migrator-version",
action="store",
default="v0.144.3",
default="v0.144.2",
help="schema migrator version",
)

View File

@@ -1,473 +0,0 @@
"""
Simplified stand-in for jsontypeexporter, intended for test fixtures.
It parses JSON log bodies, extracts every path together with its inferred type, and
writes that path/type metadata to signoz_metadata.distributed_field_keys, similar to how the real jsontypeexporter works.
"""
import datetime
import json
from abc import ABC
from http import HTTPStatus
from typing import (
Any,
Callable,
Dict,
Generator,
List,
Optional,
Set,
Union,
)
import numpy as np
import pytest
import requests
from fixtures import types
class JSONPathType(ABC):
    """A single (JSON path, type) observation, ready to be inserted as one metadata row.

    Attributes:
        field_name: JSON path of the field, e.g. "user.name".
        field_data_type: Type label recorded for the path, e.g. "string" or "int64".
        last_seen: Observation time in nanoseconds since the Unix epoch.
        signal: Always "logs".
        field_context: Always "body".
    """

    field_name: str
    field_data_type: str
    last_seen: np.uint64
    signal: str = "logs"
    field_context: str = "body"

    def __init__(
        self,
        field_name: str,
        field_data_type: str,
        last_seen: Optional[datetime.datetime] = None,
    ) -> None:
        """Create a path/type record.

        Args:
            field_name: JSON path of the field.
            field_data_type: Type label for the path.
            last_seen: When the path was observed; defaults to the current time.
        """
        self.field_name = field_name
        self.field_data_type = field_data_type
        self.signal = "logs"
        self.field_context = "body"
        if last_seen is None:
            last_seen = datetime.datetime.now()
        # Seconds (float) -> integer nanoseconds.
        self.last_seen = np.uint64(int(last_seen.timestamp() * 1e9))

    def np_arr(self) -> np.ndarray:
        """Return the record as a row: signal, field_context, field_name, field_data_type, last_seen.

        The values are mixed strings and an integer, so NumPy coerces them to one
        common dtype; callers should not rely on ``last_seen`` staying numeric here.
        """
        return np.array([self.signal, self.field_context, self.field_name, self.field_data_type, self.last_seen])
# Constants matching jsontypeexporter
# NOTE(review): ARRAY_SEPARATOR is not referenced by any function visible in this module;
# only ARRAY_SUFFIX is used (when building paths for objects nested inside arrays).
ARRAY_SEPARATOR = "[]." # Used in paths like "education[].name"
ARRAY_SUFFIX = "[]" # Used when traversing into array element objects
def _infer_array_type_from_type_strings(types: List[str]) -> Optional[str]:
"""
Infer array type from a list of pre-classified type strings.
Matches metadataexporter's inferArrayMask logic.
Internal type strings are: "JSON", "String", "Bool", "Float64", "Int64"
SuperTyping rules (matching Go inferArrayMask):
- JSON alone → []json
- JSON + any primitive → []dynamic
- String alone → []string; String + other → []dynamic
- Float64 wins over Int64 and Bool
- Int64 wins over Bool
- Bool alone → []bool
"""
if len(types) == 0:
return None
unique = set(types)
has_json = "JSON" in unique
# hasPrimitive mirrors Go: (hasJSON && len(unique) > 1) || (!hasJSON && len(unique) > 0)
has_primitive = (has_json and len(unique) > 1) or (not has_json and len(unique) > 0)
if has_json:
if not has_primitive:
return "[]json"
return "[]dynamic"
# ---- Primitive Type Resolution (Float > Int > Bool) ----
if "String" in unique:
if len(unique) > 1:
return "[]dynamic"
return "[]string"
if "Float64" in unique:
return "[]float64"
if "Int64" in unique:
return "[]int64"
if "Bool" in unique:
return "[]bool"
return "[]dynamic"
def _infer_array_type(elements: List[Any]) -> Optional[str]:
    """
    Infer the array type label for a raw Python list.

    Each element is mapped to a type label and the labels are resolved by
    _infer_array_type_from_type_strings. None elements, and elements of any
    other unrecognised Python type, contribute no label.
    """
    if not elements:
        return None

    # Order matters: bool is a subclass of int, so it must be tested before int.
    classifiers = (
        (dict, "JSON"),
        (str, "String"),
        (bool, "Bool"),
        (float, "Float64"),
        (int, "Int64"),
    )

    labels = []
    for element in elements:
        if element is None:
            continue
        for python_type, label in classifiers:
            if isinstance(element, python_type):
                labels.append(label)
                break

    return _infer_array_type_from_type_strings(labels)
def _python_type_to_clickhouse_type(value: Any) -> str:
"""
Convert Python type to ClickHouse JSON type string.
Maps Python types to ClickHouse JSON data types.
Matches metadataexporter's mapPCommonValueTypeToDataType.
"""
if isinstance(value, bool):
return "bool"
elif isinstance(value, int):
return "int64"
elif isinstance(value, float):
return "float64"
elif isinstance(value, str):
return "string"
elif isinstance(value, list):
# Use the sophisticated array type inference
array_type = _infer_array_type(value)
return array_type if array_type else "[]dynamic"
elif isinstance(value, dict):
return "json"
else:
return "string" # Default fallback
def _extract_json_paths(
    obj: Any,
    current_path: str = "",
    path_types: Optional[Dict[str, Set[str]]] = None,
    level: int = 0,
) -> Dict[str, Set[str]]:
    """
    Walk a parsed JSON value and record the type labels seen at every path.

    Args:
        obj: Parsed JSON value to traverse.
        current_path: Path of ``obj`` within the document (e.g. "user.name"); empty at the root.
        path_types: Accumulator mapping path -> set of type labels; created when omitted
            and mutated in place.
        level: Nesting counter. It is incremented for dict children and passed through
            unchanged for objects inside arrays; this function does not otherwise read it.

    Returns:
        The accumulator mapping each discovered path to its set of type labels.
    """
    if path_types is None:
        path_types = {}

    # Null values carry no type information.
    if obj is None:
        return path_types

    if isinstance(obj, dict):
        # Objects are never recorded themselves; only their members are visited.
        for key, value in obj.items():
            child_path = f"{current_path}.{key}" if current_path else key
            _extract_json_paths(value, child_path, path_types, level + 1)
        return path_types

    if not isinstance(obj, list):
        # Scalar leaf: record its type, unless it sits at the (unnamed) root.
        if current_path:
            path_types.setdefault(current_path, set()).add(_python_type_to_clickhouse_type(obj))
        return path_types

    # ---- Arrays ----
    if len(obj) == 0:
        return path_types

    # Members of objects inside the array live under "<path>[]", e.g. "education[].name".
    element_prefix = current_path + ARRAY_SUFFIX if current_path else ""
    # str is checked before bool/float/int; bool before int because bool subclasses int.
    scalar_labels = (
        (str, "String"),
        (bool, "Bool"),
        (float, "Float64"),
        (int, "Int64"),
    )

    element_labels = []
    for item in obj:
        if isinstance(item, dict):
            for key, value in item.items():
                member_path = f"{element_prefix}.{key}" if element_prefix else key
                # The level is deliberately not increased for array element objects.
                _extract_json_paths(value, member_path, path_types, level)
            element_labels.append("JSON")
        elif isinstance(item, list):
            # Arrays inside arrays are not supported: stop without recording this path.
            return path_types
        else:
            for python_type, label in scalar_labels:
                if isinstance(item, python_type):
                    element_labels.append(label)
                    break

    if element_labels:
        array_type = _infer_array_type_from_type_strings(element_labels)
        if array_type and current_path:
            path_types.setdefault(current_path, set()).add(array_type)

    return path_types
def _parse_json_bodies_and_extract_paths(
    json_bodies: List[str],
    timestamp: Optional[datetime.datetime] = None,
) -> List[JSONPathType]:
    """
    Parse JSON bodies and flatten every discovered (path, type) pair into records.

    Args:
        json_bodies: JSON documents as strings; entries that fail to parse are skipped.
        timestamp: Value used as last_seen for every record (defaults to now).

    Returns:
        One JSONPathType per distinct (path, type) combination found across all bodies.
    """
    seen_at = datetime.datetime.now() if timestamp is None else timestamp

    # Paths and their type labels, merged across every body.
    merged: Dict[str, Set[str]] = {}
    for raw_body in json_bodies:
        try:
            document = json.loads(raw_body)
            _extract_json_paths(document, "", merged, level=0)
        except (json.JSONDecodeError, TypeError):
            # Invalid JSON is ignored.
            continue

    # A path observed with several types yields one record per type.
    return [
        JSONPathType(field_name=path, field_data_type=label, last_seen=seen_at)
        for path, labels in merged.items()
        for label in labels
    ]
@pytest.fixture(name="export_json_types", scope="function")
def export_json_types(
    clickhouse: types.TestContainerClickhouse,
    request: pytest.FixtureRequest,  # To access migrator fixture
) -> Generator[
    Callable[[Union[List[JSONPathType], List[str], List[Any]]], None], Any, None
]:
    """
    Fixture yielding a function that writes JSON path/type metadata to
    signoz_metadata.distributed_field_keys.

    The yielded function decides how to treat its argument from the first element:
    1. JSONPathType objects are inserted as given.
    2. Strings are parsed as JSON bodies and their paths extracted.
    3. Anything else is treated as a log object: its ``body_v2`` is used when set,
       otherwise its ``body`` is used when that parses as JSON.

    Usage examples:
        # Manual specification
        export_json_types([
            JSONPathType(field_name="user.name", field_data_type="string"),
            JSONPathType(field_name="user.age", field_data_type="int64"),
        ])

        # Auto-extract from JSON strings
        export_json_types([
            '{"user": {"name": "alice", "age": 25}}',
            '{"user": {"name": "bob", "age": 30}}',
        ])

        # Auto-extract from Logs objects
        export_json_types(logs_list)

    On teardown signoz_metadata.field_keys is truncated on the cluster.
    """
    # Best effort: make sure migrations have created the table. A missing or
    # failing migrator fixture is tolerated because the table may already exist.
    try:
        request.getfixturevalue("migrator")
    except Exception:
        pass

    def _export_json_types(
        data: Union[
            List[JSONPathType], List[str], List[Any]
        ],  # List[Logs] but avoiding circular import
    ) -> None:
        """Insert path/type rows derived from ``data``; does nothing when there is nothing to insert."""
        if len(data) == 0:
            return

        sample = data[0]
        records: List[JSONPathType] = []
        if isinstance(sample, JSONPathType):
            records = data  # type: ignore
        elif isinstance(sample, str):
            records = _parse_json_bodies_and_extract_paths(data)  # type: ignore
        else:
            bodies: List[str] = []
            for entry in data:  # type: ignore
                if getattr(entry, "body_v2", None):
                    bodies.append(entry.body_v2)
                    continue
                if getattr(entry, "body", None):
                    # Fall back to the plain body only when it is valid JSON.
                    try:
                        json.loads(entry.body)
                    except (json.JSONDecodeError, TypeError):
                        continue
                    bodies.append(entry.body)
            if bodies:
                records = _parse_json_bodies_and_extract_paths(bodies)

        if len(records) == 0:
            return

        clickhouse.conn.insert(
            database="signoz_metadata",
            table="distributed_field_keys",
            data=[record.np_arr() for record in records],
            column_names=[
                "signal",
                "field_context",
                "field_name",
                "field_data_type",
                "last_seen",
            ],
        )

    yield _export_json_types

    # Cleanup - truncate the local table after the test.
    cluster_name = clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']
    clickhouse.conn.query(
        f"TRUNCATE TABLE signoz_metadata.field_keys ON CLUSTER '{cluster_name}' SYNC"
    )
@pytest.fixture(name="create_json_index", scope="function")
def create_json_index(
    signoz: types.SigNoz,
) -> Generator[Callable[[str, List[Dict[str, Any]]], None], None, None]:
    """
    Fixture yielding a function that creates ClickHouse data-skipping indexes on
    body JSON paths through POST /api/v1/logs/promote_paths.

    **Call it BEFORE insert_logs** so that newly inserted data parts are covered
    by the index.

    The yielded function takes ``(token, paths)``. Each entry in ``paths`` follows
    the PromotePath API shape:

        {
            "path": "body.user.name",   # must start with "body."
            "indexes": [
                {
                    "fieldDataType": "string",             # string | int64 | float64
                    "type": "ngrambf_v1(3, 256, 2, 0)",    # or "minmax", "tokenbf_v1(...)"
                    "granularity": 1,
                }
            ],
        }

    The call asserts that the API answers 201 Created.

    Teardown looks up every index in system.data_skipping_indices whose expression
    contains one of the requested paths (without the "body." prefix) and drops it.

    Example::

        def test_foo(signoz, get_token, insert_logs, export_json_types, create_json_index):
            token = get_token(...)
            export_json_types(logs_list)
            create_json_index(token, [
                {"path": "body.user.name",
                 "indexes": [{"fieldDataType": "string", "type": "ngrambf_v1(3, 256, 2, 0)", "granularity": 1}]},
                {"path": "body.user.age",
                 "indexes": [{"fieldDataType": "int64", "type": "minmax", "granularity": 1}]},
            ])
            insert_logs(logs_list)  # inserted after the index exists
    """
    tracked_paths: List[str] = []

    def _create_json_body_index(token: str, paths: List[Dict[str, Any]]) -> None:
        """POST ``paths`` to the promote_paths endpoint and remember them for cleanup."""
        endpoint = signoz.self.host_configs["8080"].get("/api/v1/logs/promote_paths")
        response = requests.post(
            endpoint,
            headers={"authorization": f"Bearer {token}"},
            json=paths,
            timeout=30,
        )
        assert response.status_code == HTTPStatus.CREATED, (
            f"Failed to create JSON body indexes: {response.status_code} {response.text}"
        )
        for entry in paths:
            # Cleanup searches by the bare path (e.g. "user.name"), so drop the "body." prefix.
            bare_path = entry["path"].removeprefix("body.")
            if bare_path not in tracked_paths:
                tracked_paths.append(bare_path)

    yield _create_json_body_index

    if tracked_paths:
        cluster = signoz.telemetrystore.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER"]
        for bare_path in tracked_paths:
            lookup_sql = (
                "SELECT name FROM system.data_skipping_indices "
                "WHERE database = 'signoz_logs' AND table = 'logs_v2' "
                f"AND expr LIKE '%{bare_path}%'"
            )
            matches = signoz.telemetrystore.conn.query(lookup_sql)
            for (index_name,) in matches.result_rows:
                drop_sql = (
                    "ALTER TABLE signoz_logs.logs_v2 "
                    f"ON CLUSTER '{cluster}' "
                    f"DROP INDEX IF EXISTS `{index_name}`"
                )
                signoz.telemetrystore.conn.query(drop_sql)

View File

@@ -3,7 +3,7 @@ import json
from abc import ABC
from collections.abc import Callable, Generator
from http import HTTPStatus
from typing import Any, Literal, Optional
from typing import Any, Literal
import numpy as np
import pytest
@@ -121,8 +121,6 @@ class Logs(ABC):
resources: dict[str, Any] = {},
attributes: dict[str, Any] = {},
body: str = "default body",
body_v2: Optional[str] = None,
body_promoted: Optional[str] = None,
severity_text: str = "INFO",
trace_id: str = "",
span_id: str = "",
@@ -168,33 +166,6 @@ class Logs(ABC):
# Set body
self.body = body
# Set body_v2 - if body is JSON, parse and stringify it, otherwise use empty string
# ClickHouse accepts String input for JSON column
if body_v2 is not None:
self.body_v2 = body_v2
else:
# Try to parse body as JSON; if successful use it directly,
# otherwise wrap as {"message": body} matching the normalize operator behavior.
try:
json.loads(body)
self.body_v2 = body
except (json.JSONDecodeError, TypeError):
self.body_v2 = json.dumps({"message": body})
# Set body_promoted - must be valid JSON
# Tests will explicitly pass promoted column's content, but we validate it
if body_promoted is not None:
# Validate that it's valid JSON
try:
json.loads(body_promoted)
self.body_promoted = body_promoted
except (json.JSONDecodeError, TypeError):
# If invalid, default to empty JSON object
self.body_promoted = "{}"
else:
# Default to empty JSON object (valid JSON)
self.body_promoted = "{}"
# Process resources and attributes
self.resources_string = {k: str(v) for k, v in resources.items()}
self.resource_json = {} if resource_write_mode == "legacy_only" else dict(self.resources_string)
@@ -338,8 +309,6 @@ class Logs(ABC):
self.severity_text,
self.severity_number,
self.body,
self.body_v2,
self.body_promoted,
self.attributes_string,
self.attributes_number,
self.attributes_bool,
@@ -467,51 +436,31 @@ def insert_logs_to_clickhouse(conn, logs: list[Logs]) -> None:
data=[resource_key.np_arr() for resource_key in resource_keys],
)
all_column_names = [
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"body_v2",
"body_promoted",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
]
result = conn.query(
"SELECT count() FROM system.columns WHERE database = 'signoz_logs' AND table = 'logs_v2' AND name = 'body_v2'"
)
has_json_body = result.result_rows[0][0] > 0
if has_json_body:
column_names = all_column_names
data = [log.np_arr() for log in logs]
else:
json_body_cols = {"body_v2", "body_promoted"}
keep_indices = [
i for i, c in enumerate(all_column_names) if c not in json_body_cols
]
column_names = [all_column_names[i] for i in keep_indices]
data = [log.np_arr()[keep_indices] for log in logs]
conn.insert(
database="signoz_logs",
table="distributed_logs_v2",
data=data,
column_names=column_names,
data=[log.np_arr() for log in logs],
column_names=[
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
],
)

View File

@@ -1,5 +1,3 @@
from typing import Optional
import docker
import pytest
from testcontainers.core.container import Network
@@ -10,32 +8,27 @@ from fixtures.logger import setup_logger
logger = setup_logger(__name__)
def create_migrator(
@pytest.fixture(name="migrator", scope="package")
def migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
cache_key: str = "migrator",
env_overrides: Optional[dict] = None,
) -> types.Operation:
"""
Factory function for running schema migrations.
Accepts optional env_overrides to customize the migrator environment.
Package-scoped fixture for running schema migrations.
"""
def create() -> None:
version = request.config.getoption("--schema-migrator-version")
client = docker.from_env()
environment = dict(env_overrides) if env_overrides else {}
container = client.containers.run(
image=f"signoz/signoz-schema-migrator:{version}",
command=f"sync --replication=true --cluster-name=cluster --up= --dsn={clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN']}",
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -54,7 +47,6 @@ def create_migrator(
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -67,7 +59,7 @@ def create_migrator(
container.remove()
return types.Operation(name=cache_key)
return types.Operation(name="migrator")
def delete(_: types.Operation) -> None:
pass
@@ -78,27 +70,9 @@ def create_migrator(
return reuse.wrap(
request,
pytestconfig,
cache_key,
"migrator",
lambda: types.Operation(name=""),
create,
delete,
restore,
)
@pytest.fixture(name="migrator", scope="package")
def migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.Operation:
"""
Package-scoped fixture for running schema migrations.
"""
return create_migrator(
network=network,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
)

View File

@@ -1,70 +0,0 @@
import pytest
from testcontainers.core.container import Network
from fixtures import types
from fixtures.migrator import create_migrator
from fixtures.signoz import create_signoz
# ClickHouse versions on which the JSON body query-builder tests are skipped.
UNSUPPORTED_CLICKHOUSE_VERSIONS = {"25.5.6"}


def pytest_collection_modifyitems(
    config: pytest.Config, items: list[pytest.Item]
) -> None:
    """
    Skip the tests of this package when running against an unsupported ClickHouse version.

    pytest passes this hook every item collected in the session, not only the
    ones below this conftest, so the skip marker is restricted to items located
    under this file's directory. Tests from other packages collected in the same
    run are left untouched.

    Args:
        config: pytest configuration; the "--clickhouse-version" option is read from it.
        items: All collected test items; matching ones get a skip marker added in place.
    """
    from pathlib import Path

    version = config.getoption("--clickhouse-version")
    if version not in UNSUPPORTED_CLICKHOUSE_VERSIONS:
        return

    package_dir = Path(__file__).resolve().parent
    skip = pytest.mark.skip(
        reason=f"JSON body QB tests require ClickHouse > {version}"
    )
    for item in items:
        if package_dir in Path(item.path).resolve().parents:
            item.add_marker(skip)
@pytest.fixture(name="migrator", scope="package")
def migrator_json(
    network: Network,
    clickhouse: types.TestContainerClickhouse,
    request: pytest.FixtureRequest,
    pytestconfig: pytest.Config,
) -> types.Operation:
    """
    Package-scoped replacement for the "migrator" fixture that runs the schema
    migrator with ENABLE_LOGS_MIGRATIONS_V2=1, under its own cache key.
    """
    migrator_env = {
        "ENABLE_LOGS_MIGRATIONS_V2": "1",
    }
    operation = create_migrator(
        network=network,
        clickhouse=clickhouse,
        request=request,
        pytestconfig=pytestconfig,
        cache_key="migrator-json-body",
        env_overrides=migrator_env,
    )
    return operation
@pytest.fixture(name="signoz", scope="package")
def signoz_json_body(
    network: Network,
    zeus: types.TestContainerDocker,
    gateway: types.TestContainerDocker,
    sqlstore: types.TestContainerSQL,
    clickhouse: types.TestContainerClickhouse,
    request: pytest.FixtureRequest,
    pytestconfig: pytest.Config,
) -> types.SigNoz:
    """
    Package-scoped replacement for the "signoz" fixture that starts SigNoz with
    BODY_JSON_QUERY_ENABLED=true, under its own cache key.
    """
    signoz_env = {
        "BODY_JSON_QUERY_ENABLED": "true",
    }
    instance = create_signoz(
        network=network,
        zeus=zeus,
        gateway=gateway,
        sqlstore=sqlstore,
        clickhouse=clickhouse,
        request=request,
        pytestconfig=pytestconfig,
        cache_key="signoz-json-body",
        env_overrides=signoz_env,
    )
    return instance