Compare commits

...

5 Commits

Author SHA1 Message Date
Jatinderjit Singh
d0e748ed37 fix: maintenance ignores recurrence when fixed times also set 2026-04-28 16:53:43 +05:30
Nikhil Mantri
ffd493617f feat(infra-monitoring): v2 pods list API (#10833)
* chore: baseline setup

* chore: endpoint detail update

* chore: added logic for hosts v3 api

* fix: bug fix

* chore: disk usage

* chore: added validate function

* chore: added some unit tests

* chore: return status as a string

* chore: yarn generate api

* chore: removed isSendingK8sAgentsMetricsCode

* chore: moved funcs

* chore: added validation on order by

* chore: added pods list logic

* chore: updated openapi yml

* chore: updated spec

* chore: pods api meta start time

* chore: nil pointer check

* chore: nil pointer dereference fix in req.Filter

* chore: added temporalities of metrics

* chore: added pods metrics temporality

* chore: unified composite key function

* chore: code improvements

* chore: added pods list api updates

* chore: hostStatusNone added for clarity that this field can be left empty as well in payload

* chore: yarn generate api

* chore: return errors from getMetadata and lint fix

* chore: return errors from getMetadata and lint fix

* chore: added hostName logic

* chore: modified getMetadata query

* chore: add type for response and files rearrange

* chore: warnings added passing from queryResponse warning to host lists response struct

* chore: added better metrics existence check

* chore: added a TODO remark

* chore: added required metrics check

* chore: distributed samples table to local table change for get metadata

* chore: frontend fix

* chore: endpoint correction

* chore: endpoint modification openapi

* chore: escape backtick to prevent sql injection

* chore: rearrage

* chore: improvements

* chore: validate order by to validate function

* chore: improved description

* chore: added TODOs and made filterByStatus a part of filter struct

* chore: ignore empty string hosts in get active hosts

* feat(infra-monitoring): v2 hosts list - return counts of active & inactive hosts for custom group by attributes (#10956)

* chore: add functionality for showing active and inactive counts in custom group by

* chore: bug fix

* chore: added subquery for active and total count

* chore: ignore empty string hosts in get active hosts

* fix: sinceUnixMilli for determining active hosts compute once per request

* chore: refactor code

* chore: rename HostsList -> ListHosts

* chore: rearrangement

* chore: inframonitoring types renaming

* chore: added types package

* chore: file structure further breakdown for clarity

* chore: comments correction

* chore: removed temporalities

* chore: pods code restructuring

* chore: comments resolve

* chore: added json tag required: true

* chore: removed pod metric temporalities

* chore: removed internal server error

* chore: added status unauthorized

* chore: remove a defensive nil map check, the function ensure non-nil map when err nil

* chore: cleanup and rename

* chore: make sort stable in case of tiebreaker by comparing composite group by keys

* chore: regen api client for inframonitoring

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: added required tags

* chore: added support for pod phase unknown

* chore: removed pods - order by phase

* chore: improved api description to document -1 as no data in numeric fields

* fix: rebase fixes

* feat(infra-monitoring): v2 pods list apis - phase counts when custom grouping (#11088)

* chore: added phase counts feature

* chore: added queries for pod phase counts in custom group by

* chore: added unknown phase count

* fix: isPodUIDInGroupBy in buildPodRecords

* chore: 3 cte --> 2 cte

* chore: pod phase with local table of time series as counts

* chore: comment correction

* chore: corrected comment

* chore: value column for samples table added

* chore: removed query G for phase counts

* chore: rename variable

* chore: added PodPhaseNum constants to types

* chore: updated comment

* chore: formatted file

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Ashwin Bhatkal <ashwin96@gmail.com>
2026-04-28 10:19:26 +00:00
Pandey
3f95d35ad5 chore: add @therealpandey with @srikanthcvv (#11126)
#### Temporary

- add @therealpandey with @srikanthcvv
2026-04-28 10:01:39 +00:00
Piyush Singariya
9e5678d6b3 CI: JSON QB E2E Integration Suite (#10863)
* feat: enable JSON Path index

* fix: contextual path index usage

* test: fix unit tests

* feat: align negative operators to include other logs

* fix: primitive conditions working

* fix: indexed tests passing

* fix: array type filtering from dynamic arrays

* fix: unit tests

* fix: remove not used paths from testdata

* fix: unit tests

* fix: comment

* fix: indexed unit tests

* fix: array json element comparison

* feat: change filtering of dynamic arrays

* fix: dynamic array tests

* fix: stringified integer value input

* fix: better review for test file

* fix: negative operator check

* ci: lint changes

* chore: import tests from older pr

* fix: better tests

* fix: logs.py

* fix: body tests ready

* fix: better validations

* fix: dynamically change insert stmt for body_v2 availability

* test: with higher migrator version

* fix: type ambiguity

* fix: test

* test: updated validation

* fix: tons of changes

* chore: remove redundent comparison

* ci: tests fixed

* fix: upgraded collector version

* fix: qbtoexpr tests

* fix: go sum

* chore: upgrade collector version v0.144.3-rc.4

* fix: tests

* ci: test fix

* revert: remove db binaries

* test: selectField tests added

* fix: added safeguards in plan generation

* fix: name changed to field_map

* chore: changes based on review

* fix: changes based on review

* fix: remove unused promoted fixture

* test: integration test fix attempt 1

* fix: json access plan remval of AvailableTypes

* fix: invalid index usage on terminal condition

* test: added indexed tests separately

* test: select order by tests added

* fix: update jsontypeexporter

* fix: branches should tell missing array types

* fix: comment removed

* ci: test with updated collector

* fix: issue with FuzzyMatching and API failing

* test: select orderby works

* fix: int64 mapping

* fix: replacing JSONIndex with TelemetryFieldKeyIndex

* chore: update collector to stable release

* chore: update migrator version to stable release

* chore: error fix and unused code

* chore: go mod tidy

* chore: some changes in test to improve quality

* test: enhanced tests to work with backtick required key

* chore: comment removal

* fix: change based on review

* fix: change based on review

* fix: openapi ci

* ci: fix go tests

* fix: index needs body prefix

* fix: tests

* ci: polluted test fix

* fix: shift test files

* refactor(telemetrytypes): replace JSONDataTypeIndex with TelemetryFieldKeySkipIndex

* revert: mcp related changes

* feat: check query log as fixture

* fix: reuse querier fixtures

* chore: rename jsontypeexporter.py

* chore: py-fmt

* fix: minor fix

* fix: py-lint

* fix: changes based on review

* chore: fmt
2026-04-28 08:49:44 +00:00
Abhi kumar
941016f12c Feat/crosshair series highlight (#11013)
* chore: added changes for crosshair sync for tooltip

* chore: minor cleanup

* chore: updated the core structure

* chore: updated the types

* chore: minor cleanup

* feat: added changes for sereis highlighting on crosshair sync

* chore: pr review fixes

* chore: handled other cases of groupby

* chore: minor changes

* feat: tooltip sync across panels (#11114)

* feat: added changes for syncing tooltip

* fix: fixed other tooltips closing when clicked on top of root tooltip

* fix: highlighting first series

* chore: removed y-axis sync for tooltip mode

* chore: minor fix

* chore: fmt fix
2026-04-28 06:52:51 +00:00
55 changed files with 4082 additions and 176 deletions

56
.github/CODEOWNERS vendored
View File

@@ -52,49 +52,49 @@ go.mod @therealpandey
# Querier Owners
/pkg/querier/ @srikanthccv
/pkg/variables/ @srikanthccv
/pkg/types/querybuildertypes/ @srikanthccv
/pkg/types/telemetrytypes/ @srikanthccv
/pkg/querybuilder/ @srikanthccv
/pkg/telemetrylogs/ @srikanthccv
/pkg/telemetrymetadata/ @srikanthccv
/pkg/telemetrymetrics/ @srikanthccv
/pkg/telemetrytraces/ @srikanthccv
/pkg/querier/ @srikanthccv @therealpandey
/pkg/variables/ @srikanthccv @therealpandey
/pkg/types/querybuildertypes/ @srikanthccv @therealpandey
/pkg/types/telemetrytypes/ @srikanthccv @therealpandey
/pkg/querybuilder/ @srikanthccv @therealpandey
/pkg/telemetrylogs/ @srikanthccv @therealpandey
/pkg/telemetrymetadata/ @srikanthccv @therealpandey
/pkg/telemetrymetrics/ @srikanthccv @therealpandey
/pkg/telemetrytraces/ @srikanthccv @therealpandey
# Metrics
/pkg/types/metrictypes/ @srikanthccv
/pkg/types/metricsexplorertypes/ @srikanthccv
/pkg/modules/metricsexplorer/ @srikanthccv
/pkg/prometheus/ @srikanthccv
/pkg/types/metrictypes/ @srikanthccv @therealpandey
/pkg/types/metricsexplorertypes/ @srikanthccv @therealpandey
/pkg/modules/metricsexplorer/ @srikanthccv @therealpandey
/pkg/prometheus/ @srikanthccv @therealpandey
# APM
/pkg/types/servicetypes/ @srikanthccv
/pkg/types/apdextypes/ @srikanthccv
/pkg/modules/apdex/ @srikanthccv
/pkg/modules/services/ @srikanthccv
/pkg/types/servicetypes/ @srikanthccv @therealpandey
/pkg/types/apdextypes/ @srikanthccv @therealpandey
/pkg/modules/apdex/ @srikanthccv @therealpandey
/pkg/modules/services/ @srikanthccv @therealpandey
# Dashboard
/pkg/types/dashboardtypes/ @srikanthccv
/pkg/modules/dashboard/ @srikanthccv
/pkg/types/dashboardtypes/ @srikanthccv @therealpandey
/pkg/modules/dashboard/ @srikanthccv @therealpandey
# Rule/Alertmanager
/pkg/types/ruletypes/ @srikanthccv
/pkg/types/alertmanagertypes @srikanthccv
/pkg/alertmanager/ @srikanthccv
/pkg/ruler/ @srikanthccv
/pkg/modules/rulestatehistory/ @srikanthccv
/pkg/types/rulestatehistorytypes/ @srikanthccv
/pkg/types/ruletypes/ @srikanthccv @therealpandey
/pkg/types/alertmanagertypes @srikanthccv @therealpandey
/pkg/alertmanager/ @srikanthccv @therealpandey
/pkg/ruler/ @srikanthccv @therealpandey
/pkg/modules/rulestatehistory/ @srikanthccv @therealpandey
/pkg/types/rulestatehistorytypes/ @srikanthccv @therealpandey
# Correlation-adjacent
/pkg/contextlinks/ @srikanthccv
/pkg/types/parsertypes/ @srikanthccv
/pkg/queryparser/ @srikanthccv
/pkg/contextlinks/ @srikanthccv @therealpandey
/pkg/types/parsertypes/ @srikanthccv @therealpandey
/pkg/queryparser/ @srikanthccv @therealpandey
# AuthN / AuthZ Owners

View File

@@ -51,6 +51,7 @@ jobs:
- role
- rootuser
- serviceaccount
- querier_json_body
- ttl
sqlstore-provider:
- postgres
@@ -61,7 +62,7 @@ jobs:
- 25.5.6
- 25.12.5
schema-migrator-version:
- v0.142.0
- v0.144.3
postgres-version:
- 15
if: |

View File

@@ -2474,6 +2474,97 @@ components:
- requiredMetricsCheck
- endTimeBeforeRetention
type: object
InframonitoringtypesPodPhase:
enum:
- pending
- running
- succeeded
- failed
- unknown
- ""
type: string
InframonitoringtypesPodRecord:
properties:
failedPodCount:
type: integer
meta:
additionalProperties: {}
nullable: true
type: object
pendingPodCount:
type: integer
podAge:
format: int64
type: integer
podCPU:
format: double
type: number
podCPULimit:
format: double
type: number
podCPURequest:
format: double
type: number
podMemory:
format: double
type: number
podMemoryLimit:
format: double
type: number
podMemoryRequest:
format: double
type: number
podPhase:
$ref: '#/components/schemas/InframonitoringtypesPodPhase'
podUID:
type: string
runningPodCount:
type: integer
succeededPodCount:
type: integer
unknownPodCount:
type: integer
required:
- podUID
- podCPU
- podCPURequest
- podCPULimit
- podMemory
- podMemoryRequest
- podMemoryLimit
- podPhase
- pendingPodCount
- runningPodCount
- succeededPodCount
- failedPodCount
- unknownPodCount
- podAge
- meta
type: object
InframonitoringtypesPods:
properties:
endTimeBeforeRetention:
type: boolean
records:
items:
$ref: '#/components/schemas/InframonitoringtypesPodRecord'
nullable: true
type: array
requiredMetricsCheck:
$ref: '#/components/schemas/InframonitoringtypesRequiredMetricsCheck'
total:
type: integer
type:
$ref: '#/components/schemas/InframonitoringtypesResponseType'
warning:
$ref: '#/components/schemas/Querybuildertypesv5QueryWarnData'
required:
- type
- records
- total
- requiredMetricsCheck
- endTimeBeforeRetention
type: object
InframonitoringtypesPostableHosts:
properties:
end:
@@ -2500,6 +2591,32 @@ components:
- end
- limit
type: object
InframonitoringtypesPostablePods:
properties:
end:
format: int64
type: integer
filter:
$ref: '#/components/schemas/Querybuildertypesv5Filter'
groupBy:
items:
$ref: '#/components/schemas/Querybuildertypesv5GroupByKey'
nullable: true
type: array
limit:
type: integer
offset:
type: integer
orderBy:
$ref: '#/components/schemas/Querybuildertypesv5OrderBy'
start:
format: int64
type: integer
required:
- start
- end
- limit
type: object
InframonitoringtypesRequiredMetricsCheck:
properties:
missingMetrics:
@@ -10886,7 +11003,9 @@ paths:
five metrics, and pagination via offset/limit. The response type is ''list''
for the default host.name grouping or ''grouped_list'' for custom groupBy
keys. Also reports missing required metrics and whether the requested time
range falls before the data retention boundary.'
range falls before the data retention boundary. Numeric metric fields (cpu,
memory, wait, load15, diskUsage) return -1 as a sentinel when no data is available
for that field.'
operationId: ListHosts
requestBody:
content:
@@ -10940,6 +11059,79 @@ paths:
summary: List Hosts for Infra Monitoring
tags:
- inframonitoring
/api/v2/infra_monitoring/pods:
post:
deprecated: false
description: 'Returns a paginated list of Kubernetes pods with key metrics:
CPU usage, CPU request/limit utilization, memory working set, memory request/limit
utilization, current pod phase (pending/running/succeeded/failed/unknown),
and pod age (ms since start time). Each pod includes metadata attributes (namespace,
node, workload owner such as deployment/statefulset/daemonset/job/cronjob,
cluster). Supports filtering via a filter expression, custom groupBy to aggregate
pods by any attribute, ordering by any of the six metrics (cpu, cpu_request,
cpu_limit, memory, memory_request, memory_limit), and pagination via offset/limit.
The response type is ''list'' for the default k8s.pod.uid grouping (each row
is one pod with its current phase) or ''grouped_list'' for custom groupBy
keys (each row aggregates pods in the group with per-phase counts: pendingPodCount,
runningPodCount, succeededPodCount, failedPodCount, unknownPodCount derived
from each pod''s latest phase in the window). Also reports missing required
metrics and whether the requested time range falls before the data retention
boundary. Numeric metric fields (podCPU, podCPURequest, podCPULimit, podMemory,
podMemoryRequest, podMemoryLimit, podAge) return -1 as a sentinel when no
data is available for that field.'
operationId: ListPods
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/InframonitoringtypesPostablePods'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/InframonitoringtypesPods'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: List Pods for Infra Monitoring
tags:
- inframonitoring
/api/v2/livez:
get:
deprecated: false

View File

@@ -13,7 +13,9 @@ import type {
import type {
InframonitoringtypesPostableHostsDTO,
InframonitoringtypesPostablePodsDTO,
ListHosts200,
ListPods200,
RenderErrorResponseDTO,
} from '../sigNoz.schemas';
@@ -21,7 +23,7 @@ import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
import type { ErrorType, BodyType } from '../../../generatedAPIInstance';
/**
* Returns a paginated list of hosts with key infrastructure metrics: CPU usage (%), memory usage (%), I/O wait (%), disk usage (%), and 15-minute load average. Each host includes its current status (active/inactive based on metrics reported in the last 10 minutes) and metadata attributes (e.g., os.type). Supports filtering via a filter expression, filtering by host status, custom groupBy to aggregate hosts by any attribute, ordering by any of the five metrics, and pagination via offset/limit. The response type is 'list' for the default host.name grouping or 'grouped_list' for custom groupBy keys. Also reports missing required metrics and whether the requested time range falls before the data retention boundary.
* Returns a paginated list of hosts with key infrastructure metrics: CPU usage (%), memory usage (%), I/O wait (%), disk usage (%), and 15-minute load average. Each host includes its current status (active/inactive based on metrics reported in the last 10 minutes) and metadata attributes (e.g., os.type). Supports filtering via a filter expression, filtering by host status, custom groupBy to aggregate hosts by any attribute, ordering by any of the five metrics, and pagination via offset/limit. The response type is 'list' for the default host.name grouping or 'grouped_list' for custom groupBy keys. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (cpu, memory, wait, load15, diskUsage) return -1 as a sentinel when no data is available for that field.
* @summary List Hosts for Infra Monitoring
*/
export const listHosts = (
@@ -104,3 +106,87 @@ export const useListHosts = <
return useMutation(mutationOptions);
};
/**
* Returns a paginated list of Kubernetes pods with key metrics: CPU usage, CPU request/limit utilization, memory working set, memory request/limit utilization, current pod phase (pending/running/succeeded/failed/unknown), and pod age (ms since start time). Each pod includes metadata attributes (namespace, node, workload owner such as deployment/statefulset/daemonset/job/cronjob, cluster). Supports filtering via a filter expression, custom groupBy to aggregate pods by any attribute, ordering by any of the six metrics (cpu, cpu_request, cpu_limit, memory, memory_request, memory_limit), and pagination via offset/limit. The response type is 'list' for the default k8s.pod.uid grouping (each row is one pod with its current phase) or 'grouped_list' for custom groupBy keys (each row aggregates pods in the group with per-phase counts: pendingPodCount, runningPodCount, succeededPodCount, failedPodCount, unknownPodCount derived from each pod's latest phase in the window). Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (podCPU, podCPURequest, podCPULimit, podMemory, podMemoryRequest, podMemoryLimit, podAge) return -1 as a sentinel when no data is available for that field.
* @summary List Pods for Infra Monitoring
*/
export const listPods = (
inframonitoringtypesPostablePodsDTO: BodyType<InframonitoringtypesPostablePodsDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<ListPods200>({
url: `/api/v2/infra_monitoring/pods`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: inframonitoringtypesPostablePodsDTO,
signal,
});
};
export const getListPodsMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listPods>>,
TError,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof listPods>>,
TError,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> },
TContext
> => {
const mutationKey = ['listPods'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof listPods>>,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> }
> = (props) => {
const { data } = props ?? {};
return listPods(data);
};
return { mutationFn, ...mutationOptions };
};
export type ListPodsMutationResult = NonNullable<
Awaited<ReturnType<typeof listPods>>
>;
export type ListPodsMutationBody =
BodyType<InframonitoringtypesPostablePodsDTO>;
export type ListPodsMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary List Pods for Infra Monitoring
*/
export const useListPods = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listPods>>,
TError,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof listPods>>,
TError,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> },
TContext
> => {
const mutationOptions = getListPodsMutationOptions(options);
return useMutation(mutationOptions);
};

View File

@@ -3243,6 +3243,108 @@ export interface InframonitoringtypesHostsDTO {
warning?: Querybuildertypesv5QueryWarnDataDTO;
}
export enum InframonitoringtypesPodPhaseDTO {
pending = 'pending',
running = 'running',
succeeded = 'succeeded',
failed = 'failed',
unknown = 'unknown',
'' = '',
}
/**
* @nullable
*/
export type InframonitoringtypesPodRecordDTOMeta = {
[key: string]: unknown;
} | null;
export interface InframonitoringtypesPodRecordDTO {
/**
* @type integer
*/
failedPodCount: number;
/**
* @type object
* @nullable true
*/
meta: InframonitoringtypesPodRecordDTOMeta;
/**
* @type integer
*/
pendingPodCount: number;
/**
* @type integer
* @format int64
*/
podAge: number;
/**
* @type number
* @format double
*/
podCPU: number;
/**
* @type number
* @format double
*/
podCPULimit: number;
/**
* @type number
* @format double
*/
podCPURequest: number;
/**
* @type number
* @format double
*/
podMemory: number;
/**
* @type number
* @format double
*/
podMemoryLimit: number;
/**
* @type number
* @format double
*/
podMemoryRequest: number;
podPhase: InframonitoringtypesPodPhaseDTO;
/**
* @type string
*/
podUID: string;
/**
* @type integer
*/
runningPodCount: number;
/**
* @type integer
*/
succeededPodCount: number;
/**
* @type integer
*/
unknownPodCount: number;
}
export interface InframonitoringtypesPodsDTO {
/**
* @type boolean
*/
endTimeBeforeRetention: boolean;
/**
* @type array
* @nullable true
*/
records: InframonitoringtypesPodRecordDTO[] | null;
requiredMetricsCheck: InframonitoringtypesRequiredMetricsCheckDTO;
/**
* @type integer
*/
total: number;
type: InframonitoringtypesResponseTypeDTO;
warning?: Querybuildertypesv5QueryWarnDataDTO;
}
export interface InframonitoringtypesPostableHostsDTO {
/**
* @type integer
@@ -3271,6 +3373,34 @@ export interface InframonitoringtypesPostableHostsDTO {
start: number;
}
export interface InframonitoringtypesPostablePodsDTO {
/**
* @type integer
* @format int64
*/
end: number;
filter?: Querybuildertypesv5FilterDTO;
/**
* @type array
* @nullable true
*/
groupBy?: Querybuildertypesv5GroupByKeyDTO[] | null;
/**
* @type integer
*/
limit: number;
/**
* @type integer
*/
offset?: number;
orderBy?: Querybuildertypesv5OrderByDTO;
/**
* @type integer
* @format int64
*/
start: number;
}
export interface InframonitoringtypesRequiredMetricsCheckDTO {
/**
* @type array
@@ -7350,6 +7480,14 @@ export type ListHosts200 = {
status: string;
};
export type ListPods200 = {
data: InframonitoringtypesPodsDTO;
/**
* @type string
*/
status: string;
};
export type Livez200 = {
data: FactoryResponseDTO;
/**

View File

@@ -33,6 +33,7 @@ export default function ChartWrapper({
children,
layoutChildren,
yAxisUnit,
groupBy,
customTooltip,
pinnedTooltipElement,
'data-testid': testId,
@@ -68,8 +69,9 @@ export default function ChartWrapper({
const syncMetadata = useMemo(
() => ({
yAxisUnit,
groupBy,
}),
[yAxisUnit],
[yAxisUnit, groupBy],
);
return (

View File

@@ -6,6 +6,7 @@ import {
DashboardCursorSync,
TooltipClickData,
} from 'lib/uPlotV2/plugins/TooltipPlugin/types';
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
interface BaseChartProps {
width: number;
@@ -38,6 +39,7 @@ interface UPlotBasedChartProps {
interface UPlotChartDataProps {
yAxisUnit?: string;
decimalPrecision?: PrecisionOption;
groupBy?: BaseAutocompleteData[];
}
export interface TimeSeriesChartProps

View File

@@ -113,6 +113,10 @@ function BarPanel(props: PanelWrapperProps): JSX.Element {
uPlotRef.current = plot;
}, []);
const groupBy = useMemo(() => {
return widget.query.builder.queryData[0].groupBy;
}, [widget.query]);
return (
<div className="panel-container" ref={graphRef}>
{containerDimensions.width > 0 && containerDimensions.height > 0 && (
@@ -128,6 +132,7 @@ function BarPanel(props: PanelWrapperProps): JSX.Element {
width={containerDimensions.width}
height={containerDimensions.height}
layoutChildren={layoutChildren}
groupBy={groupBy}
isStackedBarChart={widget.stackedBarChart ?? false}
yAxisUnit={widget.yAxisUnit}
decimalPrecision={widget.decimalPrecision}

View File

@@ -105,6 +105,7 @@ export function prepareBarPanelConfig({
colorMapping: widget.customLegendColors ?? {},
isDarkMode,
stepInterval: currentStepInterval,
metric: series.metric,
});
});

View File

@@ -104,6 +104,10 @@ function TimeSeriesPanel(props: PanelWrapperProps): JSX.Element {
widget.decimalPrecision,
]);
const groupBy = useMemo(() => {
return widget.query.builder.queryData[0].groupBy;
}, [widget.query]);
return (
<div className="panel-container" ref={graphRef}>
{containerDimensions.width > 0 && containerDimensions.height > 0 && (
@@ -117,6 +121,7 @@ function TimeSeriesPanel(props: PanelWrapperProps): JSX.Element {
yAxisUnit={widget.yAxisUnit}
decimalPrecision={widget.decimalPrecision}
data={chartData as uPlot.AlignedData}
groupBy={groupBy}
width={containerDimensions.width}
height={containerDimensions.height}
layoutChildren={layoutChildren}

View File

@@ -131,6 +131,7 @@ export const prepareUPlotConfig = ({
pointSize: 5,
fillMode: widget.fillMode || FillMode.None,
isDarkMode,
metric: series.metric,
});
});

View File

@@ -139,8 +139,8 @@ function ChartPreview({
if (startTime && endTime && startTime !== endTime) {
dispatch(
UpdateTimeInterval('custom', [
parseInt(getTimeString(startTime), 10),
parseInt(getTimeString(endTime), 10),
Number.parseInt(getTimeString(startTime), 10),
Number.parseInt(getTimeString(endTime), 10),
]),
);
}

View File

@@ -370,7 +370,7 @@ function FormAlertRules({
// onQueryCategoryChange handles changes to query category
// in state as well as sets additional defaults
const onQueryCategoryChange = (val: EQueryType): void => {
const element = document.getElementById('top');
const element = document.querySelector('#top');
if (element) {
element.scrollIntoView({ behavior: 'smooth' });
}

View File

@@ -279,7 +279,7 @@ function Explorer(): JSX.Element {
[],
);
const [warning, setWarning] = useState<Warning | undefined>(undefined);
const [warning, setWarning] = useState<Warning | undefined>();
const oneChartPerQueryDisabledTooltip = useMemo(() => {
if (splitedQueries.length <= 1) {
@@ -291,7 +291,7 @@ function Explorer(): JSX.Element {
if (disableOneChartPerQuery) {
return 'One chart per query cannot be disabled for multiple queries with different units.';
}
return undefined;
return;
}, [disableOneChartPerQuery, splitedQueries.length, units.length]);
// Show the y axis unit selector if -

View File

@@ -217,7 +217,7 @@ function Inspect({
);
}
if (!inspectMetricsTimeSeries.length) {
if (inspectMetricsTimeSeries.length === 0) {
return renderFallback(
'inspect-metrics-empty',
<Empty description="No time series found for this metric to inspect." />,

View File

@@ -254,10 +254,10 @@ export function useInspectMetrics(
const valuesMap = new Map<number, number>();
series.values.forEach(({ timestamp, value }) => {
valuesMap.set(timestamp, parseFloat(value));
valuesMap.set(timestamp, Number.parseFloat(value));
});
return timestamps.map((timestamp) => valuesMap.get(timestamp) ?? NaN);
return timestamps.map((timestamp) => valuesMap.get(timestamp) ?? Number.NaN);
});
const rawData = [timestamps, ...timeseriesArray];
@@ -271,7 +271,7 @@ export function useInspectMetrics(
labels.add(label);
});
});
return Array.from(labels);
return [...labels];
}, [inspectMetricsData]);
const reset = useCallback(() => {

View File

@@ -115,8 +115,10 @@ export const useGetQueryRange: UseGetQueryRange = (
const updatedQuery = updateBarStepInterval(
requestData.query,
requestData.start ? requestData.start * 1e3 : parseInt(start, 10) * 1e3,
requestData.end ? requestData.end * 1e3 : parseInt(end, 10) * 1e3,
requestData.start
? requestData.start * 1e3
: Number.parseInt(start, 10) * 1e3,
requestData.end ? requestData.end * 1e3 : Number.parseInt(end, 10) * 1e3,
);
return {

View File

@@ -16,6 +16,7 @@ export default function BarChartTooltip(props: BarTooltipProps): JSX.Element {
yAxisUnit: props.yAxisUnit ?? '',
decimalPrecision: props.decimalPrecision,
isStackedBarChart: props.isStackedBarChart,
syncedSeriesIndexes: props.syncedSeriesIndexes,
}),
[
props.uPlotInstance,
@@ -24,6 +25,7 @@ export default function BarChartTooltip(props: BarTooltipProps): JSX.Element {
props.yAxisUnit,
props.decimalPrecision,
props.isStackedBarChart,
props.syncedSeriesIndexes,
],
);

View File

@@ -17,6 +17,7 @@ export default function HistogramTooltip(
uPlotInstance: props.uPlotInstance,
yAxisUnit: props.yAxisUnit ?? '',
decimalPrecision: props.decimalPrecision,
syncedSeriesIndexes: props.syncedSeriesIndexes,
}),
[
props.uPlotInstance,
@@ -24,6 +25,7 @@ export default function HistogramTooltip(
props.dataIndexes,
props.yAxisUnit,
props.decimalPrecision,
props.syncedSeriesIndexes,
],
);

View File

@@ -17,6 +17,7 @@ export default function TimeSeriesTooltip(
uPlotInstance: props.uPlotInstance,
yAxisUnit: props.yAxisUnit ?? '',
decimalPrecision: props.decimalPrecision,
syncedSeriesIndexes: props.syncedSeriesIndexes,
}),
[
props.uPlotInstance,
@@ -24,6 +25,7 @@ export default function TimeSeriesTooltip(
props.dataIndexes,
props.yAxisUnit,
props.decimalPrecision,
props.syncedSeriesIndexes,
],
);

View File

@@ -62,6 +62,7 @@ export function buildTooltipContent({
yAxisUnit,
decimalPrecision,
isStackedBarChart,
syncedSeriesIndexes,
}: {
data: AlignedData;
series: Series[];
@@ -71,18 +72,34 @@ export function buildTooltipContent({
yAxisUnit: string;
decimalPrecision?: PrecisionOption;
isStackedBarChart?: boolean;
syncedSeriesIndexes?: number[] | null;
}): TooltipContentItem[] {
const items: TooltipContentItem[] = [];
const allowedIndexes =
syncedSeriesIndexes != null ? new Set(syncedSeriesIndexes) : null;
for (let seriesIndex = 1; seriesIndex < series.length; seriesIndex += 1) {
const seriesItem = series[seriesIndex];
if (!seriesItem?.show) {
continue;
}
if (allowedIndexes != null && !allowedIndexes.has(seriesIndex)) {
continue;
}
const dataIndex = dataIndexes[seriesIndex];
// Skip series with no data at the current cursor position
const isSync = allowedIndexes != null;
if (dataIndex === null) {
if (isSync) {
items.push({
label: String(seriesItem.label ?? ''),
value: 0,
tooltipValue: 'No Data',
color: resolveSeriesColor(seriesItem.stroke, uPlotInstance, seriesIndex),
isActive: false,
});
}
continue;
}
@@ -102,6 +119,14 @@ export function buildTooltipContent({
color: resolveSeriesColor(seriesItem.stroke, uPlotInstance, seriesIndex),
isActive: seriesIndex === activeSeriesIndex,
});
} else if (isSync) {
items.push({
label: String(seriesItem.label ?? ''),
value: 0,
tooltipValue: 'No Data',
color: resolveSeriesColor(seriesItem.stroke, uPlotInstance, seriesIndex),
isActive: false,
});
}
}

View File

@@ -58,6 +58,9 @@ export interface TooltipRenderArgs {
isPinned: boolean;
dismiss: () => void;
viaSync: boolean;
/** In Tooltip sync mode, limits which series are rendered in the receiver tooltip.
* null = no filtering; [] = no matches (tooltip hidden upstream); [...] = allowed indexes */
syncedSeriesIndexes?: number[] | null;
}
export interface BaseTooltipProps {

View File

@@ -9,6 +9,7 @@ import {
BarAlignment,
ConfigBuilder,
DrawStyle,
ExtendedSeries,
FillMode,
LineInterpolation,
LineStyle,
@@ -27,7 +28,10 @@ let builders: PathBuilders | null = null;
const DEFAULT_LINE_WIDTH = 2;
export const POINT_SIZE_FACTOR = 2.5;
export class UPlotSeriesBuilder extends ConfigBuilder<SeriesProps, Series> {
export class UPlotSeriesBuilder extends ConfigBuilder<
SeriesProps,
ExtendedSeries
> {
constructor(props: SeriesProps) {
super(props);
const pathBuilders = uPlot.paths;
@@ -205,8 +209,8 @@ export class UPlotSeriesBuilder extends ConfigBuilder<SeriesProps, Series> {
);
}
getConfig(): Series {
const { scaleKey, label, spanGaps, show = true } = this.props;
getConfig(): ExtendedSeries {
const { scaleKey, label, spanGaps, show = true, metric } = this.props;
const resolvedLineColor = this.getLineColor();
@@ -233,6 +237,7 @@ export class UPlotSeriesBuilder extends ConfigBuilder<SeriesProps, Series> {
...lineConfig,
...pathConfig,
points: Object.keys(pointsConfig).length > 0 ? pointsConfig : undefined,
metric,
};
}
}

View File

@@ -171,6 +171,10 @@ export enum FillMode {
None = 'none',
}
export type ExtendedSeries = Series & {
metric?: { [key: string]: string };
};
export interface SeriesProps extends LineConfig, PointsConfig, BarConfig {
scaleKey: string;
label?: string;
@@ -194,6 +198,7 @@ export interface SeriesProps extends LineConfig, PointsConfig, BarConfig {
fillMode?: FillMode;
isDarkMode?: boolean;
stepInterval?: number;
metric?: { [key: string]: string };
}
export interface LegendItem {

View File

@@ -4,7 +4,7 @@ import cx from 'classnames';
import uPlot from 'uplot';
import logEvent from 'api/common/logEvent';
import { syncCursorRegistry } from './syncCursorRegistry';
import { createSyncDisplayHook } from './syncDisplayHook';
import {
createInitialControllerState,
createSetCursorHandler,
@@ -107,32 +107,20 @@ export default function TooltipPlugin({
// Enable uPlot's built-in cursor sync when requested so that
// crosshair / tooltip can follow the dashboard-wide cursor.
let removeSyncDisplayHook: (() => void) | null = null;
if (syncMode !== DashboardCursorSync.None && config.scales[0]?.props.time) {
config.setCursor({
sync: { key: syncKey, scales: ['x', 'y'] },
sync: {
key: syncKey,
scales:
syncMode === DashboardCursorSync.Crosshair ? ['x', 'y'] : ['x', null],
},
});
// Show the horizontal crosshair only when the receiving panel shares
// the same y-axis unit as the source panel. When this panel is the
// source (cursor.event != null) the line is always shown and this
// panel's metadata is written to the registry so receivers can read it.
config.addHook('setCursor', (u: uPlot): void => {
const yCursorEl = u.root.querySelector<HTMLElement>('.u-cursor-y');
if (!yCursorEl) {
return;
}
if (u.cursor.event != null) {
// This panel is the source — publish metadata and always show line.
syncCursorRegistry.setMetadata(syncKey, syncMetadata);
yCursorEl.style.display = '';
} else {
// This panel is receiving sync — show only if units match.
const sourceMeta = syncCursorRegistry.getMetadata(syncKey);
yCursorEl.style.display =
sourceMeta?.yAxisUnit === syncMetadata?.yAxisUnit ? '' : 'none';
}
});
removeSyncDisplayHook = config.addHook(
'setCursor',
createSyncDisplayHook(syncKey, syncMetadata, controller),
);
}
// Dismiss the tooltip when the user clicks / presses a key
@@ -140,7 +128,12 @@ export default function TooltipPlugin({
const onOutsideInteraction = (event: Event): void => {
const target = event.target as Node;
if (!containerRef.current?.contains(target)) {
dismissTooltip();
// Don't dismiss if the click landed inside any other pinned tooltip.
const isInsideAnyPinnedTooltip =
(target as Element).closest?.('[data-pinned="true"]') != null;
if (!isInsideAnyPinnedTooltip) {
dismissTooltip();
}
}
};
@@ -206,6 +199,16 @@ export default function TooltipPlugin({
if (!controller.hoverActive || !plot) {
return null;
}
// In Tooltip sync mode, suppress the receiver tooltip entirely when
// no receiver series match the source panel's focused series.
if (
syncTooltipWithDashboard &&
controller.cursorDrivenBySync &&
Array.isArray(controller.syncedSeriesIndexes) &&
controller.syncedSeriesIndexes.length === 0
) {
return null;
}
return renderRef.current({
uPlotInstance: plot,
dataIndexes: controller.seriesIndexes,
@@ -213,6 +216,7 @@ export default function TooltipPlugin({
isPinned: controller.pinned,
dismiss: dismissTooltip,
viaSync: controller.cursorDrivenBySync,
syncedSeriesIndexes: controller.syncedSeriesIndexes,
});
}
@@ -443,6 +447,7 @@ export default function TooltipPlugin({
removeSetSeriesHook();
removeSetLegendHook();
removeSetCursorHook();
removeSyncDisplayHook?.();
if (overClickHandler) {
const plot = getPlot(controller);
plot?.over.removeEventListener('click', overClickHandler);
@@ -505,7 +510,7 @@ export default function TooltipPlugin({
isHovering,
contents,
]);
const isTooltipVisible = isHovering || tooltipBody != null;
const isTooltipVisible = tooltipBody != null;
if (!hasPlot) {
return null;

View File

@@ -9,9 +9,13 @@ import type { TooltipSyncMetadata } from './types';
*
* Receivers use this to make decisions such as:
* - Whether to show the horizontal crosshair line (matching yAxisUnit)
* - Future: what to render inside the tooltip (matching groupBy, etc.)
* - Which series to highlight when panels share the same groupBy
*/
const metadataBySyncKey = new Map<string, TooltipSyncMetadata | undefined>();
const activeSeriesMetricBySyncKey = new Map<
string,
Record<string, string> | null
>();
export const syncCursorRegistry = {
setMetadata(syncKey: string, metadata: TooltipSyncMetadata | undefined): void {
@@ -21,4 +25,15 @@ export const syncCursorRegistry = {
getMetadata(syncKey: string): TooltipSyncMetadata | undefined {
return metadataBySyncKey.get(syncKey);
},
setActiveSeriesMetric(
syncKey: string,
metric: Record<string, string> | null,
): void {
activeSeriesMetricBySyncKey.set(syncKey, metric);
},
getActiveSeriesMetric(syncKey: string): Record<string, string> | null {
return activeSeriesMetricBySyncKey.get(syncKey) ?? null;
},
};

View File

@@ -0,0 +1,185 @@
import uPlot from 'uplot';
import type { ExtendedSeries } from '../../config/types';
import { syncCursorRegistry } from './syncCursorRegistry';
import type { TooltipControllerState, TooltipSyncMetadata } from './types';
/**
* Returns the dimension keys present in both groupBy arrays.
* An empty result means no overlap — series highlighting should not run.
*
* exact [A, B] vs [A, B] → [A, B] one match
* subset [A] vs [A, B] → [A] multiple receiver series may match
* superset [A, B] vs [A] → [A] one receiver series matches
* partial [A, B] vs [B, C] → [B]
*/
function getCommonGroupByKeys(
a: TooltipSyncMetadata['groupBy'],
b: TooltipSyncMetadata['groupBy'],
): string[] {
if (
!Array.isArray(a) ||
a.length === 0 ||
!Array.isArray(b) ||
b.length === 0
) {
return [];
}
const bKeys = new Set(b.map((g) => g.key));
return a.filter((g) => bKeys.has(g.key)).map((g) => g.key);
}
/**
* Returns the 1-based indexes of every series whose metric matches
* sourceMetric on all commonKeys.
*/
function findMatchingSeriesIndexes(
series: uPlot.Series[],
sourceMetric: Record<string, string>,
commonKeys: string[],
): number[] {
return series.reduce<number[]>((acc, s, i) => {
if (i === 0) {
return acc;
}
const metric = (s as ExtendedSeries).metric;
if (
metric != null &&
commonKeys.every((key) => metric[key] === sourceMetric[key])
) {
acc.push(i);
}
return acc;
}, []);
}
function applySourceSync({
uPlotInstance,
syncKey,
syncMetadata,
focusedSeriesIndex,
}: {
uPlotInstance: uPlot;
syncKey: string;
syncMetadata: TooltipSyncMetadata | undefined;
focusedSeriesIndex: number | null;
}): void {
syncCursorRegistry.setMetadata(syncKey, syncMetadata);
const focusedSeries =
focusedSeriesIndex != null
? (uPlotInstance.series[focusedSeriesIndex] as ExtendedSeries)
: null;
syncCursorRegistry.setActiveSeriesMetric(
syncKey,
focusedSeries?.metric ?? null,
);
}
/**
* Returns:
* null no groupBy filtering configured or cursor off-chart (no-op for tooltip)
* [] groupBy configured but no receiver series match the source (hide synced tooltip)
* number[] 1-based indexes of matching receiver series (show only these)
*/
function applyReceiverSync({
uPlotInstance,
yCrosshairEl,
syncKey,
syncMetadata,
sourceMetadata,
commonKeys,
}: {
uPlotInstance: uPlot;
yCrosshairEl: HTMLElement;
syncKey: string;
syncMetadata: TooltipSyncMetadata | undefined;
sourceMetadata: TooltipSyncMetadata | undefined;
commonKeys: string[];
}): number[] | null {
yCrosshairEl.style.display =
sourceMetadata?.yAxisUnit === syncMetadata?.yAxisUnit ? '' : 'none';
if (commonKeys.length === 0) {
return null;
}
if ((uPlotInstance.cursor.left ?? -1) < 0) {
uPlotInstance.setSeries(null, { focus: false });
return null;
}
const sourceSeriesMetric = syncCursorRegistry.getActiveSeriesMetric(syncKey);
if (sourceSeriesMetric == null) {
uPlotInstance.setSeries(null, { focus: false });
return [];
}
const matchingIdxs = findMatchingSeriesIndexes(
uPlotInstance.series,
sourceSeriesMetric,
commonKeys,
);
if (matchingIdxs.length === 0) {
uPlotInstance.setSeries(null, { focus: false });
return [];
}
uPlotInstance.setSeries(matchingIdxs[0], { focus: true });
return matchingIdxs;
}
export function createSyncDisplayHook(
syncKey: string,
syncMetadata: TooltipSyncMetadata | undefined,
controller: TooltipControllerState,
): (u: uPlot) => void {
// Cached once — avoids a DOM query on every cursor move.
let yCrosshairEl: HTMLElement | null = null;
// groupBy on both panels is stable (set at config time). Recompute the
// intersection only when the source panel's groupBy reference changes.
let lastSourceGroupBy: TooltipSyncMetadata['groupBy'];
let cachedCommonKeys: string[] = [];
return (u: uPlot): void => {
yCrosshairEl ??= u.root.querySelector<HTMLElement>('.u-cursor-y');
if (!yCrosshairEl) {
return;
}
if (u.cursor.event != null) {
controller.syncedSeriesIndexes = null;
applySourceSync({
uPlotInstance: u,
syncKey,
syncMetadata,
focusedSeriesIndex: controller.focusedSeriesIndex,
});
yCrosshairEl.style.display = '';
return;
}
// Read metadata once and pass it down — avoids a second registry lookup
// inside applyReceiverSync.
const sourceMetadata = syncCursorRegistry.getMetadata(syncKey);
if (sourceMetadata?.groupBy !== lastSourceGroupBy) {
lastSourceGroupBy = sourceMetadata?.groupBy;
cachedCommonKeys = getCommonGroupByKeys(
sourceMetadata?.groupBy,
syncMetadata?.groupBy,
);
}
controller.syncedSeriesIndexes = applyReceiverSync({
uPlotInstance: u,
yCrosshairEl,
syncKey,
syncMetadata,
sourceMetadata,
commonKeys: cachedCommonKeys,
});
};
}

View File

@@ -27,6 +27,7 @@ export function createInitialControllerState(): TooltipControllerState {
verticalOffset: 0,
seriesIndexes: [],
focusedSeriesIndex: null,
syncedSeriesIndexes: null,
cursorDrivenBySync: false,
plotWithinViewport: false,
windowWidth: window.innerWidth - WINDOW_OFFSET,
@@ -184,7 +185,7 @@ export function createSetLegendHandler(
return;
}
const newSeriesIndexes = plot.cursor.idxs.slice();
const newSeriesIndexes = [...plot.cursor.idxs];
const isAnySeriesActive = newSeriesIndexes.some((v, i) => i > 0 && v != null);
const previousCursorDrivenBySync = controller.cursorDrivenBySync;

View File

@@ -4,6 +4,7 @@ import type {
ReactNode,
RefObject,
} from 'react';
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
import type uPlot from 'uplot';
import type { TooltipRenderArgs } from '../../components/types';
@@ -39,6 +40,7 @@ export interface TooltipLayoutInfo {
export interface TooltipSyncMetadata {
yAxisUnit?: string;
groupBy?: BaseAutocompleteData[];
}
export interface TooltipPluginProps {
@@ -95,6 +97,11 @@ export interface TooltipControllerState {
verticalOffset: number;
seriesIndexes: Array<number | null>;
focusedSeriesIndex: number | null;
/** Receiver-side series filtering for Tooltip sync mode.
* null = no filtering (source panel or no groupBy configured)
* [] = no matching series found → hide the synced tooltip
* [...] = only these 1-based series indexes should appear in the synced tooltip */
syncedSeriesIndexes: number[] | null;
cursorDrivenBySync: boolean;
plotWithinViewport: boolean;
windowWidth: number;

View File

@@ -210,7 +210,7 @@ describe('TooltipPlugin', () => {
expect(container.parentElement).toBe(document.body);
const fullscreenRoot = document.createElement('div');
document.body.appendChild(fullscreenRoot);
document.body.append(fullscreenRoot);
act(() => {
mockedFullscreenElement = fullscreenRoot;
@@ -252,7 +252,7 @@ describe('TooltipPlugin', () => {
renderAndActivateHover(config, undefined, { canPinTooltip: true });
const container = screen.getByTestId('tooltip-plugin-container');
expect(container.getAttribute('data-pinned') === 'true').toBe(false);
expect(container.dataset.pinned === 'true').toBe(false);
act(() => {
document.body.dispatchEvent(
@@ -266,7 +266,7 @@ describe('TooltipPlugin', () => {
return waitFor(() => {
const updated = screen.getByTestId('tooltip-plugin-container');
expect(updated).toBeInTheDocument();
expect(updated.getAttribute('data-pinned') === 'true').toBe(true);
expect(updated.dataset.pinned === 'true').toBe(true);
});
});
@@ -340,7 +340,7 @@ describe('TooltipPlugin', () => {
// Wait until the tooltip is actually pinned.
await waitFor(() => {
const container = screen.getByTestId('tooltip-plugin-container');
expect(container.getAttribute('data-pinned') === 'true').toBe(true);
expect(container.dataset.pinned === 'true').toBe(true);
});
const button = await screen.findByRole('button', { name: 'Dismiss' });
@@ -353,7 +353,7 @@ describe('TooltipPlugin', () => {
expect(container).toBeInTheDocument();
expect(container.getAttribute('aria-hidden')).toBe('true');
expect(container.getAttribute('data-pinned') === 'true').toBe(false);
expect(container.dataset.pinned === 'true').toBe(false);
expect(container.textContent).toBe('');
});
});
@@ -391,9 +391,7 @@ describe('TooltipPlugin', () => {
});
expect(
screen
.getByTestId('tooltip-plugin-container')
.getAttribute('data-pinned') === 'true',
screen.getByTestId('tooltip-plugin-container').dataset.pinned === 'true',
).toBe(true);
// Simulate data update should dismiss the pinned tooltip.
@@ -405,7 +403,7 @@ describe('TooltipPlugin', () => {
const container = screen.getByTestId('tooltip-plugin-container');
expect(container).toBeInTheDocument();
expect(container.getAttribute('aria-hidden')).toBe('true');
expect(container.getAttribute('data-pinned') === 'true').toBe(false);
expect(container.dataset.pinned === 'true').toBe(false);
jest.useRealTimers();
});
@@ -443,9 +441,7 @@ describe('TooltipPlugin', () => {
});
expect(
screen
.getByTestId('tooltip-plugin-container')
.getAttribute('data-pinned') === 'true',
screen.getByTestId('tooltip-plugin-container').dataset.pinned === 'true',
).toBe(true);
// Click outside the tooltip container.
@@ -459,7 +455,7 @@ describe('TooltipPlugin', () => {
expect(container).toBeInTheDocument();
expect(container.getAttribute('aria-hidden')).toBe('true');
expect(container.getAttribute('data-pinned') === 'true').toBe(false);
expect(container.dataset.pinned === 'true').toBe(false);
});
jest.useRealTimers();
@@ -498,9 +494,7 @@ describe('TooltipPlugin', () => {
});
expect(
screen
.getByTestId('tooltip-plugin-container')
.getAttribute('data-pinned') === 'true',
screen.getByTestId('tooltip-plugin-container').dataset.pinned === 'true',
).toBe(true);
// Press Escape to release.
@@ -515,7 +509,7 @@ describe('TooltipPlugin', () => {
const container = screen.getByTestId('tooltip-plugin-container');
expect(container).toBeInTheDocument();
expect(container.getAttribute('aria-hidden')).toBe('true');
expect(container.getAttribute('data-pinned') === 'true').toBe(false);
expect(container.dataset.pinned === 'true').toBe(false);
});
jest.useRealTimers();
@@ -541,9 +535,7 @@ describe('TooltipPlugin', () => {
await waitFor(() => {
expect(
screen
.getByTestId('tooltip-plugin-container')
.getAttribute('data-pinned') === 'true',
screen.getByTestId('tooltip-plugin-container').dataset.pinned === 'true',
).toBe(true);
});
@@ -560,7 +552,7 @@ describe('TooltipPlugin', () => {
await waitFor(() => {
const container = screen.getByTestId('tooltip-plugin-container');
expect(container.getAttribute('data-pinned') === 'true').toBe(false);
expect(container.dataset.pinned === 'true').toBe(false);
});
jest.useRealTimers();
@@ -580,7 +572,7 @@ describe('TooltipPlugin', () => {
const container = screen.getByTestId('tooltip-plugin-container');
// Tooltip should still be hovering (visible), not dismissed.
expect(container.getAttribute('aria-hidden')).toBe('false');
expect(container.getAttribute('data-pinned') === 'true').toBe(false);
expect(container.dataset.pinned === 'true').toBe(false);
});
it('does not unpin on arbitrary keys that are not Escape or the pin key', async () => {
@@ -603,9 +595,7 @@ describe('TooltipPlugin', () => {
await waitFor(() => {
expect(
screen
.getByTestId('tooltip-plugin-container')
.getAttribute('data-pinned') === 'true',
screen.getByTestId('tooltip-plugin-container').dataset.pinned === 'true',
).toBe(true);
});
@@ -619,9 +609,7 @@ describe('TooltipPlugin', () => {
await waitFor(() => {
expect(
screen
.getByTestId('tooltip-plugin-container')
.getAttribute('data-pinned') === 'true',
screen.getByTestId('tooltip-plugin-container').dataset.pinned === 'true',
).toBe(true);
});
@@ -665,7 +653,7 @@ describe('TooltipPlugin', () => {
});
const container = screen.getByTestId('tooltip-plugin-container');
expect(container.getAttribute('data-pinned') === 'true').toBe(false);
expect(container.dataset.pinned === 'true').toBe(false);
});
it('does not pin when hover is not active', () => {
@@ -699,7 +687,7 @@ describe('TooltipPlugin', () => {
// The container exists once the plot is initialised, but it should
// be hidden and not pinned since hover was never activated.
const container = screen.getByTestId('tooltip-plugin-container');
expect(container.getAttribute('data-pinned') === 'true').toBe(false);
expect(container.dataset.pinned === 'true').toBe(false);
expect(container.getAttribute('aria-hidden')).toBe('true');
});
@@ -723,9 +711,7 @@ describe('TooltipPlugin', () => {
await waitFor(() => {
expect(
screen
.getByTestId('tooltip-plugin-container')
.getAttribute('data-pinned') === 'true',
screen.getByTestId('tooltip-plugin-container').dataset.pinned === 'true',
).toBe(false);
});
@@ -738,9 +724,7 @@ describe('TooltipPlugin', () => {
await waitFor(() => {
expect(
screen
.getByTestId('tooltip-plugin-container')
.getAttribute('data-pinned') === 'true',
screen.getByTestId('tooltip-plugin-container').dataset.pinned === 'true',
).toBe(true);
});
});
@@ -796,7 +780,7 @@ describe('TooltipPlugin', () => {
// ---- Cursor sync ------------------------------------------------------------
describe('cursor sync', () => {
it('enables uPlot cursor sync for time-based scales when mode is Tooltip', () => {
it('enables uPlot cursor sync on x-axis only when mode is Tooltip', () => {
const config = createConfigMock();
const setCursorSpy = jest.spyOn(config, 'setCursor');
config.addScale({ scaleKey: 'x', time: true });
@@ -810,6 +794,25 @@ describe('TooltipPlugin', () => {
}),
);
expect(setCursorSpy).toHaveBeenCalledWith({
sync: { key: 'dashboard-sync', scales: ['x', null] },
});
});
it('enables uPlot cursor sync on both axes when mode is Crosshair', () => {
const config = createConfigMock();
const setCursorSpy = jest.spyOn(config, 'setCursor');
config.addScale({ scaleKey: 'x', time: true });
render(
React.createElement(TooltipPlugin, {
config,
render: () => null,
syncMode: DashboardCursorSync.Crosshair,
syncKey: 'dashboard-sync',
}),
);
expect(setCursorSpy).toHaveBeenCalledWith({
sync: { key: 'dashboard-sync', scales: ['x', 'y'] },
});

View File

@@ -98,7 +98,7 @@ function LogsExplorer(): JSX.Element {
setIsLoadingQueries(false);
}, [queryClient]);
const [warning, setWarning] = useState<Warning | undefined>(undefined);
const [warning, setWarning] = useState<Warning | undefined>();
const handleChangeSelectedView = useCallback(
(view: ExplorerViews, querySearchParameters?: ICurrentQueryData): void => {

View File

@@ -101,7 +101,7 @@ function TracesExplorer(): JSX.Element {
getExplorerViewFromUrl(searchParams, panelTypesFromUrl),
);
const [warning, setWarning] = useState<Warning | undefined>(undefined);
const [warning, setWarning] = useState<Warning | undefined>();
const [isOpen, setOpen] = useState<boolean>(true);
const defaultQuery = useMemo(

View File

@@ -16,7 +16,7 @@ func (provider *provider) addInfraMonitoringRoutes(router *mux.Router) error {
ID: "ListHosts",
Tags: []string{"inframonitoring"},
Summary: "List Hosts for Infra Monitoring",
Description: "Returns a paginated list of hosts with key infrastructure metrics: CPU usage (%), memory usage (%), I/O wait (%), disk usage (%), and 15-minute load average. Each host includes its current status (active/inactive based on metrics reported in the last 10 minutes) and metadata attributes (e.g., os.type). Supports filtering via a filter expression, filtering by host status, custom groupBy to aggregate hosts by any attribute, ordering by any of the five metrics, and pagination via offset/limit. The response type is 'list' for the default host.name grouping or 'grouped_list' for custom groupBy keys. Also reports missing required metrics and whether the requested time range falls before the data retention boundary.",
Description: "Returns a paginated list of hosts with key infrastructure metrics: CPU usage (%), memory usage (%), I/O wait (%), disk usage (%), and 15-minute load average. Each host includes its current status (active/inactive based on metrics reported in the last 10 minutes) and metadata attributes (e.g., os.type). Supports filtering via a filter expression, filtering by host status, custom groupBy to aggregate hosts by any attribute, ordering by any of the five metrics, and pagination via offset/limit. The response type is 'list' for the default host.name grouping or 'grouped_list' for custom groupBy keys. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (cpu, memory, wait, load15, diskUsage) return -1 as a sentinel when no data is available for that field.",
Request: new(inframonitoringtypes.PostableHosts),
RequestContentType: "application/json",
Response: new(inframonitoringtypes.Hosts),
@@ -29,5 +29,24 @@ func (provider *provider) addInfraMonitoringRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v2/infra_monitoring/pods", handler.New(
provider.authZ.ViewAccess(provider.infraMonitoringHandler.ListPods),
handler.OpenAPIDef{
ID: "ListPods",
Tags: []string{"inframonitoring"},
Summary: "List Pods for Infra Monitoring",
Description: "Returns a paginated list of Kubernetes pods with key metrics: CPU usage, CPU request/limit utilization, memory working set, memory request/limit utilization, current pod phase (pending/running/succeeded/failed/unknown), and pod age (ms since start time). Each pod includes metadata attributes (namespace, node, workload owner such as deployment/statefulset/daemonset/job/cronjob, cluster). Supports filtering via a filter expression, custom groupBy to aggregate pods by any attribute, ordering by any of the six metrics (cpu, cpu_request, cpu_limit, memory, memory_request, memory_limit), and pagination via offset/limit. The response type is 'list' for the default k8s.pod.uid grouping (each row is one pod with its current phase) or 'grouped_list' for custom groupBy keys (each row aggregates pods in the group with per-phase counts: pendingPodCount, runningPodCount, succeededPodCount, failedPodCount, unknownPodCount derived from each pod's latest phase in the window). Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (podCPU, podCPURequest, podCPULimit, podMemory, podMemoryRequest, podMemoryLimit, podAge) return -1 as a sentinel when no data is available for that field.",
Request: new(inframonitoringtypes.PostablePods),
RequestContentType: "application/json",
Response: new(inframonitoringtypes.Pods),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -45,3 +45,27 @@ func (h *handler) ListHosts(rw http.ResponseWriter, req *http.Request) {
render.Success(rw, http.StatusOK, result)
}
func (h *handler) ListPods(rw http.ResponseWriter, req *http.Request) {
claims, err := authtypes.ClaimsFromContext(req.Context())
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
var parsedReq inframonitoringtypes.PostablePods
if err := binding.JSON.BindBody(req.Body, &parsedReq); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.ListPods(req.Context(), orgID, &parsedReq)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}

View File

@@ -14,3 +14,12 @@ type groupHostStatusCounts struct {
Active int
Inactive int
}
// podPhaseCounts holds per-group pod counts bucketed by latest phase in window.
type podPhaseCounts struct {
Pending int
Running int
Succeeded int
Failed int
Unknown int
}

View File

@@ -159,3 +159,86 @@ func (m *module) ListHosts(ctx context.Context, orgID valuer.UUID, req *inframon
return resp, nil
}
func (m *module) ListPods(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostablePods) (*inframonitoringtypes.Pods, error) {
if err := req.Validate(); err != nil {
return nil, err
}
resp := &inframonitoringtypes.Pods{}
if req.OrderBy == nil {
req.OrderBy = &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: inframonitoringtypes.PodsOrderByCPU,
},
},
Direction: qbtypes.OrderDirectionDesc,
}
}
if len(req.GroupBy) == 0 {
req.GroupBy = []qbtypes.GroupByKey{podUIDGroupByKey}
resp.Type = inframonitoringtypes.ResponseTypeList
} else {
resp.Type = inframonitoringtypes.ResponseTypeGroupedList
}
missingMetrics, minFirstReportedUnixMilli, err := m.getMetricsExistenceAndEarliestTime(ctx, podsTableMetricNamesList)
if err != nil {
return nil, err
}
if len(missingMetrics) > 0 {
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: missingMetrics}
resp.Records = []inframonitoringtypes.PodRecord{}
resp.Total = 0
return resp, nil
}
if req.End < int64(minFirstReportedUnixMilli) {
resp.EndTimeBeforeRetention = true
resp.Records = []inframonitoringtypes.PodRecord{}
resp.Total = 0
return resp, nil
}
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: []string{}}
metadataMap, err := m.getPodsTableMetadata(ctx, req)
if err != nil {
return nil, err
}
resp.Total = len(metadataMap)
pageGroups, err := m.getTopPodGroups(ctx, orgID, req, metadataMap)
if err != nil {
return nil, err
}
if len(pageGroups) == 0 {
resp.Records = []inframonitoringtypes.PodRecord{}
return resp, nil
}
filterExpr := ""
if req.Filter != nil {
filterExpr = req.Filter.Expression
}
fullQueryReq := buildFullQueryRequest(req.Start, req.End, filterExpr, req.GroupBy, pageGroups, m.newPodsTableListQuery())
queryResp, err := m.querier.QueryRange(ctx, orgID, fullQueryReq)
if err != nil {
return nil, err
}
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req, pageGroups)
if err != nil {
return nil, err
}
isPodUIDInGroupBy := isKeyInGroupByAttrs(req.GroupBy, podUIDAttrKey)
resp.Records = buildPodRecords(isPodUIDInGroupBy, queryResp, pageGroups, req.GroupBy, metadataMap, phaseCounts, req.End)
resp.Warning = queryResp.Warning
return resp, nil
}

View File

@@ -0,0 +1,334 @@
package implinframonitoring
import (
"context"
"fmt"
"slices"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
)
// buildPodRecords assembles the page records. Phase counts come from
// phaseCounts in both modes. In list mode (isPodUIDInGroupBy=true) each
// group is one pod, so exactly one count is 1; PodPhase is derived from
// which one. In grouped_list mode PodPhase stays PodPhaseNone.
func buildPodRecords(
isPodUIDInGroupBy bool,
resp *qbtypes.QueryRangeResponse,
pageGroups []map[string]string,
groupBy []qbtypes.GroupByKey,
metadataMap map[string]map[string]string,
phaseCounts map[string]podPhaseCounts,
reqEnd int64,
) []inframonitoringtypes.PodRecord {
metricsMap := parseFullQueryResponse(resp, groupBy)
records := make([]inframonitoringtypes.PodRecord, 0, len(pageGroups))
for _, labels := range pageGroups {
compositeKey := compositeKeyFromLabels(labels, groupBy)
podUID := labels[podUIDAttrKey]
record := inframonitoringtypes.PodRecord{ // initialize with default values
PodUID: podUID,
PodPhase: inframonitoringtypes.PodPhaseNone,
PodCPU: -1,
PodCPURequest: -1,
PodCPULimit: -1,
PodMemory: -1,
PodMemoryRequest: -1,
PodMemoryLimit: -1,
PodAge: -1,
Meta: map[string]any{},
}
if metrics, ok := metricsMap[compositeKey]; ok {
if v, exists := metrics["A"]; exists {
record.PodCPU = v
}
if v, exists := metrics["B"]; exists {
record.PodCPURequest = v
}
if v, exists := metrics["C"]; exists {
record.PodCPULimit = v
}
if v, exists := metrics["D"]; exists {
record.PodMemory = v
}
if v, exists := metrics["E"]; exists {
record.PodMemoryRequest = v
}
if v, exists := metrics["F"]; exists {
record.PodMemoryLimit = v
}
}
if phaseCountsForGroup, ok := phaseCounts[compositeKey]; ok {
record.PendingPodCount = phaseCountsForGroup.Pending
record.RunningPodCount = phaseCountsForGroup.Running
record.SucceededPodCount = phaseCountsForGroup.Succeeded
record.FailedPodCount = phaseCountsForGroup.Failed
record.UnknownPodCount = phaseCountsForGroup.Unknown
// In list mode each group is one pod; the count==1 bucket identifies the phase.
if isPodUIDInGroupBy {
switch {
case phaseCountsForGroup.Pending == 1:
record.PodPhase = inframonitoringtypes.PodPhasePending
case phaseCountsForGroup.Running == 1:
record.PodPhase = inframonitoringtypes.PodPhaseRunning
case phaseCountsForGroup.Succeeded == 1:
record.PodPhase = inframonitoringtypes.PodPhaseSucceeded
case phaseCountsForGroup.Failed == 1:
record.PodPhase = inframonitoringtypes.PodPhaseFailed
case phaseCountsForGroup.Unknown == 1:
record.PodPhase = inframonitoringtypes.PodPhaseUnknown
}
}
}
if attrs, ok := metadataMap[compositeKey]; ok && isPodUIDInGroupBy {
// the condition above ensures we deduce age only if pod uid is in group by because if
// it's not in group by then we might have multiple pod uids in the same group and hence then podAge wont make sense
if startTimeStr, exists := attrs[podStartTimeAttrKey]; exists && startTimeStr != "" {
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
startTimeMs := t.UnixMilli()
if startTimeMs > 0 {
record.PodAge = reqEnd - startTimeMs
}
}
}
for k, v := range attrs {
record.Meta[k] = v
}
}
records = append(records, record)
}
return records
}
func (m *module) getTopPodGroups(
ctx context.Context,
orgID valuer.UUID,
req *inframonitoringtypes.PostablePods,
metadataMap map[string]map[string]string,
) ([]map[string]string, error) {
orderByKey := req.OrderBy.Key.Name
queryNamesForOrderBy := orderByToPodsQueryNames[orderByKey]
rankingQueryName := queryNamesForOrderBy[len(queryNamesForOrderBy)-1]
topReq := &qbtypes.QueryRangeRequest{
Start: uint64(req.Start),
End: uint64(req.End),
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: make([]qbtypes.QueryEnvelope, 0, len(queryNamesForOrderBy)),
},
}
for _, envelope := range m.newPodsTableListQuery().CompositeQuery.Queries {
if !slices.Contains(queryNamesForOrderBy, envelope.GetQueryName()) {
continue
}
copied := envelope
if copied.Type == qbtypes.QueryTypeBuilder {
existingExpr := ""
if f := copied.GetFilter(); f != nil {
existingExpr = f.Expression
}
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
}
merged := mergeFilterExpressions(existingExpr, reqFilterExpr)
copied.SetFilter(&qbtypes.Filter{Expression: merged})
copied.SetGroupBy(req.GroupBy)
}
topReq.CompositeQuery.Queries = append(topReq.CompositeQuery.Queries, copied)
}
resp, err := m.querier.QueryRange(ctx, orgID, topReq)
if err != nil {
return nil, err
}
allMetricGroups := parseAndSortGroups(resp, rankingQueryName, req.GroupBy, req.OrderBy.Direction)
return paginateWithBackfill(allMetricGroups, metadataMap, req.GroupBy, req.Offset, req.Limit), nil
}
func (m *module) getPodsTableMetadata(ctx context.Context, req *inframonitoringtypes.PostablePods) (map[string]map[string]string, error) {
var nonGroupByAttrs []string
for _, key := range podAttrKeysForMetadata {
if !isKeyInGroupByAttrs(req.GroupBy, key) {
nonGroupByAttrs = append(nonGroupByAttrs, key)
}
}
return m.getMetadata(ctx, podsTableMetricNamesList, req.GroupBy, nonGroupByAttrs, req.Filter, req.Start, req.End)
}
// getPerGroupPodPhaseCounts computes per-group pod counts bucketed by each
// pod's latest phase in the requested window.
// Pipeline:
//
// timeSeriesFPs: fp ↔ (pod_uid, groupBy cols) from the time_series table.
// User filter + page-groups filter applied here.
// latestPhasePerPod: INNER JOIN samples × timeSeriesFPs, collapsed to
// the latest phase per pod via argMax(value, unix_milli).
// countPodsPerPhase: per-group uniqExactIf into 5 phase buckets.
//
// Groups absent from the result map have implicit zero counts (caller default).
func (m *module) getPerGroupPodPhaseCounts(
ctx context.Context,
req *inframonitoringtypes.PostablePods,
pageGroups []map[string]string,
) (map[string]podPhaseCounts, error) {
if len(pageGroups) == 0 || len(req.GroupBy) == 0 {
return map[string]podPhaseCounts{}, nil
}
// Merged filter expression (user filter + page-groups IN clauses).
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
}
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
filterExpr := mergeFilterExpressions(reqFilterExpr, pageGroupsFilterExpr)
// Resolve tables. Same convention as hosts (distributed names from helpers).
adjustedStart, adjustedEnd, _, localTimeSeriesTable := telemetrymetrics.WhichTSTableToUse(
uint64(req.Start), uint64(req.End), nil,
)
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
uint64(req.Start), uint64(req.End),
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
)
valueCol := telemetrymetrics.ValueColumnForSamplesTable(samplesTable)
// ----- timeSeriesFPs -----
timeSeriesFPs := sqlbuilder.NewSelectBuilder()
timeSeriesFPsSelectCols := []string{
"fingerprint",
fmt.Sprintf("JSONExtractString(labels, %s) AS pod_uid", timeSeriesFPs.Var(podUIDAttrKey)),
}
for _, key := range req.GroupBy {
timeSeriesFPsSelectCols = append(timeSeriesFPsSelectCols,
fmt.Sprintf("JSONExtractString(labels, %s) AS %s", timeSeriesFPs.Var(key.Name), quoteIdentifier(key.Name)),
)
}
timeSeriesFPs.Select(timeSeriesFPsSelectCols...)
timeSeriesFPs.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTimeSeriesTable))
timeSeriesFPs.Where(
timeSeriesFPs.E("metric_name", podPhaseMetricName),
timeSeriesFPs.GE("unix_milli", adjustedStart),
timeSeriesFPs.L("unix_milli", adjustedEnd),
)
if filterExpr != "" {
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: filterExpr}, req.Start, req.End)
if err != nil {
return nil, err
}
if filterClause != nil {
timeSeriesFPs.AddWhereClause(filterClause)
}
}
timeSeriesFPsGroupBy := []string{"fingerprint", "pod_uid"}
for _, key := range req.GroupBy {
timeSeriesFPsGroupBy = append(timeSeriesFPsGroupBy, quoteIdentifier(key.Name))
}
timeSeriesFPs.GroupBy(timeSeriesFPsGroupBy...)
timeSeriesFPsSQL, timeSeriesFPsArgs := timeSeriesFPs.BuildWithFlavor(sqlbuilder.ClickHouse)
latestPhasePerPod := sqlbuilder.NewSelectBuilder()
latestPhasePerPodSelectCols := []string{"tsfp.pod_uid AS pod_uid"}
latestPhasePerPodGroupBy := []string{"pod_uid"}
for _, key := range req.GroupBy {
col := quoteIdentifier(key.Name)
latestPhasePerPodSelectCols = append(latestPhasePerPodSelectCols, fmt.Sprintf("tsfp.%s AS %s", col, col))
latestPhasePerPodGroupBy = append(latestPhasePerPodGroupBy, col)
}
latestPhasePerPodSelectCols = append(latestPhasePerPodSelectCols,
fmt.Sprintf("argMax(samples.%s, samples.unix_milli) AS phase_value", valueCol),
)
latestPhasePerPod.Select(latestPhasePerPodSelectCols...)
latestPhasePerPod.From(fmt.Sprintf(
"%s.%s AS samples INNER JOIN time_series_fps AS tsfp ON samples.fingerprint = tsfp.fingerprint",
telemetrymetrics.DBName, samplesTable,
))
latestPhasePerPod.Where(
latestPhasePerPod.E("samples.metric_name", podPhaseMetricName),
latestPhasePerPod.GE("samples.unix_milli", req.Start),
latestPhasePerPod.L("samples.unix_milli", req.End),
"tsfp.pod_uid != ''",
)
latestPhasePerPod.GroupBy(latestPhasePerPodGroupBy...)
latestPhasePerPodSQL, latestPhasePerPodArgs := latestPhasePerPod.BuildWithFlavor(sqlbuilder.ClickHouse)
// ----- countPodsPerPhase (outer SELECT) -----
countPodsPerPhaseSelectCols := make([]string, 0, len(req.GroupBy)+5)
countPodsPerPhaseGroupBy := make([]string, 0, len(req.GroupBy))
for _, key := range req.GroupBy {
col := quoteIdentifier(key.Name)
countPodsPerPhaseSelectCols = append(countPodsPerPhaseSelectCols, col)
countPodsPerPhaseGroupBy = append(countPodsPerPhaseGroupBy, col)
}
countPodsPerPhaseSelectCols = append(countPodsPerPhaseSelectCols,
fmt.Sprintf("uniqExactIf(pod_uid, phase_value = %d) AS pending_count", inframonitoringtypes.PodPhaseNumPending),
fmt.Sprintf("uniqExactIf(pod_uid, phase_value = %d) AS running_count", inframonitoringtypes.PodPhaseNumRunning),
fmt.Sprintf("uniqExactIf(pod_uid, phase_value = %d) AS succeeded_count", inframonitoringtypes.PodPhaseNumSucceeded),
fmt.Sprintf("uniqExactIf(pod_uid, phase_value = %d) AS failed_count", inframonitoringtypes.PodPhaseNumFailed),
fmt.Sprintf("uniqExactIf(pod_uid, phase_value = %d) AS unknown_count", inframonitoringtypes.PodPhaseNumUnknown),
)
countPodsPerPhaseSQL := fmt.Sprintf(
"SELECT %s FROM latest_phase_per_pod GROUP BY %s",
strings.Join(countPodsPerPhaseSelectCols, ", "),
strings.Join(countPodsPerPhaseGroupBy, ", "),
)
// Combine CTEs + outer.
cteFragments := []string{
fmt.Sprintf("time_series_fps AS (%s)", timeSeriesFPsSQL),
fmt.Sprintf("latest_phase_per_pod AS (%s)", latestPhasePerPodSQL),
}
finalSQL := querybuilder.CombineCTEs(cteFragments) + countPodsPerPhaseSQL
finalArgs := querybuilder.PrependArgs([][]any{timeSeriesFPsArgs, latestPhasePerPodArgs}, nil)
rows, err := m.telemetryStore.ClickhouseDB().Query(ctx, finalSQL, finalArgs...)
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[string]podPhaseCounts)
for rows.Next() {
groupVals := make([]string, len(req.GroupBy))
scanPtrs := make([]any, 0, len(req.GroupBy)+5)
for i := range groupVals {
scanPtrs = append(scanPtrs, &groupVals[i])
}
var pending, running, succeeded, failed, unknown uint64
scanPtrs = append(scanPtrs, &pending, &running, &succeeded, &failed, &unknown)
if err := rows.Scan(scanPtrs...); err != nil {
return nil, err
}
result[compositeKeyFromList(groupVals)] = podPhaseCounts{
Pending: int(pending),
Running: int(running),
Succeeded: int(succeeded),
Failed: int(failed),
Unknown: int(unknown),
}
}
if err := rows.Err(); err != nil {
return nil, err
}
return result, nil
}

View File

@@ -0,0 +1,178 @@
package implinframonitoring
import (
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const (
podUIDAttrKey = "k8s.pod.uid"
podStartTimeAttrKey = "k8s.pod.start_time"
podPhaseMetricName = "k8s.pod.phase"
)
var podUIDGroupByKey = qbtypes.GroupByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: podUIDAttrKey,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
}
var podsTableMetricNamesList = []string{
"k8s.pod.cpu.usage",
"k8s.pod.cpu_request_utilization",
"k8s.pod.cpu_limit_utilization",
"k8s.pod.memory.working_set",
"k8s.pod.memory_request_utilization",
"k8s.pod.memory_limit_utilization",
"k8s.pod.phase",
}
var podAttrKeysForMetadata = []string{
"k8s.pod.uid",
"k8s.pod.name",
"k8s.namespace.name",
"k8s.node.name",
"k8s.deployment.name",
"k8s.statefulset.name",
"k8s.daemonset.name",
"k8s.job.name",
"k8s.cronjob.name",
"k8s.cluster.name",
"k8s.pod.start_time",
}
var orderByToPodsQueryNames = map[string][]string{
inframonitoringtypes.PodsOrderByCPU: {"A"},
inframonitoringtypes.PodsOrderByCPURequest: {"B"},
inframonitoringtypes.PodsOrderByCPULimit: {"C"},
inframonitoringtypes.PodsOrderByMemory: {"D"},
inframonitoringtypes.PodsOrderByMemoryRequest: {"E"},
inframonitoringtypes.PodsOrderByMemoryLimit: {"F"},
}
// newPodsTableListQuery builds the composite QB v5 request for the pods list.
// Pod phase is derived separately via getPerGroupPodPhaseCounts (works for both
// list and grouped_list modes), so no phase query is included here.
func (m *module) newPodsTableListQuery() *qbtypes.QueryRangeRequest {
queries := []qbtypes.QueryEnvelope{
// Query A: CPU usage
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.cpu.usage",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query B: CPU request utilization
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "B",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.cpu_request_utilization",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query C: CPU limit utilization
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "C",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.cpu_limit_utilization",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query D: Memory working set
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "D",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.memory.working_set",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query E: Memory request utilization
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "E",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.memory_request_utilization",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query F: Memory limit utilization
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "F",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.memory_limit_utilization",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
}
return &qbtypes.QueryRangeRequest{
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
}
}

View File

@@ -10,8 +10,10 @@ import (
type Handler interface {
ListHosts(http.ResponseWriter, *http.Request)
ListPods(http.ResponseWriter, *http.Request)
}
type Module interface {
ListHosts(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableHosts) (*inframonitoringtypes.Hosts, error)
ListPods(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostablePods) (*inframonitoringtypes.Pods, error)
}

View File

@@ -124,6 +124,17 @@ func CountExpressionForSamplesTable(tableName string) string {
return "sum(count)"
}
// ValueColumnForSamplesTable returns the column name holding the sample value:
// "last" for the 5m/30m aggregated tables, "value" otherwise.
// note all the other columns in the aggregated samples tables are nothing but aggregations.
// and so "last" is the value column for these tables.
func ValueColumnForSamplesTable(tableName string) string {
if tableName == SamplesV4Agg5mTableName || tableName == SamplesV4Agg30mTableName {
return "last"
}
return "value"
}
// start and end are in milliseconds
// we have three tables for samples
// 1. distributed_samples_v4

View File

@@ -49,7 +49,7 @@ type HostFilter struct {
FilterByStatus HostStatus `json:"filterByStatus"`
}
// Validate ensures HostsListRequest contains acceptable values.
// Validate ensures PostableHosts contains acceptable values.
func (req *PostableHosts) Validate() error {
if req == nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")

View File

@@ -0,0 +1,109 @@
package inframonitoringtypes
import (
"encoding/json"
"slices"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type Pods struct {
Type ResponseType `json:"type" required:"true"`
Records []PodRecord `json:"records" required:"true"`
Total int `json:"total" required:"true"`
RequiredMetricsCheck RequiredMetricsCheck `json:"requiredMetricsCheck" required:"true"`
EndTimeBeforeRetention bool `json:"endTimeBeforeRetention" required:"true"`
Warning *qbtypes.QueryWarnData `json:"warning,omitempty"`
}
type PodRecord struct {
PodUID string `json:"podUID" required:"true"`
PodCPU float64 `json:"podCPU" required:"true"`
PodCPURequest float64 `json:"podCPURequest" required:"true"`
PodCPULimit float64 `json:"podCPULimit" required:"true"`
PodMemory float64 `json:"podMemory" required:"true"`
PodMemoryRequest float64 `json:"podMemoryRequest" required:"true"`
PodMemoryLimit float64 `json:"podMemoryLimit" required:"true"`
PodPhase PodPhase `json:"podPhase" required:"true"`
PendingPodCount int `json:"pendingPodCount" required:"true"`
RunningPodCount int `json:"runningPodCount" required:"true"`
SucceededPodCount int `json:"succeededPodCount" required:"true"`
FailedPodCount int `json:"failedPodCount" required:"true"`
UnknownPodCount int `json:"unknownPodCount" required:"true"`
PodAge int64 `json:"podAge" required:"true"`
Meta map[string]interface{} `json:"meta" required:"true"`
}
// PostablePods is the request body for the v2 pods list API.
type PostablePods struct {
Start int64 `json:"start" required:"true"`
End int64 `json:"end" required:"true"`
Filter *qbtypes.Filter `json:"filter"`
GroupBy []qbtypes.GroupByKey `json:"groupBy"`
OrderBy *qbtypes.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit" required:"true"`
}
// Validate ensures PostablePods contains acceptable values.
func (req *PostablePods) Validate() error {
if req == nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
}
if req.Start <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid start time %d: start must be greater than 0",
req.Start,
)
}
if req.End <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid end time %d: end must be greater than 0",
req.End,
)
}
if req.Start >= req.End {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid time range: start (%d) must be less than end (%d)",
req.Start,
req.End,
)
}
if req.Limit < 1 || req.Limit > 5000 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
}
if req.Offset < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "offset cannot be negative")
}
if req.OrderBy != nil {
if !slices.Contains(PodsValidOrderByKeys, req.OrderBy.Key.Name) {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by key: %s", req.OrderBy.Key.Name)
}
if req.OrderBy.Direction != qbtypes.OrderDirectionAsc && req.OrderBy.Direction != qbtypes.OrderDirectionDesc {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by direction: %s", req.OrderBy.Direction)
}
}
return nil
}
// UnmarshalJSON validates input immediately after decoding.
func (req *PostablePods) UnmarshalJSON(data []byte) error {
type raw PostablePods
var decoded raw
if err := json.Unmarshal(data, &decoded); err != nil {
return err
}
*req = PostablePods(decoded)
return req.Validate()
}

View File

@@ -0,0 +1,55 @@
package inframonitoringtypes
import "github.com/SigNoz/signoz/pkg/valuer"
type PodPhase struct {
valuer.String
}
var (
PodPhasePending = PodPhase{valuer.NewString("pending")}
PodPhaseRunning = PodPhase{valuer.NewString("running")}
PodPhaseSucceeded = PodPhase{valuer.NewString("succeeded")}
PodPhaseFailed = PodPhase{valuer.NewString("failed")}
PodPhaseUnknown = PodPhase{valuer.NewString("unknown")}
PodPhaseNone = PodPhase{valuer.NewString("")}
)
func (PodPhase) Enum() []any {
return []any{
PodPhasePending,
PodPhaseRunning,
PodPhaseSucceeded,
PodPhaseFailed,
PodPhaseUnknown,
PodPhaseNone,
}
}
// Numeric pod phase values emitted by the k8s.pod.phase metric
// (source: OTel kubeletstats receiver).
const (
PodPhaseNumPending = 1
PodPhaseNumRunning = 2
PodPhaseNumSucceeded = 3
PodPhaseNumFailed = 4
PodPhaseNumUnknown = 5
)
const (
PodsOrderByCPU = "cpu"
PodsOrderByCPURequest = "cpu_request"
PodsOrderByCPULimit = "cpu_limit"
PodsOrderByMemory = "memory"
PodsOrderByMemoryRequest = "memory_request"
PodsOrderByMemoryLimit = "memory_limit"
)
var PodsValidOrderByKeys = []string{
PodsOrderByCPU,
PodsOrderByCPURequest,
PodsOrderByCPULimit,
PodsOrderByMemory,
PodsOrderByMemoryRequest,
PodsOrderByMemoryLimit,
}

View File

@@ -0,0 +1,219 @@
package inframonitoringtypes
import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestPostablePods_Validate(t *testing.T) {
tests := []struct {
name string
req *PostablePods
wantErr bool
}{
{
name: "valid request",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "nil request",
req: nil,
wantErr: true,
},
{
name: "start time zero",
req: &PostablePods{
Start: 0,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time negative",
req: &PostablePods{
Start: -1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "end time zero",
req: &PostablePods{
Start: 1000,
End: 0,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time greater than end time",
req: &PostablePods{
Start: 2000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time equal to end time",
req: &PostablePods{
Start: 1000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "limit zero",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 0,
Offset: 0,
},
wantErr: true,
},
{
name: "limit negative",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: -10,
Offset: 0,
},
wantErr: true,
},
{
name: "limit exceeds max",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 5001,
Offset: 0,
},
wantErr: true,
},
{
name: "offset negative",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: -5,
},
wantErr: true,
},
{
name: "orderBy nil is valid",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "orderBy with valid key cpu and direction asc",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: PodsOrderByCPU,
},
},
Direction: qbtypes.OrderDirectionAsc,
},
},
wantErr: false,
},
{
name: "orderBy with phase key is rejected",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "phase",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: true,
},
{
name: "orderBy with invalid key",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "unknown",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: true,
},
{
name: "orderBy with valid key but invalid direction",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: PodsOrderByMemory,
},
},
Direction: qbtypes.OrderDirection{String: valuer.NewString("invalid")},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.req.Validate()
if tt.wantErr {
require.Error(t, err)
require.True(t, errors.Ast(err, errors.TypeInvalidInput), "expected error to be of type InvalidInput")
} else {
require.NoError(t, err)
}
})
}
}

View File

@@ -11,9 +11,7 @@ import (
"github.com/uptrace/bun"
)
var (
ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
)
var ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
type MaintenanceStatus struct {
valuer.String
@@ -63,15 +61,15 @@ type StorablePlannedMaintenance struct {
}
type PlannedMaintenance struct {
ID valuer.UUID `json:"id" required:"true"`
Name string `json:"name" required:"true"`
Description string `json:"description"`
Schedule *Schedule `json:"schedule" required:"true"`
RuleIDs []string `json:"alertIds"`
CreatedAt time.Time `json:"createdAt"`
CreatedBy string `json:"createdBy"`
UpdatedAt time.Time `json:"updatedAt"`
UpdatedBy string `json:"updatedBy"`
ID valuer.UUID `json:"id" required:"true"`
Name string `json:"name" required:"true"`
Description string `json:"description"`
Schedule *Schedule `json:"schedule" required:"true"`
RuleIDs []string `json:"alertIds"`
CreatedAt time.Time `json:"createdAt"`
CreatedBy string `json:"createdBy"`
UpdatedAt time.Time `json:"updatedAt"`
UpdatedBy string `json:"updatedBy"`
Status MaintenanceStatus `json:"status" required:"true"`
Kind MaintenanceKind `json:"kind" required:"true"`
}
@@ -161,8 +159,11 @@ func (m *PlannedMaintenance) ShouldSkip(ruleID string, now time.Time) bool {
currentTime := now.In(loc)
// fixed schedule
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
// fixed schedule — only when no recurrence is configured.
// When recurrence is set, the recurring check below handles everything;
// falling through here would cause the window to match the absolute
// StartTimeEndTime range instead of the daily/weekly/monthly pattern.
if m.Schedule.Recurrence == nil && !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
startTime := m.Schedule.StartTime.In(loc)
endTime := m.Schedule.EndTime.In(loc)
if currentTime.Equal(startTime) || currentTime.Equal(endTime) ||

View File

@@ -13,7 +13,6 @@ func timePtr(t time.Time) *time.Time {
}
func TestShouldSkipMaintenance(t *testing.T) {
cases := []struct {
name string
maintenance *PlannedMaintenance
@@ -499,7 +498,7 @@ func TestShouldSkipMaintenance(t *testing.T) {
},
},
},
ts: time.Date(2024, 04, 1, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 4, 1, 12, 10, 0, 0, time.UTC),
skip: true,
},
{
@@ -508,14 +507,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 04, 15, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 4, 15, 12, 10, 0, 0, time.UTC),
skip: true,
},
{
@@ -524,14 +523,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 04, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday
ts: time.Date(2024, 4, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday
skip: false,
},
{
@@ -540,14 +539,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 04, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday
ts: time.Date(2024, 4, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday
skip: false,
},
{
@@ -556,14 +555,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 05, 06, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 5, 6, 12, 10, 0, 0, time.UTC),
skip: true,
},
{
@@ -572,14 +571,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 05, 06, 14, 00, 0, 0, time.UTC),
ts: time.Date(2024, 5, 6, 14, 0, 0, 0, time.UTC),
skip: true,
},
{
@@ -588,13 +587,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeMonthly,
},
},
},
ts: time.Date(2024, 04, 04, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 4, 4, 12, 10, 0, 0, time.UTC),
skip: true,
},
{
@@ -603,13 +602,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeMonthly,
},
},
},
ts: time.Date(2024, 04, 04, 14, 10, 0, 0, time.UTC),
ts: time.Date(2024, 4, 4, 14, 10, 0, 0, time.UTC),
skip: false,
},
{
@@ -618,13 +617,52 @@ func TestShouldSkipMaintenance(t *testing.T) {
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeMonthly,
},
},
},
ts: time.Date(2024, 05, 04, 12, 10, 0, 0, time.UTC),
ts: time.Date(2024, 5, 4, 12, 10, 0, 0, time.UTC),
skip: true,
},
// The recurrence should govern, when set. Not the fixed range.
{
name: "recurring-daily-with-fixed-times-outside-daily-window",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
// These fixed fields should be ignored when Recurrence is set.
StartTime: time.Date(2026, 4, 1, 10, 0, 0, 0, time.UTC),
EndTime: time.Date(2026, 4, 30, 18, 0, 0, 0, time.UTC),
Recurrence: &Recurrence{
StartTime: time.Date(2026, 4, 1, 14, 0, 0, 0, time.UTC), // daily at 14:00
Duration: valuer.MustParseTextDuration("2h"), // until 16:00
RepeatType: RepeatTypeDaily,
},
},
},
// 11:00 is inside the fixed range but outside the daily 14:00-16:00 window.
// Before the fix this returned true (bug); after fix it returns false.
ts: time.Date(2026, 4, 15, 11, 0, 0, 0, time.UTC),
skip: false,
},
{
name: "recurring-daily-with-fixed-times-inside-daily-window",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
StartTime: time.Date(2026, 4, 1, 10, 0, 0, 0, time.UTC),
EndTime: time.Date(2026, 4, 30, 18, 0, 0, 0, time.UTC),
Recurrence: &Recurrence{
StartTime: time.Date(2026, 4, 1, 14, 0, 0, 0, time.UTC),
Duration: valuer.MustParseTextDuration("2h"),
RepeatType: RepeatTypeDaily,
},
},
},
// 15:00 is inside the daily 14:00-16:00 window — should skip.
ts: time.Date(2026, 4, 15, 15, 0, 0, 0, time.UTC),
skip: true,
},
}

View File

@@ -23,6 +23,7 @@ pytest_plugins = [
"fixtures.notification_channel",
"fixtures.alerts",
"fixtures.cloudintegrations",
"fixtures.jsontypes",
"fixtures.seeder",
]
@@ -79,6 +80,6 @@ def pytest_addoption(parser: pytest.Parser):
parser.addoption(
"--schema-migrator-version",
action="store",
default="v0.144.2",
default="v0.144.3",
help="schema migrator version",
)

View File

@@ -1,5 +1,6 @@
import os
from collections.abc import Generator
from collections.abc import Callable, Generator
from datetime import datetime
from typing import Any
import clickhouse_connect
@@ -262,3 +263,76 @@ def clickhouse(
delete=delete,
restore=restore,
)
@pytest.fixture(name="check_query_log")
def check_query_log(
signoz: types.SigNoz,
) -> Callable[..., None]:
"""
Returns a callable that flushes system.query_log and asserts that at
least one recent SELECT satisfies check_fn.
Args:
after_ts: Only consider queries logged after this timestamp.
case_name: Label used in assertion failure messages.
check_fn: Predicate run against each candidate query string.
tables: Filter to queries that touched all of these tables, as
'db.table' strings (uses hasAll(tables, [...])).
must_contain: Substrings that must appear in the query text (AND-ed).
must_not_contain: Substrings that must not appear in the query text (AND-ed).
limit: How many most-recent queries to examine (default 10).
Usage:
before = datetime.now(tz=timezone.utc)
# ... trigger the query under test ...
check_query_log(
before, "my.case",
lambda q: "assumeNotNull" in q,
tables=["signoz_logs.distributed_logs_v2"],
)
"""
def _check(
after_ts: datetime,
case_name: str,
check_fn: Callable[[str], bool],
*,
tables: list[str] | None = None,
must_contain: list[str] | None = None,
must_not_contain: list[str] | None = None,
limit: int = 10,
) -> None:
conn = signoz.telemetrystore.conn
conn.command("SYSTEM FLUSH LOGS")
# Use millisecond precision to avoid timestamp collisions between
# adjacent test cases (second-level precision causes bleed-through).
params: dict = {"after_ms": int(after_ts.timestamp() * 1000)}
conditions = [
"type = 'QueryFinish'",
"query_kind = 'Select'",
"toUnixTimestamp64Milli(event_time_microseconds) >= %(after_ms)s",
]
if tables:
params["tables"] = tables
conditions.append("hasAll(tables, %(tables)s)")
for i, pattern in enumerate(must_contain or []):
key = f"mc_{i}"
params[key] = pattern
conditions.append(f"position(query, %({key})s) > 0")
for i, pattern in enumerate(must_not_contain or []):
key = f"mnc_{i}"
params[key] = pattern
conditions.append(f"position(query, %({key})s) = 0")
where = " AND ".join(conditions)
result = conn.query(
f"SELECT query FROM system.query_log WHERE {where} ORDER BY event_time_microseconds DESC LIMIT {limit}",
parameters=params,
)
queries = [row[0] for row in result.result_rows]
assert queries, f"No matching SELECT in system.query_log for case '{case_name}'"
assert all(check_fn(q) for q in queries), f"query_log check failed for case '{case_name}'.\n" + "Queries:\n" + "\n---\n".join(queries)
return _check

441
tests/fixtures/jsontypes.py vendored Normal file
View File

@@ -0,0 +1,441 @@
"""
Simpler version of metadataexporter for exporting jsontypes for test fixtures.
This exports JSON type metadata to the path_types table by parsing JSON bodies
and extracting all paths with their types, similar to how the real metadataexporter works.
"""
import datetime
import json
from abc import ABC
from collections.abc import Callable, Generator
from http import HTTPStatus
from typing import (
Any,
)
import numpy as np
import pytest
import requests
from fixtures import types
class JSONPathType(ABC):
"""Represents a JSON path with its type information"""
field_name: str
field_data_type: str
last_seen: np.uint64
signal: str = "logs"
field_context: str = "body"
def __init__(
self,
field_name: str,
field_data_type: str,
last_seen: datetime.datetime | None = None,
) -> None:
self.field_name = field_name
self.field_data_type = field_data_type
self.signal = "logs"
self.field_context = "body"
if last_seen is None:
last_seen = datetime.datetime.now()
self.last_seen = np.uint64(int(last_seen.timestamp() * 1e9))
def np_arr(self) -> np.array:
"""Return path type data as numpy array for database insertion"""
return np.array([self.signal, self.field_context, self.field_name, self.field_data_type, self.last_seen])
# Constants matching metadataexporter
ARRAY_SEPARATOR = "[]." # Used in paths like "education[].name"
ARRAY_SUFFIX = "[]" # Used when traversing into array element objects
def _infer_array_type_from_type_strings(types: list[str]) -> str | None:
"""
Infer array type from a list of pre-classified type strings.
Matches metadataexporter's inferArrayMask logic.
Internal type strings are: "JSON", "String", "Bool", "Float64", "Int64"
SuperTyping rules (matching Go inferArrayMask):
- JSON alone → []json
- JSON + any primitive → []dynamic
- String alone → []string; String + other → []dynamic
- Float64 wins over Int64 and Bool
- Int64 wins over Bool
- Bool alone → []bool
"""
if len(types) == 0:
return None
unique = set(types)
has_json = "JSON" in unique
# hasPrimitive mirrors Go: (hasJSON && len(unique) > 1) || (!hasJSON && len(unique) > 0)
has_primitive = (has_json and len(unique) > 1) or (not has_json and len(unique) > 0)
if has_json:
if not has_primitive:
return "[]json"
return "[]dynamic"
# ---- Primitive Type Resolution (Float > Int > Bool) ----
if "String" in unique:
if len(unique) > 1:
return "[]dynamic"
return "[]string"
if "Float64" in unique:
return "[]float64"
if "Int64" in unique:
return "[]int64"
if "Bool" in unique:
return "[]bool"
return "[]dynamic"
def _infer_array_type(elements: list[Any]) -> str | None:
"""
Infer array type from raw Python list elements.
Classifies each element then delegates to _infer_array_type_from_type_strings.
"""
if len(elements) == 0:
return None
types = []
for elem in elements:
if elem is None:
continue
if isinstance(elem, dict):
types.append("JSON")
elif isinstance(elem, str):
types.append("String")
elif isinstance(elem, bool): # must be before int (bool is subclass of int)
types.append("Bool")
elif isinstance(elem, float):
types.append("Float64")
elif isinstance(elem, int):
types.append("Int64")
return _infer_array_type_from_type_strings(types)
def _python_type_to_clickhouse_type(value: Any) -> str:
"""
Convert Python type to ClickHouse JSON type string.
Maps Python types to ClickHouse JSON data types.
Matches metadataexporter's mapPCommonValueTypeToDataType.
"""
if isinstance(value, bool):
return "bool"
elif isinstance(value, int):
return "int64"
elif isinstance(value, float):
return "float64"
elif isinstance(value, str):
return "string"
elif isinstance(value, list):
# Use the sophisticated array type inference
array_type = _infer_array_type(value)
return array_type if array_type else "[]dynamic"
elif isinstance(value, dict):
return "json"
else:
return "string" # Default fallback
def _extract_json_paths(
obj: Any,
current_path: str = "",
path_types: dict[str, set[str]] | None = None,
level: int = 0,
) -> dict[str, set[str]]:
"""
Recursively extract all paths and their types from a JSON object.
Matches metadataexporter's analyzePValue logic.
Args:
obj: The JSON object to traverse
current_path: Current path being built (e.g., "user.name")
path_types: Dictionary mapping paths to sets of types found
level: Current nesting level (for depth limiting)
Returns:
Dictionary mapping paths to sets of type strings
"""
if path_types is None:
path_types = {}
if obj is None:
# Skip null values — matches Go walkNode which errors on ValueTypeEmpty
return path_types
if isinstance(obj, dict):
# For objects, recurse into keys without recording the object itself as a type.
# Matches Go walkMap which recurses without calling ta.record on the map node.
for key, value in obj.items():
# Build the path for this key
if current_path:
new_path = f"{current_path}.{key}"
else:
new_path = key
# Recurse into the value
_extract_json_paths(value, new_path, path_types, level + 1)
elif isinstance(obj, list):
# Skip empty arrays
if len(obj) == 0:
return path_types
# Collect types from array elements (matching Go: types := make([]pcommon.ValueType, 0, s.Len()))
types = []
for item in obj:
if isinstance(item, dict):
# When traversing into array element objects, use ArraySuffix ([])
# This matches: prefix+ArraySuffix in the Go code
# Example: if current_path is "education", we use "education[]" to traverse into objects
array_prefix = current_path + ARRAY_SUFFIX if current_path else ""
for key, value in item.items():
if array_prefix:
# Use array separator: education[].name
array_path = f"{array_prefix}.{key}"
else:
array_path = key
# Recurse without increasing level (matching Go behavior)
_extract_json_paths(value, array_path, path_types, level)
types.append("JSON")
elif isinstance(item, list):
# Arrays inside arrays are not supported - skip the whole path
# Matching Go: e.logger.Error("arrays inside arrays are not supported!", ...); return nil
return path_types
elif isinstance(item, str):
types.append("String")
elif isinstance(item, bool):
types.append("Bool")
elif isinstance(item, float):
types.append("Float64")
elif isinstance(item, int):
types.append("Int64")
# Infer array type from collected types (matching Go: if mask := inferArrayMask(types); mask != 0)
if len(types) > 0:
array_type = _infer_array_type_from_type_strings(types)
if array_type and current_path:
if current_path not in path_types:
path_types[current_path] = set()
path_types[current_path].add(array_type)
# Primitive value (string, number, bool)
elif current_path:
if current_path not in path_types:
path_types[current_path] = set()
obj_type = _python_type_to_clickhouse_type(obj)
path_types[current_path].add(obj_type)
return path_types
def _parse_json_bodies_and_extract_paths(
json_bodies: list[str],
timestamp: datetime.datetime | None = None,
) -> list[JSONPathType]:
"""
Parse JSON bodies and extract all paths with their types.
This mimics the behavior of metadataexporter.
Args:
json_bodies: List of JSON body strings to parse
timestamp: Timestamp to use for last_seen (defaults to now)
Returns:
List of JSONPathType objects with all discovered paths and types
"""
if timestamp is None:
timestamp = datetime.datetime.now()
# Aggregate all paths and their types across all JSON bodies
all_path_types: dict[str, set[str]] = {}
for json_body in json_bodies:
try:
parsed = json.loads(json_body)
_extract_json_paths(parsed, "", all_path_types, level=0)
except (json.JSONDecodeError, TypeError):
# Skip invalid JSON
continue
# Convert to list of JSONPathType objects
# Each path can have multiple types, so we create one JSONPathType per type
path_type_objects: list[JSONPathType] = []
for path, types_set in all_path_types.items():
for type_str in types_set:
path_type_objects.append(JSONPathType(field_name=path, field_data_type=type_str, last_seen=timestamp))
return path_type_objects
@pytest.fixture(name="export_json_types", scope="function")
def export_json_types(
clickhouse: types.TestContainerClickhouse,
) -> Generator[Callable[[list[JSONPathType] | list[str] | list[Any]], None], Any]:
"""
Fixture for exporting JSON type metadata to the path_types table.
This is a simpler version of metadataexporter for test fixtures.
The function can accept:
1. List of JSONPathType objects (manual specification)
2. List of JSON body strings (auto-extract paths)
3. List of Logs objects (extract from body_json field)
Usage examples:
# Manual specification
export_json_types([
JSONPathType(field_name="user.name", field_data_type="string"),
JSONPathType(field_name="user.age", field_data_type="int64"),
])
# Auto-extract from JSON strings
export_json_types([
'{"user": {"name": "alice", "age": 25}}',
'{"user": {"name": "bob", "age": 30}}',
])
# Auto-extract from Logs objects
export_json_types(logs_list)
"""
def _export_json_types(
data: list[JSONPathType] | list[str] | list[Any], # List[Logs] but avoiding circular import
) -> None:
"""
Export JSON type metadata to signoz_metadata.distributed_field_keys table.
This table stores signal, context, path, and type information for body JSON fields.
"""
path_types: list[JSONPathType] = []
if len(data) == 0:
return
# Determine input type and convert to JSONPathType list
first_item = data[0]
if isinstance(first_item, JSONPathType):
# Already JSONPathType objects
path_types = data # type: ignore
elif isinstance(first_item, str):
# List of JSON strings - parse and extract paths
path_types = _parse_json_bodies_and_extract_paths(data) # type: ignore
else:
# Assume it's a list of Logs objects - extract body_v2
json_bodies: list[str] = []
for log in data: # type: ignore
# Try to get body_v2 attribute
if hasattr(log, "body_v2") and log.body_v2:
json_bodies.append(log.body_v2)
elif hasattr(log, "body") and log.body:
# Fallback to body if body_v2 not available
try:
# Try to parse as JSON
json.loads(log.body)
json_bodies.append(log.body)
except (json.JSONDecodeError, TypeError):
pass
if json_bodies:
path_types = _parse_json_bodies_and_extract_paths(json_bodies)
if len(path_types) == 0:
return
clickhouse.conn.insert(
database="signoz_metadata",
table="distributed_field_keys",
data=[path_type.np_arr() for path_type in path_types],
column_names=[
"signal",
"field_context",
"field_name",
"field_data_type",
"last_seen",
],
)
yield _export_json_types
# Cleanup - truncate the local table after tests (following pattern from logs fixture)
clickhouse.conn.query(f"TRUNCATE TABLE signoz_metadata.field_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC")
@pytest.fixture(name="create_json_index", scope="function")
def create_json_index(
signoz: types.SigNoz,
) -> Generator[Callable[[str, list[dict[str, Any]]], None]]:
"""
Create ClickHouse data-skipping indexes on body_v2 JSON sub-columns via
POST /api/v1/logs/promote_paths.
**Must be called BEFORE insert_logs** so that newly inserted data parts are
covered by the index and the QB uses the indexed condition path.
Each entry in `paths` follows the PromotePath API shape:
{
"path": "body.user.name", # must start with "body."
"indexes": [
{
"fieldDataType": "string", # string | int64 | float64
"type": "ngrambf_v1(3, 256, 2, 0)", # or "minmax", "tokenbf_v1(...)"
"granularity": 1,
}
],
}
Teardown drops every index created during the test by querying
system.data_skipping_indices for matching expressions.
Example::
def test_foo(signoz, get_token, insert_logs, export_json_types, create_json_body_index):
token = get_token(...)
export_json_types(logs_list)
create_json_body_index(token, [
{"path": "body.user.name",
"indexes": [{"fieldDataType": "string", "type": "ngrambf_v1(3, 256, 2, 0)", "granularity": 1}]},
{"path": "body.user.age",
"indexes": [{"fieldDataType": "int64", "type": "minmax", "granularity": 1}]},
])
insert_logs(logs_list) # data inserted after index exists — index is built automatically
"""
created_paths: list[str] = []
def _create_json_body_index(token: str, paths: list[dict[str, Any]]) -> None:
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/logs/promote_paths"),
headers={"authorization": f"Bearer {token}"},
json=paths,
timeout=30,
)
assert response.status_code == HTTPStatus.CREATED, f"Failed to create JSON body indexes: {response.status_code} {response.text}"
for path in paths:
# The API strips the "body." prefix before storing — mirror that here
# so our cleanup query uses the bare path (e.g. "user.name").
raw = path["path"].removeprefix("body.")
if raw not in created_paths:
created_paths.append(raw)
yield _create_json_body_index
if not created_paths:
return
cluster = signoz.telemetrystore.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER"]
for path in created_paths:
result = signoz.telemetrystore.conn.query(f"SELECT name FROM system.data_skipping_indices WHERE database = 'signoz_logs' AND table = 'logs_v2' AND expr LIKE '%{path}%'")
for (index_name,) in result.result_rows:
signoz.telemetrystore.conn.query(f"ALTER TABLE signoz_logs.logs_v2 ON CLUSTER '{cluster}' DROP INDEX IF EXISTS `{index_name}`")

View File

@@ -121,6 +121,8 @@ class Logs(ABC):
resources: dict[str, Any] = {},
attributes: dict[str, Any] = {},
body: str = "default body",
body_v2: str | None = None,
body_promoted: str | None = None,
severity_text: str = "INFO",
trace_id: str = "",
span_id: str = "",
@@ -166,6 +168,33 @@ class Logs(ABC):
# Set body
self.body = body
# Set body_v2 - if body is JSON, parse and stringify it, otherwise use empty string
# ClickHouse accepts String input for JSON column
if body_v2 is not None:
self.body_v2 = body_v2
else:
# Try to parse body as JSON; if successful use it directly,
# otherwise wrap as {"message": body} matching the normalize operator behavior.
try:
json.loads(body)
self.body_v2 = body
except (json.JSONDecodeError, TypeError):
self.body_v2 = json.dumps({"message": body})
# Set body_promoted - must be valid JSON
# Tests will explicitly pass promoted column's content, but we validate it
if body_promoted is not None:
# Validate that it's valid JSON
try:
json.loads(body_promoted)
self.body_promoted = body_promoted
except (json.JSONDecodeError, TypeError):
# If invalid, default to empty JSON object
self.body_promoted = "{}"
else:
# Default to empty JSON object (valid JSON)
self.body_promoted = "{}"
# Process resources and attributes
self.resources_string = {k: str(v) for k, v in resources.items()}
self.resource_json = {} if resource_write_mode == "legacy_only" else dict(self.resources_string)
@@ -309,6 +338,8 @@ class Logs(ABC):
self.severity_text,
self.severity_number,
self.body,
self.body_v2,
self.body_promoted,
self.attributes_string,
self.attributes_number,
self.attributes_bool,
@@ -436,31 +467,47 @@ def insert_logs_to_clickhouse(conn, logs: list[Logs]) -> None:
data=[resource_key.np_arr() for resource_key in resource_keys],
)
all_column_names = [
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"body_v2",
"body_promoted",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
]
result = conn.query("SELECT count() FROM system.columns WHERE database = 'signoz_logs' AND table = 'logs_v2' AND name = 'body_v2'")
has_json_body = result.result_rows[0][0] > 0
if has_json_body:
column_names = all_column_names
data = [log.np_arr() for log in logs]
else:
json_body_cols = {"body_v2", "body_promoted"}
keep_indices = [i for i, c in enumerate(all_column_names) if c not in json_body_cols]
column_names = [all_column_names[i] for i in keep_indices]
data = [log.np_arr()[keep_indices] for log in logs]
conn.insert(
database="signoz_logs",
table="distributed_logs_v2",
data=[log.np_arr() for log in logs],
column_names=[
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
],
data=data,
column_names=column_names,
)

View File

@@ -8,27 +8,32 @@ from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@pytest.fixture(name="migrator", scope="package")
def migrator(
def create_migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
cache_key: str = "migrator",
env_overrides: dict | None = None,
) -> types.Operation:
"""
Package-scoped fixture for running schema migrations.
Factory function for running schema migrations.
Accepts optional env_overrides to customize the migrator environment.
"""
def create() -> None:
version = request.config.getoption("--schema-migrator-version")
client = docker.from_env()
environment = dict(env_overrides) if env_overrides else {}
container = client.containers.run(
image=f"signoz/signoz-schema-migrator:{version}",
command=f"sync --replication=true --cluster-name=cluster --up= --dsn={clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN']}",
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -47,6 +52,7 @@ def migrator(
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -59,7 +65,7 @@ def migrator(
container.remove()
return types.Operation(name="migrator")
return types.Operation(name=cache_key)
def delete(_: types.Operation) -> None:
pass
@@ -70,9 +76,27 @@ def migrator(
return reuse.wrap(
request,
pytestconfig,
"migrator",
cache_key,
lambda: types.Operation(name=""),
create,
delete,
restore,
)
@pytest.fixture(name="migrator", scope="package")
def migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.Operation:
"""
Package-scoped fixture for running schema migrations.
"""
return create_migrator(
network=network,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
)

View File

@@ -500,6 +500,14 @@ def get_scalar_columns(response_json: dict) -> list[dict]:
return results[0].get("columns", [])
def get_rows(response: requests.Response) -> list[dict[str, Any]]:
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
# The server returns rows:null (not []) when there are 0 matching logs.
return results[0].get("rows") or []
def get_column_data_from_response(response_json: dict, column_name: str) -> list[Any]:
results = response_json.get("data", {}).get("data", {}).get("results", [])
if not results:

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,67 @@
import pytest
from testcontainers.core.container import Network
from fixtures import types
from fixtures.migrator import create_migrator
from fixtures.signoz import create_signoz
UNSUPPORTED_CLICKHOUSE_VERSIONS = {"25.5.6"}
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
version = config.getoption("--clickhouse-version")
if version in UNSUPPORTED_CLICKHOUSE_VERSIONS:
skip = pytest.mark.skip(reason=f"JSON body QB tests require ClickHouse > {version}")
for item in items:
item.add_marker(skip)
@pytest.fixture(name="migrator", scope="package")
def migrator_json(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.Operation:
"""
Package-scoped migrator with ENABLE_LOGS_MIGRATIONS_V2=1.
"""
return create_migrator(
network=network,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
cache_key="migrator-json-body",
env_overrides={
"ENABLE_LOGS_MIGRATIONS_V2": "1",
},
)
@pytest.fixture(name="signoz", scope="package")
def signoz_json_body(
network: Network,
migrator: types.Operation, # pylint: disable=unused-argument
zeus: types.TestContainerDocker,
gateway: types.TestContainerDocker,
sqlstore: types.TestContainerSQL,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.SigNoz:
"""
Package-scoped fixture for SigNoz with BODY_JSON_QUERY_ENABLED=true.
"""
return create_signoz(
network=network,
zeus=zeus,
gateway=gateway,
sqlstore=sqlstore,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
cache_key="signoz-json-body",
env_overrides={
"BODY_JSON_QUERY_ENABLED": "true",
},
)