Compare commits

..

8 Commits

Author SHA1 Message Date
Piyush Singariya
9689408a99 Merge branch 'main' into json-feature-flag 2026-04-23 11:21:14 +05:30
Piyush Singariya
2d918c95e0 chore: rename field 2026-04-23 11:20:36 +05:30
Piyush Singariya
efb415960c fix: minor changes 2026-04-23 11:15:12 +05:30
Piyush Singariya
412b497bdf test: removed nil checks 2026-04-23 11:10:34 +05:30
Piyush Singariya
67d95e1d1e fix: flagger threaded into tests 2026-04-23 10:47:00 +05:30
Piyush Singariya
57cccba19f feat: flagger integration in flow 2026-04-22 20:35:52 +05:30
Piyush Singariya
c5c2525ff0 fix: still using global bool 2026-04-22 15:14:56 +05:30
Piyush Singariya
1e4f865a15 chore: add json enabled as feature flag for FE 2026-04-22 09:58:24 +05:30
76 changed files with 438 additions and 4367 deletions

View File

@@ -2287,234 +2287,6 @@ components:
enabled:
type: boolean
type: object
InframonitoringtypesHostFilter:
properties:
expression:
type: string
filterByStatus:
$ref: '#/components/schemas/InframonitoringtypesHostStatus'
type: object
InframonitoringtypesHostRecord:
properties:
activeHostCount:
type: integer
cpu:
format: double
type: number
diskUsage:
format: double
type: number
hostName:
type: string
inactiveHostCount:
type: integer
load15:
format: double
type: number
memory:
format: double
type: number
meta:
additionalProperties: {}
nullable: true
type: object
status:
$ref: '#/components/schemas/InframonitoringtypesHostStatus'
wait:
format: double
type: number
required:
- hostName
- status
- activeHostCount
- inactiveHostCount
- cpu
- memory
- wait
- load15
- diskUsage
- meta
type: object
InframonitoringtypesHostStatus:
enum:
- active
- inactive
- ""
type: string
InframonitoringtypesHosts:
properties:
endTimeBeforeRetention:
type: boolean
records:
items:
$ref: '#/components/schemas/InframonitoringtypesHostRecord'
nullable: true
type: array
requiredMetricsCheck:
$ref: '#/components/schemas/InframonitoringtypesRequiredMetricsCheck'
total:
type: integer
type:
$ref: '#/components/schemas/InframonitoringtypesResponseType'
warning:
$ref: '#/components/schemas/Querybuildertypesv5QueryWarnData'
required:
- type
- records
- total
- requiredMetricsCheck
- endTimeBeforeRetention
type: object
InframonitoringtypesPodPhase:
enum:
- pending
- running
- succeeded
- failed
- ""
type: string
InframonitoringtypesPodRecord:
properties:
failedPodCount:
type: integer
meta:
additionalProperties: {}
nullable: true
type: object
pendingPodCount:
type: integer
podAge:
format: int64
type: integer
podCPU:
format: double
type: number
podCPULimit:
format: double
type: number
podCPURequest:
format: double
type: number
podMemory:
format: double
type: number
podMemoryLimit:
format: double
type: number
podMemoryRequest:
format: double
type: number
podPhase:
$ref: '#/components/schemas/InframonitoringtypesPodPhase'
podUID:
type: string
runningPodCount:
type: integer
succeededPodCount:
type: integer
required:
- podUID
- podCPU
- podCPURequest
- podCPULimit
- podMemory
- podMemoryRequest
- podMemoryLimit
- podPhase
- pendingPodCount
- runningPodCount
- succeededPodCount
- failedPodCount
- podAge
- meta
type: object
InframonitoringtypesPods:
properties:
endTimeBeforeRetention:
type: boolean
records:
items:
$ref: '#/components/schemas/InframonitoringtypesPodRecord'
nullable: true
type: array
requiredMetricsCheck:
$ref: '#/components/schemas/InframonitoringtypesRequiredMetricsCheck'
total:
type: integer
type:
$ref: '#/components/schemas/InframonitoringtypesResponseType'
warning:
$ref: '#/components/schemas/Querybuildertypesv5QueryWarnData'
required:
- type
- records
- total
- requiredMetricsCheck
- endTimeBeforeRetention
type: object
InframonitoringtypesPostableHosts:
properties:
end:
format: int64
type: integer
filter:
$ref: '#/components/schemas/InframonitoringtypesHostFilter'
groupBy:
items:
$ref: '#/components/schemas/Querybuildertypesv5GroupByKey'
nullable: true
type: array
limit:
type: integer
offset:
type: integer
orderBy:
$ref: '#/components/schemas/Querybuildertypesv5OrderBy'
start:
format: int64
type: integer
required:
- start
- end
- limit
type: object
InframonitoringtypesPostablePods:
properties:
end:
format: int64
type: integer
filter:
$ref: '#/components/schemas/Querybuildertypesv5Filter'
groupBy:
items:
$ref: '#/components/schemas/Querybuildertypesv5GroupByKey'
nullable: true
type: array
limit:
type: integer
offset:
type: integer
orderBy:
$ref: '#/components/schemas/Querybuildertypesv5OrderBy'
start:
format: int64
type: integer
type: object
InframonitoringtypesRequiredMetricsCheck:
properties:
missingMetrics:
items:
type: string
nullable: true
type: array
required:
- missingMetrics
type: object
InframonitoringtypesResponseType:
enum:
- list
- grouped_list
type: string
MetricsexplorertypesInspectMetricsRequest:
properties:
end:
@@ -10081,142 +9853,6 @@ paths:
summary: Health check
tags:
- health
/api/v2/infra_monitoring/hosts:
post:
deprecated: false
description: 'Returns a paginated list of hosts with key infrastructure metrics:
CPU usage (%), memory usage (%), I/O wait (%), disk usage (%), and 15-minute
load average. Each host includes its current status (active/inactive based
on metrics reported in the last 10 minutes) and metadata attributes (e.g.,
os.type). Supports filtering via a filter expression, filtering by host status,
custom groupBy to aggregate hosts by any attribute, ordering by any of the
five metrics, and pagination via offset/limit. The response type is ''list''
for the default host.name grouping or ''grouped_list'' for custom groupBy
keys. Also reports missing required metrics and whether the requested time
range falls before the data retention boundary.'
operationId: ListHosts
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/InframonitoringtypesPostableHosts'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/InframonitoringtypesHosts'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: List Hosts for Infra Monitoring
tags:
- inframonitoring
/api/v2/infra_monitoring/pods:
post:
deprecated: false
description: 'Returns a paginated list of Kubernetes pods with key metrics:
CPU usage, CPU request/limit utilization, memory working set, memory request/limit
utilization, current pod phase (pending/running/succeeded/failed), and pod
age (ms since start time). Each pod includes metadata attributes (namespace,
node, workload owner such as deployment/statefulset/daemonset/job/cronjob,
cluster). Supports filtering via a filter expression, custom groupBy to aggregate
pods by any attribute, ordering by any of the six metrics (cpu, cpu_request,
cpu_limit, memory, memory_request, memory_limit), and pagination via offset/limit.
The response type is ''list'' for the default k8s.pod.uid grouping (each row
is one pod with its current phase) or ''grouped_list'' for custom groupBy
keys (each row aggregates pods in the group with per-phase counts: pendingPodCount,
runningPodCount, succeededPodCount, failedPodCount derived from each pod''s
latest phase in the window). Also reports missing required metrics and whether
the requested time range falls before the data retention boundary.'
operationId: ListPods
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/InframonitoringtypesPostablePods'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/InframonitoringtypesPods'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: List Pods for Infra Monitoring
tags:
- inframonitoring
/api/v2/livez:
get:
deprecated: false

View File

@@ -15,6 +15,7 @@ import (
"github.com/SigNoz/signoz/ee/query-service/constants"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
@@ -71,6 +72,15 @@ func (ah *APIHandler) getFeatureFlags(w http.ResponseWriter, r *http.Request) {
Route: "",
})
bodyJSONQuery := querybuilder.IsBodyJSONEnabled(ctx, ah.Signoz.Flagger)
featureSet = append(featureSet, &licensetypes.Feature{
Name: valuer.NewString(flagger.FeatureBodyJSONQuery.String()),
Active: bodyJSONQuery,
Usage: 0,
UsageLimit: -1,
Route: "",
})
if constants.IsDotMetricsEnabled {
for idx, feature := range featureSet {
if feature.Name == licensetypes.DotMetricsEnabled {

View File

@@ -105,6 +105,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
signoz.SQLStore,
integrationsController.GetPipelinesForInstalledIntegrations,
reader,
signoz.Flagger,
)
if err != nil {
return nil, err

View File

@@ -1,192 +0,0 @@
/**
* ! Do not edit manually
* * The file has been auto-generated using Orval for SigNoz
* * regenerate with 'yarn generate:api'
* SigNoz
*/
import { useMutation } from 'react-query';
import type {
MutationFunction,
UseMutationOptions,
UseMutationResult,
} from 'react-query';
import type {
InframonitoringtypesPostableHostsDTO,
InframonitoringtypesPostablePodsDTO,
ListHosts200,
ListPods200,
RenderErrorResponseDTO,
} from '../sigNoz.schemas';
import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
import type { ErrorType, BodyType } from '../../../generatedAPIInstance';
/**
* Returns a paginated list of hosts with key infrastructure metrics: CPU usage (%), memory usage (%), I/O wait (%), disk usage (%), and 15-minute load average. Each host includes its current status (active/inactive based on metrics reported in the last 10 minutes) and metadata attributes (e.g., os.type). Supports filtering via a filter expression, filtering by host status, custom groupBy to aggregate hosts by any attribute, ordering by any of the five metrics, and pagination via offset/limit. The response type is 'list' for the default host.name grouping or 'grouped_list' for custom groupBy keys. Also reports missing required metrics and whether the requested time range falls before the data retention boundary.
* @summary List Hosts for Infra Monitoring
*/
export const listHosts = (
inframonitoringtypesPostableHostsDTO: BodyType<InframonitoringtypesPostableHostsDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<ListHosts200>({
url: `/api/v2/infra_monitoring/hosts`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: inframonitoringtypesPostableHostsDTO,
signal,
});
};
export const getListHostsMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listHosts>>,
TError,
{ data: BodyType<InframonitoringtypesPostableHostsDTO> },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof listHosts>>,
TError,
{ data: BodyType<InframonitoringtypesPostableHostsDTO> },
TContext
> => {
const mutationKey = ['listHosts'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof listHosts>>,
{ data: BodyType<InframonitoringtypesPostableHostsDTO> }
> = (props) => {
const { data } = props ?? {};
return listHosts(data);
};
return { mutationFn, ...mutationOptions };
};
export type ListHostsMutationResult = NonNullable<
Awaited<ReturnType<typeof listHosts>>
>;
export type ListHostsMutationBody =
BodyType<InframonitoringtypesPostableHostsDTO>;
export type ListHostsMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary List Hosts for Infra Monitoring
*/
export const useListHosts = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listHosts>>,
TError,
{ data: BodyType<InframonitoringtypesPostableHostsDTO> },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof listHosts>>,
TError,
{ data: BodyType<InframonitoringtypesPostableHostsDTO> },
TContext
> => {
const mutationOptions = getListHostsMutationOptions(options);
return useMutation(mutationOptions);
};
/**
* Returns a paginated list of Kubernetes pods with key metrics: CPU usage, CPU request/limit utilization, memory working set, memory request/limit utilization, current pod phase (pending/running/succeeded/failed), and pod age (ms since start time). Each pod includes metadata attributes (namespace, node, workload owner such as deployment/statefulset/daemonset/job/cronjob, cluster). Supports filtering via a filter expression, custom groupBy to aggregate pods by any attribute, ordering by any of the six metrics (cpu, cpu_request, cpu_limit, memory, memory_request, memory_limit), and pagination via offset/limit. The response type is 'list' for the default k8s.pod.uid grouping (each row is one pod with its current phase) or 'grouped_list' for custom groupBy keys (each row aggregates pods in the group with per-phase counts: pendingPodCount, runningPodCount, succeededPodCount, failedPodCount derived from each pod's latest phase in the window). Also reports missing required metrics and whether the requested time range falls before the data retention boundary.
* @summary List Pods for Infra Monitoring
*/
export const listPods = (
inframonitoringtypesPostablePodsDTO: BodyType<InframonitoringtypesPostablePodsDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<ListPods200>({
url: `/api/v2/infra_monitoring/pods`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: inframonitoringtypesPostablePodsDTO,
signal,
});
};
export const getListPodsMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listPods>>,
TError,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof listPods>>,
TError,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> },
TContext
> => {
const mutationKey = ['listPods'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof listPods>>,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> }
> = (props) => {
const { data } = props ?? {};
return listPods(data);
};
return { mutationFn, ...mutationOptions };
};
export type ListPodsMutationResult = NonNullable<
Awaited<ReturnType<typeof listPods>>
>;
export type ListPodsMutationBody =
BodyType<InframonitoringtypesPostablePodsDTO>;
export type ListPodsMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary List Pods for Infra Monitoring
*/
export const useListPods = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listPods>>,
TError,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof listPods>>,
TError,
{ data: BodyType<InframonitoringtypesPostablePodsDTO> },
TContext
> => {
const mutationOptions = getListPodsMutationOptions(options);
return useMutation(mutationOptions);
};

View File

@@ -3053,256 +3053,6 @@ export interface GlobaltypesTokenizerConfigDTO {
enabled?: boolean;
}
export interface InframonitoringtypesHostFilterDTO {
/**
* @type string
*/
expression?: string;
filterByStatus?: InframonitoringtypesHostStatusDTO;
}
/**
* @nullable
*/
export type InframonitoringtypesHostRecordDTOMeta = {
[key: string]: unknown;
} | null;
export interface InframonitoringtypesHostRecordDTO {
/**
* @type integer
*/
activeHostCount: number;
/**
* @type number
* @format double
*/
cpu: number;
/**
* @type number
* @format double
*/
diskUsage: number;
/**
* @type string
*/
hostName: string;
/**
* @type integer
*/
inactiveHostCount: number;
/**
* @type number
* @format double
*/
load15: number;
/**
* @type number
* @format double
*/
memory: number;
/**
* @type object
* @nullable true
*/
meta: InframonitoringtypesHostRecordDTOMeta;
status: InframonitoringtypesHostStatusDTO;
/**
* @type number
* @format double
*/
wait: number;
}
export enum InframonitoringtypesHostStatusDTO {
active = 'active',
inactive = 'inactive',
'' = '',
}
export interface InframonitoringtypesHostsDTO {
/**
* @type boolean
*/
endTimeBeforeRetention: boolean;
/**
* @type array
* @nullable true
*/
records: InframonitoringtypesHostRecordDTO[] | null;
requiredMetricsCheck: InframonitoringtypesRequiredMetricsCheckDTO;
/**
* @type integer
*/
total: number;
type: InframonitoringtypesResponseTypeDTO;
warning?: Querybuildertypesv5QueryWarnDataDTO;
}
export enum InframonitoringtypesPodPhaseDTO {
pending = 'pending',
running = 'running',
succeeded = 'succeeded',
failed = 'failed',
'' = '',
}
/**
* @nullable
*/
export type InframonitoringtypesPodRecordDTOMeta = {
[key: string]: unknown;
} | null;
export interface InframonitoringtypesPodRecordDTO {
/**
* @type integer
*/
failedPodCount: number;
/**
* @type object
* @nullable true
*/
meta: InframonitoringtypesPodRecordDTOMeta;
/**
* @type integer
*/
pendingPodCount: number;
/**
* @type integer
* @format int64
*/
podAge: number;
/**
* @type number
* @format double
*/
podCPU: number;
/**
* @type number
* @format double
*/
podCPULimit: number;
/**
* @type number
* @format double
*/
podCPURequest: number;
/**
* @type number
* @format double
*/
podMemory: number;
/**
* @type number
* @format double
*/
podMemoryLimit: number;
/**
* @type number
* @format double
*/
podMemoryRequest: number;
podPhase: InframonitoringtypesPodPhaseDTO;
/**
* @type string
*/
podUID: string;
/**
* @type integer
*/
runningPodCount: number;
/**
* @type integer
*/
succeededPodCount: number;
}
export interface InframonitoringtypesPodsDTO {
/**
* @type boolean
*/
endTimeBeforeRetention: boolean;
/**
* @type array
* @nullable true
*/
records: InframonitoringtypesPodRecordDTO[] | null;
requiredMetricsCheck: InframonitoringtypesRequiredMetricsCheckDTO;
/**
* @type integer
*/
total: number;
type: InframonitoringtypesResponseTypeDTO;
warning?: Querybuildertypesv5QueryWarnDataDTO;
}
export interface InframonitoringtypesPostableHostsDTO {
/**
* @type integer
* @format int64
*/
end: number;
filter?: InframonitoringtypesHostFilterDTO;
/**
* @type array
* @nullable true
*/
groupBy?: Querybuildertypesv5GroupByKeyDTO[] | null;
/**
* @type integer
*/
limit: number;
/**
* @type integer
*/
offset?: number;
orderBy?: Querybuildertypesv5OrderByDTO;
/**
* @type integer
* @format int64
*/
start: number;
}
export interface InframonitoringtypesPostablePodsDTO {
/**
* @type integer
* @format int64
*/
end?: number;
filter?: Querybuildertypesv5FilterDTO;
/**
* @type array
* @nullable true
*/
groupBy?: Querybuildertypesv5GroupByKeyDTO[] | null;
/**
* @type integer
*/
limit?: number;
/**
* @type integer
*/
offset?: number;
orderBy?: Querybuildertypesv5OrderByDTO;
/**
* @type integer
* @format int64
*/
start?: number;
}
export interface InframonitoringtypesRequiredMetricsCheckDTO {
/**
* @type array
* @nullable true
*/
missingMetrics: string[] | null;
}
export enum InframonitoringtypesResponseTypeDTO {
list = 'list',
grouped_list = 'grouped_list',
}
export interface MetricsexplorertypesInspectMetricsRequestDTO {
/**
* @type integer
@@ -6888,22 +6638,6 @@ export type Healthz503 = {
status: string;
};
export type ListHosts200 = {
data: InframonitoringtypesHostsDTO;
/**
* @type string
*/
status: string;
};
export type ListPods200 = {
data: InframonitoringtypesPodsDTO;
/**
* @type string
*/
status: string;
};
export type Livez200 = {
data: FactoryResponseDTO;
/**

View File

@@ -1,52 +0,0 @@
package signozapiserver
import (
"net/http"
"github.com/SigNoz/signoz/pkg/http/handler"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/gorilla/mux"
)
func (provider *provider) addInfraMonitoringRoutes(router *mux.Router) error {
if err := router.Handle("/api/v2/infra_monitoring/hosts", handler.New(
provider.authZ.ViewAccess(provider.infraMonitoringHandler.ListHosts),
handler.OpenAPIDef{
ID: "ListHosts",
Tags: []string{"inframonitoring"},
Summary: "List Hosts for Infra Monitoring",
Description: "Returns a paginated list of hosts with key infrastructure metrics: CPU usage (%), memory usage (%), I/O wait (%), disk usage (%), and 15-minute load average. Each host includes its current status (active/inactive based on metrics reported in the last 10 minutes) and metadata attributes (e.g., os.type). Supports filtering via a filter expression, filtering by host status, custom groupBy to aggregate hosts by any attribute, ordering by any of the five metrics, and pagination via offset/limit. The response type is 'list' for the default host.name grouping or 'grouped_list' for custom groupBy keys. Also reports missing required metrics and whether the requested time range falls before the data retention boundary.",
Request: new(inframonitoringtypes.PostableHosts),
RequestContentType: "application/json",
Response: new(inframonitoringtypes.Hosts),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v2/infra_monitoring/pods", handler.New(
provider.authZ.ViewAccess(provider.infraMonitoringHandler.ListPods),
handler.OpenAPIDef{
ID: "ListPods",
Tags: []string{"inframonitoring"},
Summary: "List Pods for Infra Monitoring",
Description: "Returns a paginated list of Kubernetes pods with key metrics: CPU usage, CPU request/limit utilization, memory working set, memory request/limit utilization, current pod phase (pending/running/succeeded/failed), and pod age (ms since start time). Each pod includes metadata attributes (namespace, node, workload owner such as deployment/statefulset/daemonset/job/cronjob, cluster). Supports filtering via a filter expression, custom groupBy to aggregate pods by any attribute, ordering by any of the six metrics (cpu, cpu_request, cpu_limit, memory, memory_request, memory_limit), and pagination via offset/limit. The response type is 'list' for the default k8s.pod.uid grouping (each row is one pod with its current phase) or 'grouped_list' for custom groupBy keys (each row aggregates pods in the group with per-phase counts: pendingPodCount, runningPodCount, succeededPodCount, failedPodCount derived from each pod's latest phase in the window). Also reports missing required metrics and whether the requested time range falls before the data retention boundary.",
Request: new(inframonitoringtypes.PostablePods),
RequestContentType: "application/json",
Response: new(inframonitoringtypes.Pods),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -17,7 +17,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/promote"
@@ -50,7 +49,6 @@ type provider struct {
dashboardModule dashboard.Module
dashboardHandler dashboard.Handler
metricsExplorerHandler metricsexplorer.Handler
infraMonitoringHandler inframonitoring.Handler
gatewayHandler gateway.Handler
fieldsHandler fields.Handler
authzHandler authz.Handler
@@ -79,7 +77,6 @@ func NewFactory(
dashboardModule dashboard.Module,
dashboardHandler dashboard.Handler,
metricsExplorerHandler metricsexplorer.Handler,
infraMonitoringHandler inframonitoring.Handler,
gatewayHandler gateway.Handler,
fieldsHandler fields.Handler,
authzHandler authz.Handler,
@@ -111,7 +108,6 @@ func NewFactory(
dashboardModule,
dashboardHandler,
metricsExplorerHandler,
infraMonitoringHandler,
gatewayHandler,
fieldsHandler,
authzHandler,
@@ -145,7 +141,6 @@ func newProvider(
dashboardModule dashboard.Module,
dashboardHandler dashboard.Handler,
metricsExplorerHandler metricsexplorer.Handler,
infraMonitoringHandler inframonitoring.Handler,
gatewayHandler gateway.Handler,
fieldsHandler fields.Handler,
authzHandler authz.Handler,
@@ -177,7 +172,6 @@ func newProvider(
dashboardModule: dashboardModule,
dashboardHandler: dashboardHandler,
metricsExplorerHandler: metricsExplorerHandler,
infraMonitoringHandler: infraMonitoringHandler,
gatewayHandler: gatewayHandler,
fieldsHandler: fieldsHandler,
authzHandler: authzHandler,
@@ -246,10 +240,6 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addInfraMonitoringRoutes(router); err != nil {
return err
}
if err := provider.addGatewayRoutes(router); err != nil {
return err
}

View File

@@ -0,0 +1,52 @@
// Package flaggertest provides helpers for creating Flagger instances in tests.
package flaggertest
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/flagger/configflagger"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
)
// New returns a Flagger with all flags at their registry defaults (all disabled).
// Use this in tests that do not need any feature flag enabled.
func New(t *testing.T) flagger.Flagger {
t.Helper()
registry := flagger.MustNewRegistry()
fl, err := flagger.New(
context.Background(),
instrumentationtest.New().ToProviderSettings(),
flagger.Config{},
registry,
configflagger.NewFactory(registry),
)
if err != nil {
t.Fatalf("flaggertest.New: %v", err)
}
return fl
}
// WithBodyJSON returns a Flagger with body_json_enabled set to the given value.
func WithBodyJSON(t *testing.T, enabled bool) flagger.Flagger {
t.Helper()
registry := flagger.MustNewRegistry()
cfg := flagger.Config{}
if enabled {
cfg.Config.Boolean = map[string]bool{
flagger.FeatureBodyJSONQuery.String(): true,
}
}
fl, err := flagger.New(
context.Background(),
instrumentationtest.New().ToProviderSettings(),
cfg,
registry,
configflagger.NewFactory(registry),
)
if err != nil {
t.Fatalf("flaggertest.WithBodyJSON: %v", err)
}
return fl
}

View File

@@ -8,6 +8,7 @@ var (
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
FeatureGetMetersFromZeus = featuretypes.MustNewName("get_meters_from_zeus")
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
FeatureBodyJSONQuery = featuretypes.MustNewName("body_json_enabled")
)
func MustNewRegistry() featuretypes.Registry {
@@ -52,6 +53,14 @@ func MustNewRegistry() featuretypes.Registry {
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureBodyJSONQuery,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageExperimental,
Description: "Controls whether body JSON querying is enabled",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
)
if err != nil {
panic(err)

View File

@@ -1,33 +0,0 @@
package inframonitoring
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
type Config struct {
TelemetryStore TelemetryStoreConfig `mapstructure:"telemetrystore"`
}
type TelemetryStoreConfig struct {
Threads int `mapstructure:"threads"`
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("inframonitoring"), newConfig)
}
func newConfig() factory.Config {
return Config{
TelemetryStore: TelemetryStoreConfig{
Threads: 8,
},
}
}
func (c Config) Validate() error {
if c.TelemetryStore.Threads <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "inframonitoring.telemetrystore.threads must be positive, got %d", c.TelemetryStore.Threads)
}
return nil
}

View File

@@ -1,71 +0,0 @@
package implinframonitoring
import (
"net/http"
"github.com/SigNoz/signoz/pkg/http/binding"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type handler struct {
module inframonitoring.Module
}
// NewHandler returns an inframonitoring.Handler implementation.
func NewHandler(m inframonitoring.Module) inframonitoring.Handler {
return &handler{
module: m,
}
}
func (h *handler) ListHosts(rw http.ResponseWriter, req *http.Request) {
claims, err := authtypes.ClaimsFromContext(req.Context())
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
var parsedReq inframonitoringtypes.PostableHosts
if err := binding.JSON.BindBody(req.Body, &parsedReq); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.ListHosts(req.Context(), orgID, &parsedReq)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}
func (h *handler) ListPods(rw http.ResponseWriter, req *http.Request) {
claims, err := authtypes.ClaimsFromContext(req.Context())
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
var parsedReq inframonitoringtypes.PostablePods
if err := binding.JSON.BindBody(req.Body, &parsedReq); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.ListPods(req.Context(), orgID, &parsedReq)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}

View File

@@ -1,573 +0,0 @@
package implinframonitoring
import (
"context"
"fmt"
"sort"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
// quoteIdentifier wraps s in backticks for use as a ClickHouse identifier,
// escaping any embedded backticks by doubling them.
func quoteIdentifier(s string) string {
return fmt.Sprintf("`%s`", strings.ReplaceAll(s, "`", "``"))
}
func isKeyInGroupByAttrs(groupByAttrs []qbtypes.GroupByKey, key string) bool {
for _, groupBy := range groupByAttrs {
if groupBy.Name == key {
return true
}
}
return false
}
func mergeFilterExpressions(queryFilterExpr, reqFilterExpr string) string {
queryFilterExpr = strings.TrimSpace(queryFilterExpr)
reqFilterExpr = strings.TrimSpace(reqFilterExpr)
if queryFilterExpr == "" {
return reqFilterExpr
}
if reqFilterExpr == "" {
return queryFilterExpr
}
return fmt.Sprintf("(%s) AND (%s)", queryFilterExpr, reqFilterExpr)
}
// compositeKeyFromList builds a composite key by joining the given parts
// with a null byte separator. This is the canonical way to construct
// composite keys for group identification across the infra monitoring module.
func compositeKeyFromList(parts []string) string {
return strings.Join(parts, "\x00")
}
// compositeKeyFromLabels builds a composite key from a label map by extracting
// the value for each groupBy key in order and joining them via compositeKeyFromList.
func compositeKeyFromLabels(labels map[string]string, groupBy []qbtypes.GroupByKey) string {
parts := make([]string, len(groupBy))
for i, key := range groupBy {
parts[i] = labels[key.Name]
}
return compositeKeyFromList(parts)
}
// parseAndSortGroups extracts group label maps from a ScalarData response and
// sorts them by the ranking query's aggregation value.
func parseAndSortGroups(
resp *qbtypes.QueryRangeResponse,
rankingQueryName string,
groupBy []qbtypes.GroupByKey,
direction qbtypes.OrderDirection,
) []rankedGroup {
if resp == nil || len(resp.Data.Results) == 0 {
return nil
}
// Find the ScalarData that contains the ranking column.
var sd *qbtypes.ScalarData
for _, r := range resp.Data.Results {
candidate, ok := r.(*qbtypes.ScalarData)
if !ok || candidate == nil {
continue
}
for _, col := range candidate.Columns {
if col.Type == qbtypes.ColumnTypeAggregation && col.QueryName == rankingQueryName {
sd = candidate
break
}
}
if sd != nil {
break
}
}
if sd == nil || len(sd.Data) == 0 {
return nil
}
groupColIndices := make(map[string]int)
rankingColIdx := -1
for i, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeGroup {
groupColIndices[col.Name] = i
}
if col.Type == qbtypes.ColumnTypeAggregation && col.QueryName == rankingQueryName {
rankingColIdx = i
}
}
if rankingColIdx == -1 {
return nil
}
groups := make([]rankedGroup, 0, len(sd.Data))
for _, row := range sd.Data {
labels := make(map[string]string, len(groupBy))
for _, key := range groupBy {
if idx, ok := groupColIndices[key.Name]; ok && idx < len(row) {
labels[key.Name] = fmt.Sprintf("%v", row[idx])
}
}
var value float64
if rankingColIdx < len(row) {
if v, ok := row[rankingColIdx].(float64); ok {
value = v
}
}
groups = append(groups, rankedGroup{
labels: labels,
value: value,
compositeKey: compositeKeyFromLabels(labels, groupBy),
})
}
sort.Slice(groups, func(i, j int) bool {
if groups[i].value != groups[j].value {
if direction == qbtypes.OrderDirectionAsc {
return groups[i].value < groups[j].value
}
return groups[i].value > groups[j].value
}
return groups[i].compositeKey < groups[j].compositeKey
})
return groups
}
// paginateWithBackfill returns the page of groups for [offset, offset+limit).
// The virtual sorted list is: metric-ranked groups first, then metadata-only
// groups (those in metadataMap but not in metric results) sorted alphabetically.
func paginateWithBackfill(
metricGroups []rankedGroup,
metadataMap map[string]map[string]string,
groupBy []qbtypes.GroupByKey,
offset, limit int,
) []map[string]string {
metricKeySet := make(map[string]bool, len(metricGroups))
for _, g := range metricGroups {
metricKeySet[g.compositeKey] = true
}
metadataOnlyKeys := make([]string, 0)
for compositeKey := range metadataMap {
if !metricKeySet[compositeKey] {
metadataOnlyKeys = append(metadataOnlyKeys, compositeKey)
}
}
sort.Strings(metadataOnlyKeys)
totalMetric := len(metricGroups)
totalAll := totalMetric + len(metadataOnlyKeys)
end := offset + limit
if end > totalAll {
end = totalAll
}
if offset >= totalAll {
return nil
}
pageGroups := make([]map[string]string, 0, end-offset)
for i := offset; i < end; i++ {
if i < totalMetric {
pageGroups = append(pageGroups, metricGroups[i].labels)
} else {
compositeKey := metadataOnlyKeys[i-totalMetric]
attrs := metadataMap[compositeKey]
labels := make(map[string]string, len(groupBy))
for _, key := range groupBy {
labels[key.Name] = attrs[key.Name]
}
pageGroups = append(pageGroups, labels)
}
}
return pageGroups
}
// buildPageGroupsFilterExpr builds a filter expression that restricts results
// to the given page of groups via IN clauses.
// Returns e.g. "host.name IN ('h1','h2') AND os.type IN ('linux','windows')".
func buildPageGroupsFilterExpr(pageGroups []map[string]string) string {
groupValues := make(map[string][]string)
for _, labels := range pageGroups {
for k, v := range labels {
groupValues[k] = append(groupValues[k], v)
}
}
inClauses := make([]string, 0, len(groupValues))
for key, values := range groupValues {
quoted := make([]string, len(values))
for i, v := range values {
quoted[i] = fmt.Sprintf("'%s'", v)
}
inClauses = append(inClauses, fmt.Sprintf("%s IN (%s)", key, strings.Join(quoted, ", ")))
}
return strings.Join(inClauses, " AND ")
}
// buildFullQueryRequest creates a QueryRangeRequest for all metrics,
// restricted to the given page of groups via an IN filter.
// Accepts primitive fields so it can be reused across different v2 APIs
// (hosts, pods, etc.).
func buildFullQueryRequest(
start int64,
end int64,
filterExpr string,
groupBy []qbtypes.GroupByKey,
pageGroups []map[string]string,
tableListQuery *qbtypes.QueryRangeRequest,
) *qbtypes.QueryRangeRequest {
inFilterExpr := buildPageGroupsFilterExpr(pageGroups)
fullReq := &qbtypes.QueryRangeRequest{
Start: uint64(start),
End: uint64(end),
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: make([]qbtypes.QueryEnvelope, 0, len(tableListQuery.CompositeQuery.Queries)),
},
}
for _, envelope := range tableListQuery.CompositeQuery.Queries {
copied := envelope
if copied.Type == qbtypes.QueryTypeBuilder {
existingExpr := ""
if f := copied.GetFilter(); f != nil {
existingExpr = f.Expression
}
merged := mergeFilterExpressions(existingExpr, filterExpr)
merged = mergeFilterExpressions(merged, inFilterExpr)
copied.SetFilter(&qbtypes.Filter{Expression: merged})
copied.SetGroupBy(groupBy)
}
fullReq.CompositeQuery.Queries = append(fullReq.CompositeQuery.Queries, copied)
}
return fullReq
}
// parseFullQueryResponse extracts per-group metric values from the full
// composite query response. Returns compositeKey -> (queryName -> value).
// Each enabled query/formula produces its own ScalarData entry in Results,
// so we iterate over all of them and merge metrics per composite key.
func parseFullQueryResponse(
resp *qbtypes.QueryRangeResponse,
groupBy []qbtypes.GroupByKey,
) map[string]map[string]float64 {
result := make(map[string]map[string]float64)
if resp == nil || len(resp.Data.Results) == 0 {
return result
}
for _, r := range resp.Data.Results {
sd, ok := r.(*qbtypes.ScalarData)
if !ok || sd == nil {
continue
}
groupColIndices := make(map[string]int)
aggCols := make(map[int]string) // col index -> query name
for i, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeGroup {
groupColIndices[col.Name] = i
}
if col.Type == qbtypes.ColumnTypeAggregation {
aggCols[i] = col.QueryName
}
}
for _, row := range sd.Data {
labels := make(map[string]string, len(groupBy))
for _, key := range groupBy {
if idx, ok := groupColIndices[key.Name]; ok && idx < len(row) {
labels[key.Name] = fmt.Sprintf("%v", row[idx])
}
}
compositeKey := compositeKeyFromLabels(labels, groupBy)
if result[compositeKey] == nil {
result[compositeKey] = make(map[string]float64)
}
for idx, queryName := range aggCols {
if idx < len(row) {
if v, ok := row[idx].(float64); ok {
result[compositeKey][queryName] = v
}
}
}
}
}
return result
}
// buildSamplesTblFingerprintSubQuery returns a SelectBuilder that selects distinct fingerprints
// from the samples table for the given metric names andtime range.
func (m *module) buildSamplesTblFingerprintSubQuery(metricNames []string, startMs, endMs int64) *sqlbuilder.SelectBuilder {
samplesTableName := telemetrymetrics.WhichSamplesTableToUse(
uint64(startMs), uint64(endMs),
metrictypes.UnspecifiedType,
metrictypes.TimeAggregationUnspecified,
nil,
)
localSamplesTable := strings.TrimPrefix(samplesTableName, "distributed_")
fpSB := sqlbuilder.NewSelectBuilder()
fpSB.Select("DISTINCT fingerprint")
fpSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localSamplesTable))
fpSB.Where(
fpSB.In("metric_name", sqlbuilder.List(metricNames)),
fpSB.GE("unix_milli", startMs),
fpSB.L("unix_milli", endMs),
)
return fpSB
}
func (m *module) buildFilterClause(ctx context.Context, filter *qbtypes.Filter, startMillis, endMillis int64) (*sqlbuilder.WhereClause, error) {
expression := ""
if filter != nil {
expression = strings.TrimSpace(filter.Expression)
}
if expression == "" {
return sqlbuilder.NewWhereClause(), nil
}
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(expression)
for idx := range whereClauseSelectors {
whereClauseSelectors[idx].Signal = telemetrytypes.SignalMetrics
whereClauseSelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
}
keys, _, err := m.telemetryMetadataStore.GetKeysMulti(ctx, whereClauseSelectors)
if err != nil {
return nil, err
}
opts := querybuilder.FilterExprVisitorOpts{
Context: ctx,
Logger: m.logger,
FieldMapper: m.fieldMapper,
ConditionBuilder: m.condBuilder,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "metric_name", FieldContext: telemetrytypes.FieldContextMetric},
FieldKeys: keys,
StartNs: querybuilder.ToNanoSecs(uint64(startMillis)),
EndNs: querybuilder.ToNanoSecs(uint64(endMillis)),
}
whereClause, err := querybuilder.PrepareWhereClause(expression, opts)
if err != nil {
return nil, err
}
if whereClause == nil || whereClause.WhereClause == nil {
return sqlbuilder.NewWhereClause(), nil
}
return whereClause.WhereClause, nil
}
// NOTE: this method is not specific to infra monitoring — it queries attributes_metadata generically.
// Consider moving to telemetryMetaStore when a second use case emerges.
//
// getMetricsExistenceAndEarliestTime checks which of the given metric names have been
// reported. It returns a list of missing metrics (those not found or with zero count)
// and the earliest first-reported timestamp across all present metrics.
// When all metrics are missing, minFirstReportedUnixMilli is 0.
// TODO(nikhilmantri0902, srikanthccv): This method was designed this way because querier errors if any of the metrics
// in the querier list was never sent, the QueryRange call throws not found error. Modify this method, if QueryRange
// behaviour changes towards this.
func (m *module) getMetricsExistenceAndEarliestTime(ctx context.Context, metricNames []string) ([]string, uint64, error) {
if len(metricNames) == 0 {
return nil, 0, nil
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("metric_name", "count(*) AS cnt", "min(first_reported_unix_milli) AS min_first_reported")
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName))
sb.Where(sb.In("metric_name", sqlbuilder.List(metricNames)))
sb.GroupBy("metric_name")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := m.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
type metricInfo struct {
count uint64
minFirstReported uint64
}
found := make(map[string]metricInfo, len(metricNames))
for rows.Next() {
var name string
var cnt, minFR uint64
if err := rows.Scan(&name, &cnt, &minFR); err != nil {
return nil, 0, err
}
found[name] = metricInfo{count: cnt, minFirstReported: minFR}
}
if err := rows.Err(); err != nil {
return nil, 0, err
}
var missingMetrics []string
var globalMinFirstReported uint64
for _, name := range metricNames {
info, ok := found[name]
if !ok || info.count == 0 {
missingMetrics = append(missingMetrics, name)
continue
}
if globalMinFirstReported == 0 || info.minFirstReported < globalMinFirstReported {
globalMinFirstReported = info.minFirstReported
}
}
return missingMetrics, globalMinFirstReported, nil
}
// getMetadata fetches the latest values of additionalCols for each unique combination of groupBy keys,
// within the given time range and metric names. It uses argMax(tuple(...), unix_milli) to ensure
// we always pick attribute values from the latest timestamp for each group.
// The returned map has a composite key of groupBy column values joined by "\x00" (null byte),
// mapping to a flat map of attr_name -> attr_value (includes both groupBy and additional cols).
func (m *module) getMetadata(
ctx context.Context,
metricNames []string,
groupBy []qbtypes.GroupByKey,
additionalCols []string,
filter *qbtypes.Filter,
startMs, endMs int64,
) (map[string]map[string]string, error) {
if len(metricNames) == 0 {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "metricNames must not be empty")
}
if len(groupBy) == 0 {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "groupBy must not be empty")
}
// Pick the optimal timeseries table based on time range; also get adjusted start.
adjustedStart, adjustedEnd, distributedTableName, _ := telemetrymetrics.WhichTSTableToUse(
uint64(startMs), uint64(endMs), nil,
)
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, startMs, endMs)
// Flatten groupBy keys to string names for SQL expressions and result scanning.
groupByCols := make([]string, len(groupBy))
for i, key := range groupBy {
groupByCols[i] = key.Name
}
allCols := append(groupByCols, additionalCols...)
// --- Build inner query ---
innerSB := sqlbuilder.NewSelectBuilder()
// Inner SELECT columns: JSONExtractString for each groupBy col + argMax(tuple(...)) for additional cols
innerSelectCols := make([]string, 0, len(groupByCols)+1)
for _, col := range groupByCols {
innerSelectCols = append(innerSelectCols,
fmt.Sprintf("JSONExtractString(labels, %s) AS %s", innerSB.Var(col), quoteIdentifier(col)),
)
}
// Build the argMax(tuple(...), unix_milli) expression for all additional cols
if len(additionalCols) > 0 {
tupleArgs := make([]string, 0, len(additionalCols))
for _, col := range additionalCols {
tupleArgs = append(tupleArgs, fmt.Sprintf("JSONExtractString(labels, %s)", innerSB.Var(col)))
}
innerSelectCols = append(innerSelectCols,
fmt.Sprintf("argMax(tuple(%s), unix_milli) AS latest_attrs", strings.Join(tupleArgs, ", ")),
)
}
innerSB.Select(innerSelectCols...)
innerSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTableName))
innerSB.Where(
innerSB.In("metric_name", sqlbuilder.List(metricNames)),
innerSB.GE("unix_milli", adjustedStart),
innerSB.L("unix_milli", adjustedEnd),
fmt.Sprintf("fingerprint IN (%s)", innerSB.Var(fpSB)),
)
// Apply optional filter expression
if filter != nil && strings.TrimSpace(filter.Expression) != "" {
filterClause, err := m.buildFilterClause(ctx, filter, startMs, endMs)
if err != nil {
return nil, err
}
if filterClause != nil {
innerSB.AddWhereClause(filterClause)
}
}
groupByAliases := make([]string, 0, len(groupByCols))
for _, col := range groupByCols {
groupByAliases = append(groupByAliases, quoteIdentifier(col))
}
innerSB.GroupBy(groupByAliases...)
innerQuery, innerArgs := innerSB.BuildWithFlavor(sqlbuilder.ClickHouse)
// --- Build outer query ---
// Outer SELECT columns: groupBy cols directly + tupleElement(latest_attrs, N) for each additionalCol
outerSelectCols := make([]string, 0, len(allCols))
for _, col := range groupByCols {
outerSelectCols = append(outerSelectCols, quoteIdentifier(col))
}
for i, col := range additionalCols {
outerSelectCols = append(outerSelectCols,
fmt.Sprintf("tupleElement(latest_attrs, %d) AS %s", i+1, quoteIdentifier(col)),
)
}
outerSB := sqlbuilder.NewSelectBuilder()
outerSB.Select(outerSelectCols...)
outerSB.From(fmt.Sprintf("(%s)", innerQuery))
outerQuery, _ := outerSB.BuildWithFlavor(sqlbuilder.ClickHouse)
// All ? params are in innerArgs; outer query introduces none of its own.
rows, err := m.telemetryStore.ClickhouseDB().Query(ctx, outerQuery, innerArgs...)
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[string]map[string]string)
for rows.Next() {
row := make([]string, len(allCols))
scanPtrs := make([]any, len(row))
for i := range row {
scanPtrs[i] = &row[i]
}
if err := rows.Scan(scanPtrs...); err != nil {
return nil, err
}
compositeKey := compositeKeyFromList(row[:len(groupByCols)])
attrMap := make(map[string]string, len(allCols))
for i, col := range allCols {
attrMap[col] = row[i]
}
result[compositeKey] = attrMap
}
if err := rows.Err(); err != nil {
return nil, err
}
return result, nil
}

View File

@@ -1,283 +0,0 @@
package implinframonitoring
import (
"testing"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
func groupByKey(name string) qbtypes.GroupByKey {
return qbtypes.GroupByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name},
}
}
func TestIsKeyInGroupByAttrs(t *testing.T) {
tests := []struct {
name string
groupByAttrs []qbtypes.GroupByKey
key string
expectedFound bool
}{
{
name: "key present in single-element list",
groupByAttrs: []qbtypes.GroupByKey{groupByKey("host.name")},
key: "host.name",
expectedFound: true,
},
{
name: "key present in multi-element list",
groupByAttrs: []qbtypes.GroupByKey{
groupByKey("host.name"),
groupByKey("os.type"),
groupByKey("k8s.cluster.name"),
},
key: "os.type",
expectedFound: true,
},
{
name: "key at last position",
groupByAttrs: []qbtypes.GroupByKey{
groupByKey("host.name"),
groupByKey("os.type"),
},
key: "os.type",
expectedFound: true,
},
{
name: "key not in list",
groupByAttrs: []qbtypes.GroupByKey{groupByKey("host.name")},
key: "os.type",
expectedFound: false,
},
{
name: "empty group by list",
groupByAttrs: []qbtypes.GroupByKey{},
key: "host.name",
expectedFound: false,
},
{
name: "nil group by list",
groupByAttrs: nil,
key: "host.name",
expectedFound: false,
},
{
name: "empty key string",
groupByAttrs: []qbtypes.GroupByKey{groupByKey("host.name")},
key: "",
expectedFound: false,
},
{
name: "empty key matches empty-named group by key",
groupByAttrs: []qbtypes.GroupByKey{groupByKey("")},
key: "",
expectedFound: true,
},
{
name: "partial match does not count",
groupByAttrs: []qbtypes.GroupByKey{
groupByKey("host"),
},
key: "host.name",
expectedFound: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := isKeyInGroupByAttrs(tt.groupByAttrs, tt.key)
if got != tt.expectedFound {
t.Errorf("isKeyInGroupByAttrs(%v, %q) = %v, want %v",
tt.groupByAttrs, tt.key, got, tt.expectedFound)
}
})
}
}
func TestMergeFilterExpressions(t *testing.T) {
tests := []struct {
name string
queryFilterExpr string
reqFilterExpr string
expected string
}{
{
name: "both non-empty",
queryFilterExpr: "cpu > 50",
reqFilterExpr: "host.name = 'web-1'",
expected: "(cpu > 50) AND (host.name = 'web-1')",
},
{
name: "query empty, req non-empty",
queryFilterExpr: "",
reqFilterExpr: "host.name = 'web-1'",
expected: "host.name = 'web-1'",
},
{
name: "query non-empty, req empty",
queryFilterExpr: "cpu > 50",
reqFilterExpr: "",
expected: "cpu > 50",
},
{
name: "both empty",
queryFilterExpr: "",
reqFilterExpr: "",
expected: "",
},
{
name: "whitespace-only query treated as empty",
queryFilterExpr: " ",
reqFilterExpr: "host.name = 'web-1'",
expected: "host.name = 'web-1'",
},
{
name: "whitespace-only req treated as empty",
queryFilterExpr: "cpu > 50",
reqFilterExpr: " ",
expected: "cpu > 50",
},
{
name: "both whitespace-only",
queryFilterExpr: " ",
reqFilterExpr: " ",
expected: "",
},
{
name: "leading/trailing whitespace trimmed before merge",
queryFilterExpr: " cpu > 50 ",
reqFilterExpr: " mem < 80 ",
expected: "(cpu > 50) AND (mem < 80)",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := mergeFilterExpressions(tt.queryFilterExpr, tt.reqFilterExpr)
if got != tt.expected {
t.Errorf("mergeFilterExpressions(%q, %q) = %q, want %q",
tt.queryFilterExpr, tt.reqFilterExpr, got, tt.expected)
}
})
}
}
func TestCompositeKeyFromList(t *testing.T) {
tests := []struct {
name string
parts []string
expected string
}{
{
name: "single part",
parts: []string{"web-1"},
expected: "web-1",
},
{
name: "multiple parts joined with null separator",
parts: []string{"web-1", "linux", "us-east"},
expected: "web-1\x00linux\x00us-east",
},
{
name: "empty slice returns empty string",
parts: []string{},
expected: "",
},
{
name: "nil slice returns empty string",
parts: nil,
expected: "",
},
{
name: "parts with empty strings",
parts: []string{"web-1", "", "us-east"},
expected: "web-1\x00\x00us-east",
},
{
name: "all empty strings",
parts: []string{"", ""},
expected: "\x00",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := compositeKeyFromList(tt.parts)
if got != tt.expected {
t.Errorf("compositeKeyFromList(%v) = %q, want %q",
tt.parts, got, tt.expected)
}
})
}
}
func TestCompositeKeyFromLabels(t *testing.T) {
tests := []struct {
name string
labels map[string]string
groupBy []qbtypes.GroupByKey
expected string
}{
{
name: "single group-by key",
labels: map[string]string{"host.name": "web-1"},
groupBy: []qbtypes.GroupByKey{groupByKey("host.name")},
expected: "web-1",
},
{
name: "multiple group-by keys joined with null separator",
labels: map[string]string{
"host.name": "web-1",
"os.type": "linux",
},
groupBy: []qbtypes.GroupByKey{groupByKey("host.name"), groupByKey("os.type")},
expected: "web-1\x00linux",
},
{
name: "missing label yields empty segment",
labels: map[string]string{"host.name": "web-1"},
groupBy: []qbtypes.GroupByKey{groupByKey("host.name"), groupByKey("os.type")},
expected: "web-1\x00",
},
{
name: "empty labels map",
labels: map[string]string{},
groupBy: []qbtypes.GroupByKey{groupByKey("host.name")},
expected: "",
},
{
name: "empty group-by slice",
labels: map[string]string{"host.name": "web-1"},
groupBy: []qbtypes.GroupByKey{},
expected: "",
},
{
name: "nil labels map",
labels: nil,
groupBy: []qbtypes.GroupByKey{groupByKey("host.name")},
expected: "",
},
{
name: "order matches group-by order, not map iteration order",
labels: map[string]string{
"z": "last",
"a": "first",
"m": "middle",
},
groupBy: []qbtypes.GroupByKey{groupByKey("a"), groupByKey("m"), groupByKey("z")},
expected: "first\x00middle\x00last",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := compositeKeyFromLabels(tt.labels, tt.groupBy)
if got != tt.expected {
t.Errorf("compositeKeyFromLabels(%v, %v) = %q, want %q",
tt.labels, tt.groupBy, got, tt.expected)
}
})
}
}

View File

@@ -1,354 +0,0 @@
package implinframonitoring
import (
"context"
"fmt"
"slices"
"strings"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
)
// getPerGroupHostStatusCounts computes the number of active and inactive hosts per group
// for the current page. It queries the timeseries table using the provided filter
// expression (which includes user filter + status filter + page groups IN clause).
// Uses GLOBAL IN with the active-hosts subquery inside uniqExactIf for active count,
// and a simple uniqExactIf for total count. Inactive = total - active (computed in Go).
func (m *module) getPerGroupHostStatusCounts(
ctx context.Context,
req *inframonitoringtypes.PostableHosts,
metricNames []string,
pageGroups []map[string]string,
sinceUnixMilli int64,
) (map[string]groupHostStatusCounts, error) {
// Build the full filter expression from req (user filter + status filter) and page groups.
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
}
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
filterExpr := mergeFilterExpressions(reqFilterExpr, pageGroupsFilterExpr)
adjustedStart, adjustedEnd, distributedTimeSeriesTableName, _ := telemetrymetrics.WhichTSTableToUse(
uint64(req.Start), uint64(req.End), nil,
)
hostNameExpr := fmt.Sprintf("JSONExtractString(labels, '%s')", hostNameAttrKey)
sb := sqlbuilder.NewSelectBuilder()
selectCols := make([]string, 0, len(req.GroupBy)+2)
for _, key := range req.GroupBy {
selectCols = append(selectCols,
fmt.Sprintf("JSONExtractString(labels, %s) AS %s", sb.Var(key.Name), quoteIdentifier(key.Name)),
)
}
activeHostsSQ := m.getActiveHostsQuery(metricNames, hostNameAttrKey, sinceUnixMilli)
selectCols = append(selectCols,
fmt.Sprintf("uniqExactIf(%s, %s GLOBAL IN (%s)) AS active_host_count", hostNameExpr, hostNameExpr, sb.Var(activeHostsSQ)),
fmt.Sprintf("uniqExactIf(%s, %s != '') AS total_host_count", hostNameExpr, hostNameExpr),
)
// Build a fingerprint subquery to restrict to fingerprints with actual sample
// data in the original time range (not the wider timeseries table window).
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, req.Start, req.End)
sb.Select(selectCols...)
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTimeSeriesTableName))
sb.Where(
sb.In("metric_name", sqlbuilder.List(metricNames)),
sb.GE("unix_milli", adjustedStart),
sb.L("unix_milli", adjustedEnd),
fmt.Sprintf("fingerprint IN (%s)", sb.Var(fpSB)),
)
// Apply the combined filter expression (user filter + status filter + page groups IN).
if filterExpr != "" {
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: filterExpr}, req.Start, req.End)
if err != nil {
return nil, err
}
if filterClause != nil {
sb.AddWhereClause(filterClause)
}
}
// GROUP BY
groupByAliases := make([]string, 0, len(req.GroupBy))
for _, key := range req.GroupBy {
groupByAliases = append(groupByAliases, quoteIdentifier(key.Name))
}
sb.GroupBy(groupByAliases...)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := m.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[string]groupHostStatusCounts)
for rows.Next() {
groupVals := make([]string, len(req.GroupBy))
scanPtrs := make([]any, 0, len(req.GroupBy)+2)
for i := range groupVals {
scanPtrs = append(scanPtrs, &groupVals[i])
}
var activeCount, totalCount uint64
scanPtrs = append(scanPtrs, &activeCount, &totalCount)
if err := rows.Scan(scanPtrs...); err != nil {
return nil, err
}
compositeKey := compositeKeyFromList(groupVals)
result[compositeKey] = groupHostStatusCounts{
Active: int(activeCount),
Inactive: int(totalCount - activeCount),
}
}
if err := rows.Err(); err != nil {
return nil, err
}
return result, nil
}
// buildHostRecords constructs the final list of HostRecords for a page.
// Groups that had no metric data get default values of -1.
//
// hostCounts is nil when host.name is in the groupBy — in that case, counts are
// derived directly from activeHostsMap (1/0 per host). When non-nil (custom groupBy
// without host.name), counts are looked up from the map.
func buildHostRecords(
isHostNameInGroupBy bool,
resp *qbtypes.QueryRangeResponse,
pageGroups []map[string]string,
groupBy []qbtypes.GroupByKey,
metadataMap map[string]map[string]string,
activeHostsMap map[string]bool,
hostCounts map[string]groupHostStatusCounts,
) []inframonitoringtypes.HostRecord {
metricsMap := parseFullQueryResponse(resp, groupBy)
records := make([]inframonitoringtypes.HostRecord, 0, len(pageGroups))
for _, labels := range pageGroups {
compositeKey := compositeKeyFromLabels(labels, groupBy)
hostName := labels[hostNameAttrKey]
activeStatus := inframonitoringtypes.HostStatusNone
activeHostCount := 0
inactiveHostCount := 0
if isHostNameInGroupBy { // derive from activeHostsMap since each row = one host
if hostName != "" {
if activeHostsMap[hostName] {
activeStatus = inframonitoringtypes.HostStatusActive
activeHostCount = 1
} else {
activeStatus = inframonitoringtypes.HostStatusInactive
inactiveHostCount = 1
}
}
} else { // derive from hostCounts since custom groupBy without host.name
if counts, ok := hostCounts[compositeKey]; ok {
activeHostCount = counts.Active
inactiveHostCount = counts.Inactive
}
}
record := inframonitoringtypes.HostRecord{
HostName: hostName,
Status: activeStatus,
ActiveHostCount: activeHostCount,
InactiveHostCount: inactiveHostCount,
CPU: -1,
Memory: -1,
Wait: -1,
Load15: -1,
DiskUsage: -1,
Meta: map[string]any{},
}
if metrics, ok := metricsMap[compositeKey]; ok {
if v, exists := metrics["F1"]; exists {
record.CPU = v
}
if v, exists := metrics["F2"]; exists {
record.Memory = v
}
if v, exists := metrics["F3"]; exists {
record.Wait = v
}
if v, exists := metrics["F4"]; exists {
record.DiskUsage = v
}
if v, exists := metrics["G"]; exists {
record.Load15 = v
}
}
if attrs, ok := metadataMap[compositeKey]; ok {
for k, v := range attrs {
record.Meta[k] = v
}
}
records = append(records, record)
}
return records
}
// getTopHostGroups runs a ranking query for the ordering metric, sorts the
// results, paginates, and backfills from metadataMap when the page extends
// past the metric-ranked groups.
func (m *module) getTopHostGroups(
ctx context.Context,
orgID valuer.UUID,
req *inframonitoringtypes.PostableHosts,
metadataMap map[string]map[string]string,
) ([]map[string]string, error) {
orderByKey := req.OrderBy.Key.Name
queryNamesForOrderBy := orderByToHostsQueryNames[orderByKey]
// The last entry is the formula/query whose value we sort by.
rankingQueryName := queryNamesForOrderBy[len(queryNamesForOrderBy)-1]
topReq := &qbtypes.QueryRangeRequest{
Start: uint64(req.Start),
End: uint64(req.End),
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: make([]qbtypes.QueryEnvelope, 0, len(queryNamesForOrderBy)),
},
}
for _, envelope := range m.newListHostsQuery().CompositeQuery.Queries {
if !slices.Contains(queryNamesForOrderBy, envelope.GetQueryName()) {
continue
}
copied := envelope
if copied.Type == qbtypes.QueryTypeBuilder {
existingExpr := ""
if f := copied.GetFilter(); f != nil {
existingExpr = f.Expression
}
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
}
merged := mergeFilterExpressions(existingExpr, reqFilterExpr)
copied.SetFilter(&qbtypes.Filter{Expression: merged})
copied.SetGroupBy(req.GroupBy)
}
topReq.CompositeQuery.Queries = append(topReq.CompositeQuery.Queries, copied)
}
resp, err := m.querier.QueryRange(ctx, orgID, topReq)
if err != nil {
return nil, err
}
allMetricGroups := parseAndSortGroups(resp, rankingQueryName, req.GroupBy, req.OrderBy.Direction)
return paginateWithBackfill(allMetricGroups, metadataMap, req.GroupBy, req.Offset, req.Limit), nil
}
// applyHostsActiveStatusFilter MODIFIES req.Filter.Expression to include an IN/NOT IN
// clause based on FilterByStatus and the set of active hosts.
// Returns true if the caller should short-circuit with an empty result (eg. ACTIVE
// requested but no hosts are active).
func (m *module) applyHostsActiveStatusFilter(req *inframonitoringtypes.PostableHosts, activeHostsMap map[string]bool) (shouldShortCircuit bool) {
if req.Filter == nil || (req.Filter.FilterByStatus != inframonitoringtypes.HostStatusActive && req.Filter.FilterByStatus != inframonitoringtypes.HostStatusInactive) {
return false
}
activeHosts := make([]string, 0, len(activeHostsMap))
for host := range activeHostsMap {
activeHosts = append(activeHosts, fmt.Sprintf("'%s'", host))
}
if len(activeHosts) == 0 {
return req.Filter.FilterByStatus == inframonitoringtypes.HostStatusActive
}
op := "IN"
if req.Filter.FilterByStatus == inframonitoringtypes.HostStatusInactive {
op = "NOT IN"
}
statusClause := fmt.Sprintf("%s %s (%s)", hostNameAttrKey, op, strings.Join(activeHosts, ", "))
req.Filter.Expression = mergeFilterExpressions(req.Filter.Expression, statusClause)
return false
}
func (m *module) getHostsTableMetadata(ctx context.Context, req *inframonitoringtypes.PostableHosts) (map[string]map[string]string, error) {
var nonGroupByAttrs []string
for _, key := range hostAttrKeysForMetadata {
if !isKeyInGroupByAttrs(req.GroupBy, key) {
nonGroupByAttrs = append(nonGroupByAttrs, key)
}
}
var filter *qbtypes.Filter
if req.Filter != nil {
filter = &req.Filter.Filter
}
metadataMap, err := m.getMetadata(ctx, hostsTableMetricNamesList, req.GroupBy, nonGroupByAttrs, filter, req.Start, req.End)
if err != nil {
return nil, err
}
return metadataMap, nil
}
// getActiveHostsQuery builds a SelectBuilder that returns distinct host names
// with metrics reported in the last 10 minutes. The builder is not executed —
// callers can either execute it (getActiveHosts) or embed it as a subquery
// (getPerGroupActiveInactiveHostCounts).
func (m *module) getActiveHostsQuery(metricNames []string, hostNameAttr string, sinceUnixMilli int64) *sqlbuilder.SelectBuilder {
sb := sqlbuilder.NewSelectBuilder()
sb.Distinct()
sb.Select("attr_string_value")
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName))
sb.Where(
sb.In("metric_name", sqlbuilder.List(metricNames)),
sb.E("attr_name", hostNameAttr),
sb.NE("attr_string_value", ""),
sb.GE("last_reported_unix_milli", sinceUnixMilli),
)
return sb
}
// getActiveHosts returns a set of host names that have reported metrics recently.
// It queries distributed_metadata for hosts where last_reported_unix_milli >= sinceUnixMilli.
func (m *module) getActiveHosts(ctx context.Context, metricNames []string, hostNameAttr string, sinceUnixMilli int64) (map[string]bool, error) {
sb := m.getActiveHostsQuery(metricNames, hostNameAttr, sinceUnixMilli)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := m.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
activeHosts := make(map[string]bool)
for rows.Next() {
var hostName string
if err := rows.Scan(&hostName); err != nil {
return nil, err
}
if hostName != "" {
activeHosts[hostName] = true
}
}
if err := rows.Err(); err != nil {
return nil, err
}
return activeHosts, nil
}

View File

@@ -1,270 +0,0 @@
package implinframonitoring
import (
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const (
hostNameAttrKey = "host.name"
)
// Helper group-by key used across all queries.
var hostNameGroupByKey = qbtypes.GroupByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: hostNameAttrKey,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
}
var hostsTableMetricNamesList = []string{
"system.cpu.time",
"system.memory.usage",
"system.cpu.load_average.15m",
"system.filesystem.usage",
}
var hostAttrKeysForMetadata = []string{
"os.type",
}
// orderByToHostsQueryNames maps the orderBy column to the query/formula names
// from HostsTableListQuery used for ranking host groups.
var orderByToHostsQueryNames = map[string][]string{
inframonitoringtypes.HostsOrderByCPU: {"A", "B", "F1"},
inframonitoringtypes.HostsOrderByMemory: {"C", "D", "F2"},
inframonitoringtypes.HostsOrderByWait: {"E", "F", "F3"},
inframonitoringtypes.HostsOrderByDiskUsage: {"H", "I", "F4"},
inframonitoringtypes.HostsOrderByLoad15: {"G"},
}
// newListHostsQuery constructs the base QueryRangeRequest with all the queries for the hosts table.
// This is kept in this file because the queries themselves do not change based on the request parameters
// only the filters, group bys, and order bys change, which are applied in buildFullQueryRequest.
func (m *module) newListHostsQuery() *qbtypes.QueryRangeRequest {
return &qbtypes.QueryRangeRequest{
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
// Query A: CPU usage logic (non-idle)
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.cpu.time",
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
Filter: &qbtypes.Filter{
Expression: "state != 'idle'",
},
GroupBy: []qbtypes.GroupByKey{hostNameGroupByKey},
Disabled: true,
},
},
// Query B: CPU usage (all states)
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "B",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.cpu.time",
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{hostNameGroupByKey},
Disabled: true,
},
},
// Formula F1: CPU Usage (%)
{
Type: qbtypes.QueryTypeFormula,
Spec: qbtypes.QueryBuilderFormula{
Name: "F1",
Expression: "A/B",
Legend: "CPU Usage (%)",
Disabled: false,
},
},
// Query C: Memory usage (state = used)
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "C",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.memory.usage",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
Filter: &qbtypes.Filter{
Expression: "state = 'used'",
},
GroupBy: []qbtypes.GroupByKey{hostNameGroupByKey},
Disabled: true,
},
},
// Query D: Memory usage (all states)
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "D",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.memory.usage",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{hostNameGroupByKey},
Disabled: true,
},
},
// Formula F2: Memory Usage (%)
{
Type: qbtypes.QueryTypeFormula,
Spec: qbtypes.QueryBuilderFormula{
Name: "F2",
Expression: "C/D",
Legend: "Memory Usage (%)",
Disabled: false,
},
},
// Query E: CPU Wait time (state = wait)
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "E",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.cpu.time",
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
Filter: &qbtypes.Filter{
Expression: "state = 'wait'",
},
GroupBy: []qbtypes.GroupByKey{hostNameGroupByKey},
Disabled: true,
},
},
// Query F: CPU time (all states)
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "F",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.cpu.time",
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{hostNameGroupByKey},
Disabled: true,
},
},
// Formula F3: CPU Wait Time (%)
{
Type: qbtypes.QueryTypeFormula,
Spec: qbtypes.QueryBuilderFormula{
Name: "F3",
Expression: "E/F",
Legend: "CPU Wait Time (%)",
Disabled: false,
},
},
// Query G: Load15
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "G",
Signal: telemetrytypes.SignalMetrics,
Legend: "CPU Load Average (15m)",
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.cpu.load_average.15m",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{hostNameGroupByKey},
Disabled: false,
},
},
// Query H: Filesystem Usage (state = used)
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "H",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.filesystem.usage",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
Filter: &qbtypes.Filter{
Expression: "state = 'used'",
},
GroupBy: []qbtypes.GroupByKey{hostNameGroupByKey},
Disabled: true,
},
},
// Query I: Filesystem Usage (all states)
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "I",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.filesystem.usage",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{hostNameGroupByKey},
Disabled: true,
},
},
// Formula F4: Disk Usage (%)
{
Type: qbtypes.QueryTypeFormula,
Spec: qbtypes.QueryBuilderFormula{
Name: "F4",
Expression: "H/I",
Legend: "Disk Usage (%)",
Disabled: false,
},
},
},
},
}
}

View File

@@ -1,24 +0,0 @@
package implinframonitoring
// The types in this file are only used within the implinframonitoring package, and are not exposed outside.
// They are primarily used for internal processing and structuring of data within the module's implementation.
type rankedGroup struct {
labels map[string]string
value float64
compositeKey string
}
// groupHostStatusCounts holds per-group active and inactive host counts.
type groupHostStatusCounts struct {
Active int
Inactive int
}
// podPhaseCounts holds per-group pod counts bucketed by latest phase in window.
type podPhaseCounts struct {
Pending int
Running int
Succeeded int
Failed int
}

View File

@@ -1,250 +0,0 @@
package implinframonitoring
import (
"context"
"log/slog"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type module struct {
telemetryStore telemetrystore.TelemetryStore
telemetryMetadataStore telemetrytypes.MetadataStore
querier querier.Querier
fieldMapper qbtypes.FieldMapper
condBuilder qbtypes.ConditionBuilder
logger *slog.Logger
config inframonitoring.Config
}
// NewModule constructs the inframonitoring module with the provided dependencies.
func NewModule(
telemetryStore telemetrystore.TelemetryStore,
telemetryMetadataStore telemetrytypes.MetadataStore,
querier querier.Querier,
providerSettings factory.ProviderSettings,
cfg inframonitoring.Config,
) inframonitoring.Module {
fieldMapper := telemetrymetrics.NewFieldMapper()
condBuilder := telemetrymetrics.NewConditionBuilder(fieldMapper)
return &module{
telemetryStore: telemetryStore,
telemetryMetadataStore: telemetryMetadataStore,
querier: querier,
fieldMapper: fieldMapper,
condBuilder: condBuilder,
logger: providerSettings.Logger,
config: cfg,
}
}
func (m *module) ListHosts(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableHosts) (*inframonitoringtypes.Hosts, error) {
if err := req.Validate(); err != nil {
return nil, err
}
resp := &inframonitoringtypes.Hosts{}
// default to cpu order by
if req.OrderBy == nil {
req.OrderBy = &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "cpu",
},
},
Direction: qbtypes.OrderDirectionDesc,
}
}
// default to host name group by
if len(req.GroupBy) == 0 {
req.GroupBy = []qbtypes.GroupByKey{hostNameGroupByKey}
resp.Type = inframonitoringtypes.ResponseTypeList
} else {
resp.Type = inframonitoringtypes.ResponseTypeGroupedList
}
// 1. Check which required metrics exist and get earliest retention time.
// If any required metric is missing, return early with the list of missing metrics.
// 2. If metrics exist but req.End is before the earliest reported time, return early with endTimeBeforeRetention=true.
missingMetrics, minFirstReportedUnixMilli, err := m.getMetricsExistenceAndEarliestTime(ctx, hostsTableMetricNamesList)
if err != nil {
return nil, err
}
if len(missingMetrics) > 0 {
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: missingMetrics}
resp.Records = []inframonitoringtypes.HostRecord{}
resp.Total = 0
return resp, nil
}
if req.End < int64(minFirstReportedUnixMilli) {
resp.EndTimeBeforeRetention = true
resp.Records = []inframonitoringtypes.HostRecord{}
resp.Total = 0
return resp, nil
}
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: []string{}}
// TOD(nikhilmantri0902): replace this separate ClickHouse query with a sub-query inside the main query builder query
// once QB supports sub-queries.
// Determine active hosts: those with metrics reported in the last 10 minutes.
// Compute the cutoff once so every downstream query/subquery agrees on what "active" means.
sinceUnixMilli := time.Now().Add(-10 * time.Minute).UTC().UnixMilli()
activeHostsMap, err := m.getActiveHosts(ctx, hostsTableMetricNamesList, hostNameAttrKey, sinceUnixMilli)
if err != nil {
return nil, err
}
// this check below modifies req.Filter by adding `AND active hosts filter` if req.FilterByStatus is set.
if m.applyHostsActiveStatusFilter(req, activeHostsMap) {
resp.Records = []inframonitoringtypes.HostRecord{}
resp.Total = 0
return resp, nil
}
metadataMap, err := m.getHostsTableMetadata(ctx, req)
if err != nil {
return nil, err
}
resp.Total = len(metadataMap)
pageGroups, err := m.getTopHostGroups(ctx, orgID, req, metadataMap)
if err != nil {
return nil, err
}
if len(pageGroups) == 0 {
resp.Records = []inframonitoringtypes.HostRecord{}
return resp, nil
}
hostsFilterExpr := ""
if req.Filter != nil {
hostsFilterExpr = req.Filter.Expression
}
fullQueryReq := buildFullQueryRequest(req.Start, req.End, hostsFilterExpr, req.GroupBy, pageGroups, m.newListHostsQuery())
queryResp, err := m.querier.QueryRange(ctx, orgID, fullQueryReq)
if err != nil {
return nil, err
}
// Compute per-group active/inactive host counts.
// When host.name is in groupBy, each row = one host, so counts are derived
// directly from activeHostsMap in buildHostRecords (no extra query needed).
// When host.name is not in groupBy, we need to run an additional query to get the counts per group for the current page,
// using the same filter expression as the main query (including user filters + page groups IN clause).
hostCounts := make(map[string]groupHostStatusCounts)
isHostNameInGroupBy := isKeyInGroupByAttrs(req.GroupBy, hostNameAttrKey)
if !isHostNameInGroupBy {
hostCounts, err = m.getPerGroupHostStatusCounts(ctx, req, hostsTableMetricNamesList, pageGroups, sinceUnixMilli)
if err != nil {
return nil, err
}
}
resp.Records = buildHostRecords(isHostNameInGroupBy, queryResp, pageGroups, req.GroupBy, metadataMap, activeHostsMap, hostCounts)
resp.Warning = queryResp.Warning
return resp, nil
}
func (m *module) ListPods(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostablePods) (*inframonitoringtypes.Pods, error) {
if err := req.Validate(); err != nil {
return nil, err
}
resp := &inframonitoringtypes.Pods{}
if req.OrderBy == nil {
req.OrderBy = &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: inframonitoringtypes.PodsOrderByCPU,
},
},
Direction: qbtypes.OrderDirectionDesc,
}
}
if len(req.GroupBy) == 0 {
req.GroupBy = []qbtypes.GroupByKey{podUIDGroupByKey}
resp.Type = inframonitoringtypes.ResponseTypeList
} else {
resp.Type = inframonitoringtypes.ResponseTypeGroupedList
}
missingMetrics, minFirstReportedUnixMilli, err := m.getMetricsExistenceAndEarliestTime(ctx, podsTableMetricNamesList)
if err != nil {
return nil, err
}
if len(missingMetrics) > 0 {
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: missingMetrics}
resp.Records = []inframonitoringtypes.PodRecord{}
resp.Total = 0
return resp, nil
}
if req.End < int64(minFirstReportedUnixMilli) {
resp.EndTimeBeforeRetention = true
resp.Records = []inframonitoringtypes.PodRecord{}
resp.Total = 0
return resp, nil
}
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: []string{}}
metadataMap, err := m.getPodsTableMetadata(ctx, req)
if err != nil {
return nil, err
}
resp.Total = len(metadataMap)
pageGroups, err := m.getTopPodGroups(ctx, orgID, req, metadataMap)
if err != nil {
return nil, err
}
if len(pageGroups) == 0 {
resp.Records = []inframonitoringtypes.PodRecord{}
return resp, nil
}
filterExpr := ""
if req.Filter != nil {
filterExpr = req.Filter.Expression
}
// Query G (per-pod latest phase) is meaningful only under list mode (k8s.pod.uid in groupBy).
// Under custom groupBy we use getPerGroupPodPhaseCounts to bucket pods per group.
isPodUIDInGroupBy := isKeyInGroupByAttrs(req.GroupBy, podUIDAttrKey)
fullQueryReq := buildFullQueryRequest(req.Start, req.End, filterExpr, req.GroupBy, pageGroups, m.newPodsTableListQuery(isPodUIDInGroupBy))
queryResp, err := m.querier.QueryRange(ctx, orgID, fullQueryReq)
if err != nil {
return nil, err
}
phaseCounts := make(map[string]podPhaseCounts)
if !isPodUIDInGroupBy {
phaseCounts, err = m.getPerGroupPodPhaseCounts(ctx, req, pageGroups)
if err != nil {
return nil, err
}
}
resp.Records = buildPodRecords(isPodUIDInGroupBy, queryResp, pageGroups, req.GroupBy, metadataMap, phaseCounts, req.End)
resp.Warning = queryResp.Warning
return resp, nil
}

View File

@@ -1,372 +0,0 @@
package implinframonitoring
import (
"context"
"fmt"
"slices"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
)
func mapPhaseNumToString(v float64) inframonitoringtypes.PodPhase {
switch int(v) {
case 1:
return inframonitoringtypes.PodPhasePending
case 2:
return inframonitoringtypes.PodPhaseRunning
case 3:
return inframonitoringtypes.PodPhaseSucceeded
case 4:
return inframonitoringtypes.PodPhaseFailed
default:
return inframonitoringtypes.PodPhaseNone
}
}
// buildPodRecords assembles the page records.
//
// isPodUIDInGroupBy=true (list mode): one row = one pod. PodPhase is read from
// query G's result, and the matching *PodCount field is set to 1.
//
// isPodUIDInGroupBy=false (grouped_list mode): rows are groups. PodPhase stays
// PodPhaseNone; *PodCount fields come from phaseCounts (zeros when group missing).
func buildPodRecords(
isPodUIDInGroupBy bool,
resp *qbtypes.QueryRangeResponse,
pageGroups []map[string]string,
groupBy []qbtypes.GroupByKey,
metadataMap map[string]map[string]string,
phaseCounts map[string]podPhaseCounts,
reqEnd int64,
) []inframonitoringtypes.PodRecord {
metricsMap := parseFullQueryResponse(resp, groupBy)
records := make([]inframonitoringtypes.PodRecord, 0, len(pageGroups))
for _, labels := range pageGroups {
compositeKey := compositeKeyFromLabels(labels, groupBy)
podUID := labels[podUIDAttrKey]
record := inframonitoringtypes.PodRecord{
PodUID: podUID,
PodCPU: -1,
PodCPURequest: -1,
PodCPULimit: -1,
PodMemory: -1,
PodMemoryRequest: -1,
PodMemoryLimit: -1,
PodAge: -1,
Meta: map[string]any{},
}
if metrics, ok := metricsMap[compositeKey]; ok {
if v, exists := metrics["A"]; exists {
record.PodCPU = v
}
if v, exists := metrics["B"]; exists {
record.PodCPURequest = v
}
if v, exists := metrics["C"]; exists {
record.PodCPULimit = v
}
if v, exists := metrics["D"]; exists {
record.PodMemory = v
}
if v, exists := metrics["E"]; exists {
record.PodMemoryRequest = v
}
if v, exists := metrics["F"]; exists {
record.PodMemoryLimit = v
}
}
if isPodUIDInGroupBy { // derive phase + count=1 from query G
if metrics, ok := metricsMap[compositeKey]; ok {
if v, exists := metrics["G"]; exists {
record.PodPhase = mapPhaseNumToString(v)
switch record.PodPhase {
case inframonitoringtypes.PodPhasePending:
record.PendingPodCount = 1
case inframonitoringtypes.PodPhaseRunning:
record.RunningPodCount = 1
case inframonitoringtypes.PodPhaseSucceeded:
record.SucceededPodCount = 1
case inframonitoringtypes.PodPhaseFailed:
record.FailedPodCount = 1
}
}
}
} else { // derive counts from phaseCounts; PodPhase stays PodPhaseNone
if c, ok := phaseCounts[compositeKey]; ok {
record.PendingPodCount = c.Pending
record.RunningPodCount = c.Running
record.SucceededPodCount = c.Succeeded
record.FailedPodCount = c.Failed
}
}
if attrs, ok := metadataMap[compositeKey]; ok {
if startTimeStr, exists := attrs[podStartTimeAttrKey]; exists && startTimeStr != "" {
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
startTimeMs := t.UnixMilli()
if startTimeMs > 0 {
record.PodAge = reqEnd - startTimeMs
}
}
}
for k, v := range attrs {
record.Meta[k] = v
}
}
records = append(records, record)
}
return records
}
func (m *module) getTopPodGroups(
ctx context.Context,
orgID valuer.UUID,
req *inframonitoringtypes.PostablePods,
metadataMap map[string]map[string]string,
) ([]map[string]string, error) {
orderByKey := req.OrderBy.Key.Name
queryNamesForOrderBy := orderByToPodsQueryNames[orderByKey]
rankingQueryName := queryNamesForOrderBy[len(queryNamesForOrderBy)-1]
topReq := &qbtypes.QueryRangeRequest{
Start: uint64(req.Start),
End: uint64(req.End),
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: make([]qbtypes.QueryEnvelope, 0, len(queryNamesForOrderBy)),
},
}
// Ranking never needs query G (phase removed from valid orderBy keys).
for _, envelope := range m.newPodsTableListQuery(false).CompositeQuery.Queries {
if !slices.Contains(queryNamesForOrderBy, envelope.GetQueryName()) {
continue
}
copied := envelope
if copied.Type == qbtypes.QueryTypeBuilder {
existingExpr := ""
if f := copied.GetFilter(); f != nil {
existingExpr = f.Expression
}
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
}
merged := mergeFilterExpressions(existingExpr, reqFilterExpr)
copied.SetFilter(&qbtypes.Filter{Expression: merged})
copied.SetGroupBy(req.GroupBy)
}
topReq.CompositeQuery.Queries = append(topReq.CompositeQuery.Queries, copied)
}
resp, err := m.querier.QueryRange(ctx, orgID, topReq)
if err != nil {
return nil, err
}
allMetricGroups := parseAndSortGroups(resp, rankingQueryName, req.GroupBy, req.OrderBy.Direction)
return paginateWithBackfill(allMetricGroups, metadataMap, req.GroupBy, req.Offset, req.Limit), nil
}
func (m *module) getPodsTableMetadata(ctx context.Context, req *inframonitoringtypes.PostablePods) (map[string]map[string]string, error) {
var nonGroupByAttrs []string
for _, key := range podAttrKeysForMetadata {
if !isKeyInGroupByAttrs(req.GroupBy, key) {
nonGroupByAttrs = append(nonGroupByAttrs, key)
}
}
return m.getMetadata(ctx, podsTableMetricNamesList, req.GroupBy, nonGroupByAttrs, req.Filter, req.Start, req.End)
}
// getPerGroupPodPhaseCounts computes per-group pod counts bucketed by each
// pod's latest phase in the requested window. Mirrors getPerGroupHostStatusCounts
// but uses 3 CTEs because pod phase classification needs the sample VALUE
// (not just "did it report"), so attributes_metadata can't carry it.
//
// Pipeline:
//
// CTE A (time_series_fps): fp ↔ (pod_uid, groupBy cols) from time_series table.
// User filter + page-groups filter applied here.
// CTE B (pod_phase_samples): fp → (latest phase value, its timestamp) via
// argMax(value, unix_milli) on samples table.
// CTE C (pod_phase_per_pod): collapse fp → pod via argMax over per-fp latest
// timestamp (latest-reported fp wins).
// Outer: per-group uniqExactIf into 4 phase buckets.
//
// Groups absent from the result map have implicit zero counts (caller default).
func (m *module) getPerGroupPodPhaseCounts(
ctx context.Context,
req *inframonitoringtypes.PostablePods,
pageGroups []map[string]string,
) (map[string]podPhaseCounts, error) {
if len(pageGroups) == 0 || len(req.GroupBy) == 0 {
return map[string]podPhaseCounts{}, nil
}
// Merged filter expression (user filter + page-groups IN clauses).
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
}
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
filterExpr := mergeFilterExpressions(reqFilterExpr, pageGroupsFilterExpr)
// Resolve tables. Same convention as hosts (distributed names from helpers).
adjustedStart, adjustedEnd, distributedTSTable, _ := telemetrymetrics.WhichTSTableToUse(
uint64(req.Start), uint64(req.End), nil,
)
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
uint64(req.Start), uint64(req.End),
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
)
// Aggregated samples tables hold the latest value in `last`, not `value`.
valueCol := "value"
if samplesTable == telemetrymetrics.SamplesV4Agg5mTableName ||
samplesTable == telemetrymetrics.SamplesV4Agg30mTableName {
valueCol = "last"
}
// ----- CTE A: time_series_fps -----
cteA := sqlbuilder.NewSelectBuilder()
cteASelectCols := []string{
"fingerprint",
fmt.Sprintf("JSONExtractString(labels, %s) AS pod_uid", cteA.Var(podUIDAttrKey)),
}
for _, key := range req.GroupBy {
cteASelectCols = append(cteASelectCols,
fmt.Sprintf("JSONExtractString(labels, %s) AS %s", cteA.Var(key.Name), quoteIdentifier(key.Name)),
)
}
cteA.Select(cteASelectCols...)
cteA.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTSTable))
cteA.Where(
cteA.E("metric_name", podPhaseMetricName),
cteA.GE("unix_milli", adjustedStart),
cteA.L("unix_milli", adjustedEnd),
)
if filterExpr != "" {
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: filterExpr}, req.Start, req.End)
if err != nil {
return nil, err
}
if filterClause != nil {
cteA.AddWhereClause(filterClause)
}
}
cteAGroupBy := []string{"fingerprint", "pod_uid"}
for _, key := range req.GroupBy {
cteAGroupBy = append(cteAGroupBy, quoteIdentifier(key.Name))
}
cteA.GroupBy(cteAGroupBy...)
cteASQL, cteAArgs := cteA.BuildWithFlavor(sqlbuilder.ClickHouse)
// ----- CTE B: pod_phase_samples -----
cteB := sqlbuilder.NewSelectBuilder()
cteB.Select(
"fingerprint",
fmt.Sprintf("argMax(%s, unix_milli) AS phase_value", valueCol),
"max(unix_milli) AS latest_unix_milli",
)
cteB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
cteB.Where(
cteB.E("metric_name", podPhaseMetricName),
cteB.GE("unix_milli", req.Start),
cteB.L("unix_milli", req.End),
"fingerprint GLOBAL IN (SELECT fingerprint FROM time_series_fps)", // TODO(nikhilmantri0902): GLOBAL IN is added here because results were not accurate with IN and local table, why?
)
cteB.GroupBy("fingerprint")
cteBSQL, cteBArgs := cteB.BuildWithFlavor(sqlbuilder.ClickHouse)
// ----- CTE C: pod_phase_per_pod (no parameters) -----
// Collapse fingerprints -> pod via argMax over each fingerprint's
// latest_unix_milli. Time-anchored: the fp whose newest sample is most
// recent wins — consistent with argMax inside CTE B.
cteCSelectCols := []string{"tsfp.pod_uid AS pod_uid"}
cteCGroupBy := []string{"pod_uid"}
for _, key := range req.GroupBy {
col := quoteIdentifier(key.Name)
cteCSelectCols = append(cteCSelectCols, fmt.Sprintf("tsfp.%s AS %s", col, col))
cteCGroupBy = append(cteCGroupBy, col)
}
cteCSelectCols = append(cteCSelectCols,
"argMax(sph.phase_value, sph.latest_unix_milli) AS phase_value",
)
cteCSQL := fmt.Sprintf(
"SELECT %s FROM time_series_fps AS tsfp INNER JOIN pod_phase_samples AS sph ON tsfp.fingerprint = sph.fingerprint WHERE tsfp.pod_uid != '' GROUP BY %s",
strings.Join(cteCSelectCols, ", "),
strings.Join(cteCGroupBy, ", "),
)
// ----- Outer SELECT -----
outerSelectCols := make([]string, 0, len(req.GroupBy)+4)
outerGroupBy := make([]string, 0, len(req.GroupBy))
for _, key := range req.GroupBy {
col := quoteIdentifier(key.Name)
outerSelectCols = append(outerSelectCols, col)
outerGroupBy = append(outerGroupBy, col)
}
outerSelectCols = append(outerSelectCols,
"uniqExactIf(pod_uid, phase_value = 1) AS pending_count",
"uniqExactIf(pod_uid, phase_value = 2) AS running_count",
"uniqExactIf(pod_uid, phase_value = 3) AS succeeded_count",
"uniqExactIf(pod_uid, phase_value = 4) AS failed_count",
)
outerSQL := fmt.Sprintf(
"SELECT %s FROM pod_phase_per_pod GROUP BY %s",
strings.Join(outerSelectCols, ", "),
strings.Join(outerGroupBy, ", "),
)
// Combine CTEs + outer.
cteFragments := []string{
fmt.Sprintf("time_series_fps AS (%s)", cteASQL),
fmt.Sprintf("pod_phase_samples AS (%s)", cteBSQL),
fmt.Sprintf("pod_phase_per_pod AS (%s)", cteCSQL),
}
finalSQL := querybuilder.CombineCTEs(cteFragments) + outerSQL
finalArgs := querybuilder.PrependArgs([][]any{cteAArgs, cteBArgs}, nil)
rows, err := m.telemetryStore.ClickhouseDB().Query(ctx, finalSQL, finalArgs...)
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[string]podPhaseCounts)
for rows.Next() {
groupVals := make([]string, len(req.GroupBy))
scanPtrs := make([]any, 0, len(req.GroupBy)+4)
for i := range groupVals {
scanPtrs = append(scanPtrs, &groupVals[i])
}
var pending, running, succeeded, failed uint64
scanPtrs = append(scanPtrs, &pending, &running, &succeeded, &failed)
if err := rows.Scan(scanPtrs...); err != nil {
return nil, err
}
result[compositeKeyFromList(groupVals)] = podPhaseCounts{
Pending: int(pending),
Running: int(running),
Succeeded: int(succeeded),
Failed: int(failed),
}
}
if err := rows.Err(); err != nil {
return nil, err
}
return result, nil
}

View File

@@ -1,202 +0,0 @@
package implinframonitoring
import (
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const (
podUIDAttrKey = "k8s.pod.uid"
podStartTimeAttrKey = "k8s.pod.start_time"
podPhaseMetricName = "k8s.pod.phase"
)
var podUIDGroupByKey = qbtypes.GroupByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: podUIDAttrKey,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
}
var podsTableMetricNamesList = []string{
"k8s.pod.cpu.usage",
"k8s.pod.cpu_request_utilization",
"k8s.pod.cpu_limit_utilization",
"k8s.pod.memory.working_set",
"k8s.pod.memory_request_utilization",
"k8s.pod.memory_limit_utilization",
"k8s.pod.phase",
}
var podAttrKeysForMetadata = []string{
"k8s.pod.uid",
"k8s.pod.name",
"k8s.namespace.name",
"k8s.node.name",
"k8s.deployment.name",
"k8s.statefulset.name",
"k8s.daemonset.name",
"k8s.job.name",
"k8s.cronjob.name",
"k8s.cluster.name",
"k8s.pod.start_time",
}
var orderByToPodsQueryNames = map[string][]string{
inframonitoringtypes.PodsOrderByCPU: {"A"},
inframonitoringtypes.PodsOrderByCPURequest: {"B"},
inframonitoringtypes.PodsOrderByCPULimit: {"C"},
inframonitoringtypes.PodsOrderByMemory: {"D"},
inframonitoringtypes.PodsOrderByMemoryRequest: {"E"},
inframonitoringtypes.PodsOrderByMemoryLimit: {"F"},
}
// newPodsTableListQuery builds the composite QB v5 request for the pods list.
// includePhaseQuery controls whether query G (per-pod latest phase) is appended.
// G is meaningful only when k8s.pod.uid is in groupBy (one row = one pod);
// under custom groupBy, per-group phase counts come from getPerGroupPodPhaseCounts.
func (m *module) newPodsTableListQuery(includePhaseQuery bool) *qbtypes.QueryRangeRequest {
queries := []qbtypes.QueryEnvelope{
// Query A: CPU usage
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.cpu.usage",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query B: CPU request utilization
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "B",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.cpu_request_utilization",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query C: CPU limit utilization
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "C",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.cpu_limit_utilization",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query D: Memory working set
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "D",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.memory.working_set",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query E: Memory request utilization
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "E",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.memory_request_utilization",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
// Query F: Memory limit utilization
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "F",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.memory_limit_utilization",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
},
}
if includePhaseQuery {
// Query G: Pod phase (latest value per pod). Only meaningful when
// k8s.pod.uid is in groupBy — under custom groupBy this query is
// replaced by getPerGroupPodPhaseCounts which buckets pods per group.
queries = append(queries, qbtypes.QueryEnvelope{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "G",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.phase",
TimeAggregation: metrictypes.TimeAggregationLatest,
SpaceAggregation: metrictypes.SpaceAggregationMax,
ReduceTo: qbtypes.ReduceToLast,
},
},
GroupBy: []qbtypes.GroupByKey{podUIDGroupByKey},
Disabled: false,
},
})
}
return &qbtypes.QueryRangeRequest{
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
}
}

View File

@@ -1,19 +0,0 @@
package inframonitoring
import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Handler interface {
ListHosts(http.ResponseWriter, *http.Request)
ListPods(http.ResponseWriter, *http.Request)
}
type Module interface {
ListHosts(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableHosts) (*inframonitoringtypes.Hosts, error)
ListPods(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostablePods) (*inframonitoringtypes.Pods, error)
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/querybuilder"
@@ -41,10 +42,11 @@ type module struct {
ruleStore ruletypes.RuleStore
dashboardModule dashboard.Module
config metricsexplorer.Config
flagger flagger.Flagger
}
// NewModule constructs the metrics module with the provided dependencies.
func NewModule(ts telemetrystore.TelemetryStore, telemetryMetadataStore telemetrytypes.MetadataStore, cache cache.Cache, ruleStore ruletypes.RuleStore, dashboardModule dashboard.Module, providerSettings factory.ProviderSettings, cfg metricsexplorer.Config) metricsexplorer.Module {
func NewModule(ts telemetrystore.TelemetryStore, telemetryMetadataStore telemetrytypes.MetadataStore, cache cache.Cache, ruleStore ruletypes.RuleStore, dashboardModule dashboard.Module, providerSettings factory.ProviderSettings, cfg metricsexplorer.Config, fl flagger.Flagger) metricsexplorer.Module {
fieldMapper := telemetrymetrics.NewFieldMapper()
condBuilder := telemetrymetrics.NewConditionBuilder(fieldMapper)
return &module{
@@ -57,6 +59,7 @@ func NewModule(ts telemetrystore.TelemetryStore, telemetryMetadataStore telemetr
ruleStore: ruleStore,
dashboardModule: dashboardModule,
config: cfg,
flagger: fl,
}
}
@@ -955,6 +958,7 @@ func (m *module) buildFilterClause(ctx context.Context, filter *qbtypes.Filter,
ConditionBuilder: m.condBuilder,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "metric_name", FieldContext: telemetrytypes.FieldContextMetric},
FieldKeys: keys,
Flagger: m.flagger,
StartNs: querybuilder.ToNanoSecs(uint64(startMillis)),
EndNs: querybuilder.ToNanoSecs(uint64(endMillis)),
}

View File

@@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -27,9 +28,10 @@ type store struct {
fieldMapper qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
logger *slog.Logger
flagger flagger.Flagger
}
func NewStore(telemetryStore telemetrystore.TelemetryStore, telemetryMetadataStore telemetrytypes.MetadataStore, logger *slog.Logger) rulestatehistorytypes.Store {
func NewStore(telemetryStore telemetrystore.TelemetryStore, telemetryMetadataStore telemetrytypes.MetadataStore, logger *slog.Logger, fl flagger.Flagger) rulestatehistorytypes.Store {
fm := newFieldMapper()
return &store{
telemetryStore: telemetryStore,
@@ -37,6 +39,7 @@ func NewStore(telemetryStore telemetrystore.TelemetryStore, telemetryMetadataSto
fieldMapper: fm,
conditionBuilder: newConditionBuilder(fm),
logger: logger,
flagger: fl,
}
}
@@ -501,6 +504,7 @@ func (s *store) buildFilterClause(ctx context.Context, filter qbtypes.Filter, st
FieldMapper: s.fieldMapper,
ConditionBuilder: s.conditionBuilder,
FieldKeys: fieldKeys,
Flagger: s.flagger,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels", FieldContext: telemetrytypes.FieldContextAttribute},
}

View File

@@ -72,13 +72,14 @@ func newProvider(
telemetrymetadata.DBName,
telemetrymetadata.AttributesMetadataLocalTableName,
telemetrymetadata.ColumnEvolutionMetadataTableName,
flagger,
)
// Create trace statement builder
traceFieldMapper := telemetrytraces.NewFieldMapper()
traceConditionBuilder := telemetrytraces.NewConditionBuilder(traceFieldMapper)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(settings, nil, traceFieldMapper, traceConditionBuilder, nil)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(settings, nil, traceFieldMapper, traceConditionBuilder, nil, flagger)
traceStmtBuilder := telemetrytraces.NewTraceQueryStatementBuilder(
settings,
telemetryMetadataStore,
@@ -86,6 +87,7 @@ func newProvider(
traceConditionBuilder,
traceAggExprRewriter,
telemetryStore,
flagger,
)
// Create trace operator statement builder
@@ -96,17 +98,19 @@ func newProvider(
traceConditionBuilder,
traceStmtBuilder,
traceAggExprRewriter,
flagger,
)
// Create log statement builder
logFieldMapper := telemetrylogs.NewFieldMapper()
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
logFieldMapper := telemetrylogs.NewFieldMapper(flagger)
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper, flagger)
logAggExprRewriter := querybuilder.NewAggExprRewriter(
settings,
telemetrylogs.DefaultFullTextColumn,
logFieldMapper,
logConditionBuilder,
telemetrylogs.GetBodyJSONKey,
flagger,
)
logStmtBuilder := telemetrylogs.NewLogQueryStatementBuilder(
settings,
@@ -116,6 +120,7 @@ func newProvider(
logAggExprRewriter,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.GetBodyJSONKey,
flagger,
)
// Create audit statement builder
@@ -127,6 +132,7 @@ func newProvider(
auditFieldMapper,
auditConditionBuilder,
nil,
flagger,
)
auditStmtBuilder := telemetryaudit.NewAuditQueryStatementBuilder(
settings,
@@ -136,6 +142,7 @@ func newProvider(
auditAggExprRewriter,
telemetryaudit.DefaultFullTextColumn,
nil,
flagger,
)
// Create metric statement builder
@@ -156,6 +163,7 @@ func newProvider(
metricFieldMapper,
metricConditionBuilder,
metricStmtBuilder,
flagger,
)
// Create bucket cache

View File

@@ -11,6 +11,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/queryparser"
"log/slog"
@@ -1774,6 +1775,15 @@ func (aH *APIHandler) getFeatureFlags(w http.ResponseWriter, r *http.Request) {
Route: "",
})
bodyJSONQuery := querybuilder.IsBodyJSONEnabled(r.Context(), aH.Signoz.Flagger)
featureSet = append(featureSet, &licensetypes.Feature{
Name: valuer.NewString(flagger.FeatureBodyJSONQuery.String()),
Active: bodyJSONQuery,
Usage: 0,
UsageLimit: -1,
Route: "",
})
if constants.IsDotMetricsEnabled {
for idx, feature := range featureSet {
if feature.Name == licensetypes.DotMetricsEnabled {

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
@@ -38,18 +39,21 @@ type LogParsingPipelineController struct {
GetIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, error)
// TODO(Piyush): remove with qbv5 migration
reader interfaces.Reader
fl flagger.Flagger
}
func NewLogParsingPipelinesController(
sqlStore sqlstore.SQLStore,
getIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, error),
reader interfaces.Reader,
fl flagger.Flagger,
) (*LogParsingPipelineController, error) {
repo := NewRepo(sqlStore)
return &LogParsingPipelineController{
Repo: repo,
GetIntegrationPipelines: getIntegrationPipelines,
reader: reader,
fl: fl,
}, nil
}
@@ -363,14 +367,14 @@ func (pc *LogParsingPipelineController) AgentFeatureType() agentConf.AgentFeatur
// Implements agentConf.AgentFeature interface.
// RecommendAgentConfig generates the collector config to be sent to agents.
// The normalize pipeline (when BodyJSONQueryEnabled) is injected here, after
// The normalize pipeline (when body_json_enabled feature flag is on) is injected here, after
// rawPipelineData is serialized. So it is only present in the config sent to
// the collector and never persisted to the database as part of the user's pipeline list.
//
// NOTE: The configId sent to agents is derived from the pipeline version number
// (e.g. "LogPipelines:5"), not the YAML content. If server-side logic changes
// the generated YAML without bumping the version (e.g. toggling BodyJSONQueryEnabled
// or updating operator IfExpressions), agents that already applied that version will
// the generated YAML without bumping the version (e.g. toggling the body_json_enabled
// flag or updating operator IfExpressions), agents that already applied that version will
// not re-apply the new config. In such cases, users must save a new pipeline version
// via the API to force agents to pick up the change.
func (pc *LogParsingPipelineController) RecommendAgentConfig(
@@ -398,7 +402,7 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
return nil, "", err
}
if querybuilder.BodyJSONQueryEnabled {
if querybuilder.IsBodyJSONEnabled(ctx, pc.fl) {
// add default normalize pipeline at the beginning, only for sending to collector
enrichedPipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, enrichedPipelines...)
}

View File

@@ -93,6 +93,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
signoz.SQLStore,
integrationsController.GetPipelinesForInstalledIntegrations,
reader,
signoz.Flagger,
)
if err != nil {
return nil, err

View File

@@ -15,6 +15,8 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
)
func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.TelemetryStore) (querier.Querier, *telemetrytypestest.MockMetadataStore) {
@@ -54,8 +56,8 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
), metadataStore
}
func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap map[string][]*telemetrytypes.TelemetryFieldKey) querier.Querier {
func prepareQuerierForLogs(t *testing.T, telemetryStore telemetrystore.TelemetryStore, keysMap map[string][]*telemetrytypes.TelemetryFieldKey) querier.Querier {
t.Helper()
providerSettings := instrumentationtest.New().ToProviderSettings()
metadataStore := telemetrytypestest.NewMockMetadataStore()
@@ -66,14 +68,16 @@ func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap
}
metadataStore.KeysMap = keysMap
logFieldMapper := telemetrylogs.NewFieldMapper()
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
fl := flaggertest.New(t)
logFieldMapper := telemetrylogs.NewFieldMapper(fl)
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper, fl)
logAggExprRewriter := querybuilder.NewAggExprRewriter(
providerSettings,
telemetrylogs.DefaultFullTextColumn,
logFieldMapper,
logConditionBuilder,
telemetrylogs.GetBodyJSONKey,
fl,
)
logStmtBuilder := telemetrylogs.NewLogQueryStatementBuilder(
providerSettings,
@@ -83,6 +87,7 @@ func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap
logAggExprRewriter,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.GetBodyJSONKey,
fl,
)
return querier.New(
@@ -100,7 +105,8 @@ func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap
)
}
func prepareQuerierForTraces(telemetryStore telemetrystore.TelemetryStore, keysMap map[string][]*telemetrytypes.TelemetryFieldKey) querier.Querier {
func prepareQuerierForTraces(t *testing.T, telemetryStore telemetrystore.TelemetryStore, keysMap map[string][]*telemetrytypes.TelemetryFieldKey) querier.Querier {
t.Helper()
providerSettings := instrumentationtest.New().ToProviderSettings()
metadataStore := telemetrytypestest.NewMockMetadataStore()
@@ -116,7 +122,8 @@ func prepareQuerierForTraces(telemetryStore telemetrystore.TelemetryStore, keysM
traceFieldMapper := telemetrytraces.NewFieldMapper()
traceConditionBuilder := telemetrytraces.NewConditionBuilder(traceFieldMapper)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(providerSettings, nil, traceFieldMapper, traceConditionBuilder, nil)
fl := flaggertest.New(t)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(providerSettings, nil, traceFieldMapper, traceConditionBuilder, nil, fl)
traceStmtBuilder := telemetrytraces.NewTraceQueryStatementBuilder(
providerSettings,
metadataStore,
@@ -124,6 +131,7 @@ func prepareQuerierForTraces(telemetryStore telemetrystore.TelemetryStore, keysM
traceConditionBuilder,
traceAggExprRewriter,
telemetryStore,
fl,
)
return querier.New(

View File

@@ -829,7 +829,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
WithArgs(nil, nil, nil, nil, nil, nil, nil).
WillReturnRows(rows)
querier := prepareQuerierForTraces(telemetryStore, keysMap)
querier := prepareQuerierForTraces(t, telemetryStore, keysMap)
postableRule.RuleCondition.CompareOperator = c.compareOperator
postableRule.RuleCondition.MatchType = c.matchType
@@ -946,7 +946,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
WithArgs(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil).
WillReturnRows(rows)
querier := prepareQuerierForLogs(telemetryStore, keysMap)
querier := prepareQuerierForLogs(t, telemetryStore, keysMap)
postableRule.RuleCondition.CompareOperator = c.compareOperator
postableRule.RuleCondition.MatchType = c.matchType

View File

@@ -9,6 +9,7 @@ import (
chparser "github.com/AfterShip/clickhouse-sql-parser/parser"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -21,6 +22,7 @@ type aggExprRewriter struct {
fieldMapper qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
flagger flagger.Flagger
}
var _ qbtypes.AggExprRewriter = (*aggExprRewriter)(nil)
@@ -31,6 +33,7 @@ func NewAggExprRewriter(
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
fl flagger.Flagger,
) *aggExprRewriter {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/agg_rewrite")
@@ -40,6 +43,7 @@ func NewAggExprRewriter(
fieldMapper: fieldMapper,
conditionBuilder: conditionBuilder,
jsonKeyToKey: jsonKeyToKey,
flagger: fl,
}
}
@@ -86,6 +90,7 @@ func (r *aggExprRewriter) Rewrite(
r.fieldMapper,
r.conditionBuilder,
r.jsonKeyToKey,
r.flagger,
)
// Rewrite the first select item (our expression)
if err := sel.SelectItems[0].Accept(visitor); err != nil {
@@ -138,6 +143,7 @@ type exprVisitor struct {
fieldMapper qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
flagger flagger.Flagger
Modified bool
chArgs []any
isRate bool
@@ -153,6 +159,7 @@ func newExprVisitor(
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
fl flagger.Flagger,
) *exprVisitor {
return &exprVisitor{
ctx: ctx,
@@ -164,6 +171,7 @@ func newExprVisitor(
fieldMapper: fieldMapper,
conditionBuilder: conditionBuilder,
jsonKeyToKey: jsonKeyToKey,
flagger: fl,
}
}
@@ -209,6 +217,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
FieldKeys: v.fieldKeys,
FieldMapper: v.fieldMapper,
ConditionBuilder: v.conditionBuilder,
Flagger: v.flagger,
FullTextColumn: v.fullTextColumn,
JsonKeyToKey: v.jsonKeyToKey,
StartNs: v.startNs,
@@ -237,7 +246,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
for i := 0; i < len(args)-1; i++ {
origVal := args[i].String()
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(origVal)
expr, exprArgs, err := CollisionHandledFinalExpr(v.ctx, v.startNs, v.endNs, &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey)
expr, exprArgs, err := CollisionHandledFinalExpr(v.ctx, v.startNs, v.endNs, &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey, v.flagger)
if err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to get table field name for %q", origVal)
}
@@ -255,7 +264,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
for i, arg := range args {
orig := arg.String()
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(orig)
expr, exprArgs, err := CollisionHandledFinalExpr(v.ctx, v.startNs, v.endNs, &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey)
expr, exprArgs, err := CollisionHandledFinalExpr(v.ctx, v.startNs, v.endNs, &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey, v.flagger)
if err != nil {
return err
}

View File

@@ -1,7 +1,11 @@
package querybuilder
import (
"os"
"context"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
const (
@@ -14,14 +18,7 @@ var (
SkippableConditionLiterals = []string{SkipConditionLiteral, ErrorConditionLiteral}
)
var (
BodyJSONQueryEnabled = GetOrDefaultEnv("BODY_JSON_QUERY_ENABLED", "false") == "true"
)
func GetOrDefaultEnv(key string, fallback string) string {
v := os.Getenv(key)
if len(v) == 0 {
return fallback
}
return v
// IsBodyJSONEnabled evaluates the body_json_enabled feature flag.
func IsBodyJSONEnabled(ctx context.Context, fl flagger.Flagger) bool {
return fl.BooleanOrEmpty(ctx, flagger.FeatureBodyJSONQuery, featuretypes.NewFlaggerEvaluationContext(valuer.UUID{}))
}

View File

@@ -11,6 +11,7 @@ import (
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@@ -27,6 +28,7 @@ func CollisionHandledFinalExpr(
keys map[string][]*telemetrytypes.TelemetryFieldKey,
requiredDataType telemetrytypes.FieldDataType,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
fl flagger.Flagger,
) (string, []any, error) {
if requiredDataType != telemetrytypes.FieldDataTypeString &&
@@ -106,7 +108,7 @@ func CollisionHandledFinalExpr(
}
// first if condition covers the older tests and second if condition covers the array conditions
if !BodyJSONQueryEnabled && field.FieldContext == telemetrytypes.FieldContextBody && jsonKeyToKey != nil {
if !IsBodyJSONEnabled(ctx, fl) && field.FieldContext == telemetrytypes.FieldContextBody && jsonKeyToKey != nil {
return "", nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "Group by/Aggregation isn't available for the body column")
} else if strings.Contains(field.Name, telemetrytypes.ArraySep) || strings.Contains(field.Name, telemetrytypes.ArrayAnyIndex) {
return "", nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "Group by/Aggregation isn't available for the Array Paths: %s", field.Name)

View File

@@ -9,6 +9,7 @@ import (
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
grammar "github.com/SigNoz/signoz/pkg/parser/filterquery/grammar"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -36,6 +37,7 @@ type filterExpressionVisitor struct {
builder *sqlbuilder.SelectBuilder
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
flagger flagger.Flagger
skipResourceFilter bool
skipFullTextFilter bool
skipFunctionCalls bool
@@ -56,6 +58,7 @@ type FilterExprVisitorOpts struct {
Builder *sqlbuilder.SelectBuilder
FullTextColumn *telemetrytypes.TelemetryFieldKey
JsonKeyToKey qbtypes.JsonKeyToFieldFunc
Flagger flagger.Flagger
SkipResourceFilter bool
SkipFullTextFilter bool
SkipFunctionCalls bool
@@ -76,6 +79,7 @@ func newFilterExpressionVisitor(opts FilterExprVisitorOpts) *filterExpressionVis
builder: opts.Builder,
fullTextColumn: opts.FullTextColumn,
jsonKeyToKey: opts.JsonKeyToKey,
flagger: opts.Flagger,
skipResourceFilter: opts.SkipResourceFilter,
skipFullTextFilter: opts.SkipFullTextFilter,
skipFunctionCalls: opts.SkipFunctionCalls,
@@ -751,8 +755,7 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
return ErrorConditionLiteral
}
// filter arrays from keys
if BodyJSONQueryEnabled && functionName != "hasToken" {
if IsBodyJSONEnabled(v.context, v.flagger) && functionName != "hasToken" {
filteredKeys := []*telemetrytypes.TelemetryFieldKey{}
for _, key := range keys {
if key.FieldDataType.IsArray() {
@@ -793,7 +796,7 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
// this is that all other functions only support array fields
if key.FieldContext == telemetrytypes.FieldContextBody {
var err error
if BodyJSONQueryEnabled {
if IsBodyJSONEnabled(v.context, v.flagger) {
fieldName, err = v.fieldMapper.FieldFor(v.context, v.startNs, v.endNs, key)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to get field name for key %s: %s", key.Name, err.Error()))
@@ -936,7 +939,7 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
// Note: Skip this logic if body json query is enabled so we can look up the key inside fields
//
// TODO(Piyush): After entire migration this is supposed to be removed.
if !BodyJSONQueryEnabled && fieldKey.FieldContext == telemetrytypes.FieldContextBody {
if fieldKey.FieldContext == telemetrytypes.FieldContextBody && !IsBodyJSONEnabled(v.context, v.flagger) {
fieldKeysForName = append(fieldKeysForName, &fieldKey)
}

View File

@@ -7,6 +7,7 @@ import (
"strings"
"testing"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
grammar "github.com/SigNoz/signoz/pkg/parser/filterquery/grammar"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -60,6 +61,7 @@ func TestPrepareWhereClause_EmptyVariableList(t *testing.T) {
Context: context.Background(),
FieldKeys: keys,
Variables: tt.variables,
Flagger: flaggertest.New(t),
}
_, err := PrepareWhereClause(tt.expr, opts)
@@ -79,10 +81,13 @@ func TestPrepareWhereClause_EmptyVariableList(t *testing.T) {
}
// createTestVisitor creates a filterExpressionVisitor for testing VisitKey.
func createTestVisitor(fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey, ignoreNotFoundKeys bool) *filterExpressionVisitor {
func createTestVisitor(t *testing.T, fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey, ignoreNotFoundKeys bool) *filterExpressionVisitor {
t.Helper()
return &filterExpressionVisitor{
context: t.Context(),
logger: slog.Default(),
fieldKeys: fieldKeys,
flagger: flaggertest.New(t),
ignoreNotFoundKeys: ignoreNotFoundKeys,
keysWithWarnings: make(map[string]bool),
builder: sqlbuilder.NewSelectBuilder(),
@@ -572,7 +577,7 @@ func TestVisitKey(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
visitor := createTestVisitor(tt.fieldKeys, tt.ignoreNotFoundKeys)
visitor := createTestVisitor(t, tt.fieldKeys, tt.ignoreNotFoundKeys)
keyCtx := parseKeyContext(tt.keyText)
if keyCtx == nil {
@@ -776,7 +781,9 @@ type visitComparisonCase struct {
// visitComparisonOpts builds the two FilterExprVisitorOpts shared by all
// TestVisitComparison_* tests.
func visitComparisonOpts() (rsbOpts, sbOpts FilterExprVisitorOpts) {
func visitComparisonOpts(t *testing.T) (rsbOpts, sbOpts FilterExprVisitorOpts) {
t.Helper()
fl := flaggertest.New(t)
allVariable := map[string]qbtypes.VariableItem{
"service": {
Type: qbtypes.DynamicVariableType,
@@ -790,8 +797,10 @@ func visitComparisonOpts() (rsbOpts, sbOpts FilterExprVisitorOpts) {
FieldDataType: telemetrytypes.FieldDataTypeString,
}
rsbOpts = FilterExprVisitorOpts{
Context: t.Context(),
FieldKeys: visitTestKeys,
ConditionBuilder: &resourceConditionBuilder{},
Flagger: fl,
Variables: allVariable,
SkipResourceFilter: false,
SkipFullTextFilter: true,
@@ -799,8 +808,10 @@ func visitComparisonOpts() (rsbOpts, sbOpts FilterExprVisitorOpts) {
IgnoreNotFoundKeys: true,
}
sbOpts = FilterExprVisitorOpts{
Context: t.Context(),
FieldKeys: visitTestKeys,
ConditionBuilder: &conditionBuilder{},
Flagger: fl,
Variables: allVariable,
SkipResourceFilter: true,
SkipFullTextFilter: false,
@@ -814,7 +825,7 @@ func visitComparisonOpts() (rsbOpts, sbOpts FilterExprVisitorOpts) {
// TestVisitComparison_AND covers AND expressions with attribute keys (a, b, c →
// TrueConditionLiteral in RSB) and resource keys (x, y, z → "{name}_cond" in RSB).
func TestVisitComparison_AND(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{
name: "single attribute key",
@@ -892,7 +903,7 @@ func TestVisitComparison_AND(t *testing.T) {
// - NOT inside a comparison (e.g. NOT LIKE, NOT EXISTS): the inner NOT is folded
// into the operator token; conditionBuilder ignores it, so no extra NOT is emitted.
func TestVisitComparison_NOT(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{
// Unary NOT on an attribute key: NOT(SkipConditionLiteral) → SkipConditionLiteral (guard).
@@ -985,7 +996,7 @@ func TestVisitComparison_NOT(t *testing.T) {
// SkipResourceFilter to false when an OR token is detected in the expression,
// so resource keys become visible in sbOpts for all cases in this suite.
func TestVisitComparison_OR(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{
name: "resource OR resource",
@@ -1086,7 +1097,7 @@ func TestVisitComparison_OR(t *testing.T) {
// TestVisitComparison_Precedence covers AND/OR/NOT operator precedence
// (AND binds tighter than OR; NOT binds tightest).
func TestVisitComparison_Precedence(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{
// a→true short-circuits OR.
@@ -1168,7 +1179,7 @@ func TestVisitComparison_Precedence(t *testing.T) {
// VisitPrimary adds one extra layer of parens around real conditions;
// TrueConditionLiteral passes through unwrapped.
func TestVisitComparison_Parens(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{
// RSB: SkipConditionLiteral passes through unwrapped. SB: VisitPrimary wraps in parens.
@@ -1271,7 +1282,7 @@ func TestVisitComparison_Parens(t *testing.T) {
// rsbOpts has SkipFullTextFilter=true → TrueConditionLiteral.
// sbOpts has SkipFullTextFilter=false, FullTextColumn=bodyCol → "body_cond".
func TestVisitComparison_FullText(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{
name: "standalone full-text term",
@@ -1428,7 +1439,7 @@ func TestVisitComparison_FullText(t *testing.T) {
// Equality with __all__ does NOT short-circuit — the variable resolves to the literal
// "__all__" string and ConditionFor is called normally.
func TestVisitComparison_AllVariable(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{
name: "IN allVariable alone",
@@ -1536,7 +1547,7 @@ func TestVisitComparison_AllVariable(t *testing.T) {
// sbOpts has SkipFunctionCalls=false; has/hasAny/hasAll only support FieldContextBody,
// so calls on attribute/resource keys return an error.
func TestVisitComparison_FunctionCalls(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{
name: "has on attribute key",
@@ -1615,7 +1626,7 @@ func TestVisitComparison_FunctionCalls(t *testing.T) {
// (no keys resolved); SkipConditionLiteral short-circuits OR and is stripped from AND.
// sbOpts has IgnoreNotFoundKeys=false → key lookup appends an error.
func TestVisitComparison_UnknownKeys(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{
// RSB: unknown_key → SkipConditionLiteral (no keys resolved); stripped from AND; x_cond survives.
@@ -1682,7 +1693,7 @@ func TestVisitComparison_UnknownKeys(t *testing.T) {
// TestVisitComparison_SkippableLiteralValues guards against two distinct collision risks
// involving SkippableConditionLiterals ("true", "__skip__", "__skip_because_of_error__"):.
func TestVisitComparison_SkippableLiteralValues(t *testing.T) {
rsbOpts, sbOpts := visitComparisonOpts()
rsbOpts, sbOpts := visitComparisonOpts(t)
tests := []visitComparisonCase{
{

View File

@@ -23,7 +23,6 @@ import (
"github.com/SigNoz/signoz/pkg/identn"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
"github.com/SigNoz/signoz/pkg/modules/user"
@@ -115,9 +114,6 @@ type Config struct {
// MetricsExplorer config
MetricsExplorer metricsexplorer.Config `mapstructure:"metricsexplorer"`
// InfraMonitoring config
InfraMonitoring inframonitoring.Config `mapstructure:"inframonitoring"`
// Flagger config
Flagger flagger.Config `mapstructure:"flagger"`
@@ -161,7 +157,6 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
gateway.NewConfigFactory(),
tokenizer.NewConfigFactory(),
metricsexplorer.NewConfigFactory(),
inframonitoring.NewConfigFactory(),
flagger.NewConfigFactory(),
user.NewConfigFactory(),
identn.NewConfigFactory(),

View File

@@ -22,8 +22,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/fields/implfields"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring/implinframonitoring"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/quickfilter"
@@ -57,7 +55,6 @@ type Handlers struct {
SpanPercentile spanpercentile.Handler
Services services.Handler
MetricsExplorer metricsexplorer.Handler
InfraMonitoring inframonitoring.Handler
Global global.Handler
FlaggerHandler flagger.Handler
GatewayHandler gateway.Handler
@@ -98,7 +95,6 @@ func NewHandlers(
RawDataExport: implrawdataexport.NewHandler(modules.RawDataExport),
Services: implservices.NewHandler(modules.Services),
MetricsExplorer: implmetricsexplorer.NewHandler(modules.MetricsExplorer),
InfraMonitoring: implinframonitoring.NewHandler(modules.InfraMonitoring),
SpanPercentile: implspanpercentile.NewHandler(modules.SpanPercentile),
Global: signozglobal.NewHandler(global),
FlaggerHandler: flagger.NewHandler(flaggerService),

View File

@@ -52,7 +52,7 @@ func TestNewHandlers(t *testing.T) {
userRoleStore := impluser.NewUserRoleStore(sqlstore, providerSettings)
userGetter := impluser.NewGetter(impluser.NewStore(sqlstore, providerSettings), userRoleStore, flagger)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, nil, nil)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, nil, nil, flagger)
querierHandler := querier.NewHandler(providerSettings, nil, nil)
registryHandler := factory.NewHandler(nil)

View File

@@ -14,8 +14,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring/implinframonitoring"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -43,6 +41,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/tracefunnel/impltracefunnel"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
@@ -71,7 +70,6 @@ type Modules struct {
Services services.Module
SpanPercentile spanpercentile.Module
MetricsExplorer metricsexplorer.Module
InfraMonitoring inframonitoring.Module
Promote promote.Module
ServiceAccount serviceaccount.Module
CloudIntegration cloudintegration.Module
@@ -99,6 +97,7 @@ func NewModules(
userRoleStore authtypes.UserRoleStore,
serviceAccount serviceaccount.Module,
cloudIntegrationModule cloudintegration.Module,
fl flagger.Flagger,
) Modules {
quickfilter := implquickfilter.NewModule(implquickfilter.NewStore(sqlstore))
orgSetter := implorganization.NewSetter(implorganization.NewStore(sqlstore), alertmanager, quickfilter)
@@ -121,11 +120,10 @@ func NewModules(
Session: implsession.NewModule(providerSettings, authNs, userSetter, userGetter, implauthdomain.NewModule(implauthdomain.NewStore(sqlstore), authNs), tokenizer, orgGetter),
SpanPercentile: implspanpercentile.NewModule(querier, providerSettings),
Services: implservices.NewModule(querier, telemetryStore),
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
InfraMonitoring: implinframonitoring.NewModule(telemetryStore, telemetryMetadataStore, querier, providerSettings, config.InfraMonitoring),
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer, fl),
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
ServiceAccount: serviceAccount,
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger)),
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger, fl)),
CloudIntegration: cloudIntegrationModule,
}
}

View File

@@ -56,7 +56,7 @@ func TestNewModules(t *testing.T) {
serviceAccount := implserviceaccount.NewModule(implserviceaccount.NewStore(sqlstore), nil, nil, nil, providerSettings, serviceaccount.Config{})
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, serviceAccount, implcloudintegration.NewModule())
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, serviceAccount, implcloudintegration.NewModule(), flagger)
reflectVal := reflect.ValueOf(modules)
for i := 0; i < reflectVal.NumField(); i++ {

View File

@@ -22,7 +22,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/promote"
@@ -62,7 +61,6 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ dashboard.Module }{},
struct{ dashboard.Handler }{},
struct{ metricsexplorer.Handler }{},
struct{ inframonitoring.Handler }{},
struct{ gateway.Handler }{},
struct{ fields.Handler }{},
struct{ authz.Handler }{},

View File

@@ -272,7 +272,6 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
modules.Dashboard,
handlers.Dashboard,
handlers.MetricsExplorer,
handlers.InfraMonitoring,
handlers.GatewayHandler,
handlers.Fields,
handlers.AuthzHandler,

View File

@@ -410,6 +410,7 @@ func New(
telemetrymetadata.DBName,
telemetrymetadata.AttributesMetadataLocalTableName,
telemetrymetadata.ColumnEvolutionMetadataTableName,
flagger,
)
global, err := factory.NewProviderFromNamedMap(
@@ -431,7 +432,7 @@ func New(
}
// Initialize all modules
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, telemetryMetadataStore, authNs, authz, cache, queryParser, config, dashboard, userGetter, userRoleStore, serviceAccount, cloudIntegrationModule)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, telemetryMetadataStore, authNs, authz, cache, queryParser, config, dashboard, userGetter, userRoleStore, serviceAccount, cloudIntegrationModule, flagger)
// Initialize ruler from the variant-specific provider factories
rulerInstance, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.Ruler, rulerProviderFactories(cache, alertmanager, sqlstore, telemetrystore, telemetryMetadataStore, prometheus, orgGetter, modules.RuleStateHistory, querier, queryParser), "signoz")

View File

@@ -8,6 +8,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetryresourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -24,6 +25,7 @@ type auditQueryStatementBuilder struct {
aggExprRewriter qbtypes.AggExprRewriter
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
fl flagger.Flagger
}
var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*auditQueryStatementBuilder)(nil)
@@ -36,6 +38,7 @@ func NewAuditQueryStatementBuilder(
aggExprRewriter qbtypes.AggExprRewriter,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
fl flagger.Flagger,
) *auditQueryStatementBuilder {
auditSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetryaudit")
@@ -48,6 +51,7 @@ func NewAuditQueryStatementBuilder(
metadataStore,
fullTextColumn,
jsonKeyToKey,
fl,
)
return &auditQueryStatementBuilder{
@@ -59,6 +63,7 @@ func NewAuditQueryStatementBuilder(
aggExprRewriter: aggExprRewriter,
fullTextColumn: fullTextColumn,
jsonKeyToKey: jsonKeyToKey,
fl: fl,
}
}
@@ -319,7 +324,7 @@ func (b *auditQueryStatementBuilder) buildTimeSeriesQuery(
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey, b.fl)
if err != nil {
return nil, err
}
@@ -456,7 +461,7 @@ func (b *auditQueryStatementBuilder) buildScalarQuery(
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey, b.fl)
if err != nil {
return nil, err
}
@@ -552,6 +557,7 @@ func (b *auditQueryStatementBuilder) addFilterCondition(
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
Flagger: b.fl,
SkipResourceFilter: true,
FullTextColumn: b.fullTextColumn,
JsonKeyToKey: b.jsonKeyToKey,

View File

@@ -10,6 +10,7 @@ import (
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/stretchr/testify/require"
)
@@ -46,13 +47,15 @@ func auditFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
}
}
func newTestAuditStatementBuilder() *auditQueryStatementBuilder {
func newTestAuditStatementBuilder(t *testing.T) *auditQueryStatementBuilder {
t.Helper()
fl := flaggertest.New(t)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = auditFieldKeyMap()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
return NewAuditQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -62,11 +65,12 @@ func newTestAuditStatementBuilder() *auditQueryStatementBuilder {
aggExprRewriter,
DefaultFullTextColumn,
nil,
fl,
)
}
func TestStatementBuilder(t *testing.T) {
statementBuilder := newTestAuditStatementBuilder()
statementBuilder := newTestAuditStatementBuilder(t)
ctx := context.Background()
testCases := []struct {

View File

@@ -6,6 +6,7 @@ import (
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -15,10 +16,11 @@ import (
type conditionBuilder struct {
fm qbtypes.FieldMapper
fl flagger.Flagger
}
func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
return &conditionBuilder{fm: fm}
func NewConditionBuilder(fm qbtypes.FieldMapper, fl flagger.Flagger) *conditionBuilder {
return &conditionBuilder{fm: fm, fl: fl}
}
func (c *conditionBuilder) conditionFor(
@@ -36,7 +38,7 @@ func (c *conditionBuilder) conditionFor(
// TODO(Piyush): Update this to support multiple JSON columns based on evolutions
for _, column := range columns {
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled && key.Name != messageSubField {
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.IsBodyJSONEnabled(ctx, c.fl) && key.Name != messageSubField {
valueType, value := InferDataType(value, operator, key)
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
if err != nil {
@@ -56,7 +58,7 @@ func (c *conditionBuilder) conditionFor(
}
// Check if this is a body JSON search - either by FieldContext
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.BodyJSONQueryEnabled {
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.IsBodyJSONEnabled(ctx, c.fl) {
fieldExpression, value = GetBodyJSONKey(ctx, key, operator, value)
}
@@ -167,7 +169,7 @@ func (c *conditionBuilder) conditionFor(
// in the UI based query builder, `exists` and `not exists` are used for
// key membership checks, so depending on the column type, the condition changes
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.BodyJSONQueryEnabled {
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.IsBodyJSONEnabled(ctx, c.fl) {
if operator == qbtypes.FilterOperatorExists {
return GetBodyJSONKeyForExists(ctx, key, operator, value), nil
} else {
@@ -287,7 +289,7 @@ func (c *conditionBuilder) ConditionFor(
case telemetrytypes.FieldContextBody:
// Querying JSON fields already account for Nullability of fields
// so additional exists checks are not needed
if querybuilder.BodyJSONQueryEnabled {
if querybuilder.IsBodyJSONEnabled(ctx, c.fl) {
return condition, nil
}
}

View File

@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@@ -122,8 +123,9 @@ func TestExistsConditionForWithEvolutions(t *testing.T) {
expectedError: nil,
},
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
conditionBuilder := NewConditionBuilder(fm, fl)
ctx := context.Background()
for _, tc := range testCases {
@@ -513,8 +515,9 @@ func TestConditionFor(t *testing.T) {
expectedError: qbtypes.ErrColumnNotFound,
},
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
conditionBuilder := NewConditionBuilder(fm, fl)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
t.Run(tc.name, func(t *testing.T) {
@@ -566,8 +569,9 @@ func TestConditionForMultipleKeys(t *testing.T) {
},
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
conditionBuilder := NewConditionBuilder(fm, fl)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
@@ -825,8 +829,9 @@ func TestConditionForJSONBodySearch(t *testing.T) {
},
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
conditionBuilder := NewConditionBuilder(fm, fl)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()

View File

@@ -1,9 +1,11 @@
package telemetrylogs
import (
"context"
"fmt"
"github.com/SigNoz/signoz-otel-collector/constants"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -41,7 +43,7 @@ const (
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
// messageSubColumn is the ClickHouse sub-column that body searches map to
// when BodyJSONQueryEnabled is true.
// when body_json_enabled feature flag is true.
messageSubField = "message"
messageSubColumn = "body_v2.message"
bodySearchDefaultWarning = "body searches default to `body.message:string`. Use `body.<key>` to search a different field inside body"
@@ -128,8 +130,8 @@ var (
}
)
func bodyAliasExpression() string {
if !querybuilder.BodyJSONQueryEnabled {
func bodyAliasExpression(ctx context.Context, fl flagger.Flagger) string {
if !querybuilder.IsBodyJSONEnabled(ctx, fl) {
return LogsV2BodyColumn
}

View File

@@ -12,6 +12,7 @@ import (
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz-otel-collector/utils"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -66,13 +67,15 @@ var (
}
)
type fieldMapper struct{}
func NewFieldMapper() qbtypes.FieldMapper {
return &fieldMapper{}
type fieldMapper struct {
fl flagger.Flagger
}
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
func NewFieldMapper(fl flagger.Flagger) qbtypes.FieldMapper {
return &fieldMapper{fl: fl}
}
func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
columns := []*schema.Column{logsV2Columns["resources_string"], logsV2Columns["resource"]}
@@ -96,7 +99,7 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
}
case telemetrytypes.FieldContextBody:
// Body context is for JSON body fields. Use body_v2 if feature flag is enabled.
if querybuilder.BodyJSONQueryEnabled {
if querybuilder.IsBodyJSONEnabled(ctx, m.fl) {
if key.Name == messageSubField {
return []*schema.Column{logsV2Columns[messageSubColumn]}, nil
}
@@ -105,7 +108,7 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
// Fall back to legacy body column
return []*schema.Column{logsV2Columns["body"]}, nil
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
if key.Name == LogsV2BodyColumn && querybuilder.BodyJSONQueryEnabled {
if key.Name == LogsV2BodyColumn && querybuilder.IsBodyJSONEnabled(ctx, m.fl) {
return []*schema.Column{logsV2Columns[messageSubColumn]}, nil
}
col, ok := logsV2Columns[key.Name]
@@ -113,7 +116,7 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
// check if the key has body JSON search
if strings.HasPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
// Use body_v2 if feature flag is enabled and we have a body condition builder
if querybuilder.BodyJSONQueryEnabled {
if querybuilder.IsBodyJSONEnabled(ctx, m.fl) {
// TODO(Piyush): Update this to support multiple JSON columns based on evolutions
// i.e return both the body json and body json promoted and let the evolutions decide which one to use
// based on the query range time.

View File

@@ -6,6 +6,7 @@ import (
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/stretchr/testify/assert"
@@ -165,7 +166,8 @@ func TestGetColumn(t *testing.T) {
},
}
fm := NewFieldMapper()
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -273,7 +275,8 @@ func TestGetFieldKeyName(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
result, err := fm.FieldFor(ctx, 0, 0, &tc.key)
if tc.expectedError != nil {
@@ -514,7 +517,8 @@ func TestFieldForWithEvolutions(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
tsStart := uint64(tc.tsStartTime.UnixNano())
tsEnd := uint64(tc.tsEndTime.UnixNano())
@@ -963,7 +967,8 @@ func TestFieldForWithMaterialized(t *testing.T) {
},
}
fm := NewFieldMapper()
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {

View File

@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/stretchr/testify/require"
@@ -12,9 +13,10 @@ import (
// TestLikeAndILikeWithoutWildcards_Warns Tests that LIKE/ILIKE without wildcards add warnings and include docs URL.
func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
fl := flaggertest.New(t)
ctx := context.Background()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
fm := NewFieldMapper(fl)
cb := NewConditionBuilder(fm, fl)
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
keys := buildCompleteFieldKeyMap(releaseTime)
@@ -27,6 +29,7 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
FieldKeys: keys,
FullTextColumn: DefaultFullTextColumn,
JsonKeyToKey: GetBodyJSONKey,
Flagger: fl,
}
tests := []string{
@@ -51,8 +54,9 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
// TestLikeAndILikeWithWildcards_NoWarn Tests that LIKE/ILIKE with wildcards do not add warnings.
func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
cb := NewConditionBuilder(fm, fl)
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
keys := buildCompleteFieldKeyMap(releaseTime)
@@ -65,6 +69,7 @@ func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
FieldKeys: keys,
FullTextColumn: DefaultFullTextColumn,
JsonKeyToKey: GetBodyJSONKey,
Flagger: fl,
}
tests := []string{

View File

@@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -15,8 +16,9 @@ import (
// TestFilterExprLogsBodyJSON tests a comprehensive set of query patterns for body JSON search.
func TestFilterExprLogsBodyJSON(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
cb := NewConditionBuilder(fm, fl)
// Define a comprehensive set of field keys to support all test cases
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
keys := buildCompleteFieldKeyMap(releaseTime)
@@ -27,10 +29,9 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
FieldMapper: fm,
ConditionBuilder: cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{
Name: "body",
},
JsonKeyToKey: GetBodyJSONKey,
Flagger: fl,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "body"},
JsonKeyToKey: GetBodyJSONKey,
}
testCases := []struct {

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -17,10 +18,11 @@ import (
// TestFilterExprLogs tests a comprehensive set of query patterns for logs search.
func TestFilterExprLogs(t *testing.T) {
fl := flaggertest.New(t)
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
ctx := context.Background()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
fm := NewFieldMapper(fl)
cb := NewConditionBuilder(fm, fl)
// Define a comprehensive set of field keys to support all test cases
keys := buildCompleteFieldKeyMap(releaseTime)
@@ -35,6 +37,7 @@ func TestFilterExprLogs(t *testing.T) {
JsonKeyToKey: GetBodyJSONKey,
StartNs: uint64(releaseTime.Add(-5 * time.Minute).UnixNano()),
EndNs: uint64(releaseTime.Add(5 * time.Minute).UnixNano()),
Flagger: fl,
}
testCases := []struct {
@@ -2429,8 +2432,9 @@ func TestFilterExprLogs(t *testing.T) {
// TestFilterExprLogs tests a comprehensive set of query patterns for logs search.
func TestFilterExprLogsConflictNegation(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
cb := NewConditionBuilder(fm, fl)
// Define a comprehensive set of field keys to support all test cases
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
@@ -2457,6 +2461,7 @@ func TestFilterExprLogsConflictNegation(t *testing.T) {
FieldKeys: keys,
FullTextColumn: DefaultFullTextColumn,
JsonKeyToKey: GetBodyJSONKey,
Flagger: fl,
}
testCases := []struct {

View File

@@ -8,6 +8,7 @@ import (
"time"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -32,9 +33,6 @@ func (t TestExpected) GetQuery() string {
}
func TestJSONStmtBuilder_TimeSeries(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
defer disable()
statementBuilder := buildJSONTestStatementBuilder(t, false)
cases := []struct {
@@ -115,9 +113,6 @@ func TestJSONStmtBuilder_TimeSeries(t *testing.T) {
not a body_promoted.* column. These tests assumed the old coalesce(body_promoted.x, body_v2.x) path.
func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
defer disable()
statementBuilder := buildJSONTestStatementBuilder(t, "user.age", "user.name")
cases := []struct {
@@ -176,10 +171,6 @@ func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) {
*/
func TestJSONStmtBuilder_PrimitivePaths(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
defer disable()
statementBuilder := buildJSONTestStatementBuilder(t, false)
cases := []struct {
name string
@@ -340,10 +331,6 @@ func TestJSONStmtBuilder_PrimitivePaths(t *testing.T) {
(direct sub-column access), not a body_promoted.* column.
func TestStatementBuilderListQueryBodyPromoted(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
defer disable()
statementBuilder := buildJSONTestStatementBuilder(t, "education", "tags")
cases := []struct {
name string
@@ -507,10 +494,6 @@ func TestStatementBuilderListQueryBodyPromoted(t *testing.T) {
*/
func TestJSONStmtBuilder_ArrayPaths(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
defer disable()
statementBuilder := buildJSONTestStatementBuilder(t, false)
cases := []struct {
name string
@@ -816,10 +799,6 @@ func TestJSONStmtBuilder_ArrayPaths(t *testing.T) {
}
func TestJSONStmtBuilder_IndexedPaths(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
defer disable()
statementBuilder := buildJSONTestStatementBuilder(t, true)
cases := []struct {
name string
@@ -939,9 +918,6 @@ func TestJSONStmtBuilder_IndexedPaths(t *testing.T) {
}
func TestJSONStmtBuilder_SelectField(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
defer disable()
statementBuilder := buildJSONTestStatementBuilder(t, false)
cases := []struct {
@@ -1030,9 +1006,6 @@ func TestJSONStmtBuilder_SelectField(t *testing.T) {
}
func TestJSONStmtBuilder_OrderBy(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
defer disable()
statementBuilder := buildJSONTestStatementBuilder(t, false)
cases := []struct {
@@ -1148,11 +1121,14 @@ func buildTestTelemetryMetadataStore(t *testing.T, addIndexes bool) *telemetryty
}
func buildJSONTestStatementBuilder(t *testing.T, addIndexes bool) *logQueryStatementBuilder {
mockMetadataStore := buildTestTelemetryMetadataStore(t, addIndexes)
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
t.Helper()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
mockMetadataStore := buildTestTelemetryMetadataStore(t, addIndexes)
fl := flaggertest.WithBodyJSON(t, true)
fm := NewFieldMapper(fl)
cb := NewConditionBuilder(fm, fl)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -1162,18 +1138,8 @@ func buildJSONTestStatementBuilder(t *testing.T, addIndexes bool) *logQueryState
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
fl,
)
return statementBuilder
}
func jsonQueryTestUtil(_ *testing.T) (func(), func()) {
enable := func() {
querybuilder.BodyJSONQueryEnabled = true
}
disable := func() {
querybuilder.BodyJSONQueryEnabled = false
}
return enable, disable
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetryresourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -22,6 +23,7 @@ type logQueryStatementBuilder struct {
cb qbtypes.ConditionBuilder
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
aggExprRewriter qbtypes.AggExprRewriter
fl flagger.Flagger
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
@@ -37,6 +39,7 @@ func NewLogQueryStatementBuilder(
aggExprRewriter qbtypes.AggExprRewriter,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
fl flagger.Flagger,
) *logQueryStatementBuilder {
logsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrylogs")
@@ -49,6 +52,7 @@ func NewLogQueryStatementBuilder(
metadataStore,
fullTextColumn,
jsonKeyToKey,
fl,
)
return &logQueryStatementBuilder{
@@ -58,6 +62,7 @@ func NewLogQueryStatementBuilder(
cb: conditionBuilder,
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
aggExprRewriter: aggExprRewriter,
fl: fl,
fullTextColumn: fullTextColumn,
jsonKeyToKey: jsonKeyToKey,
}
@@ -76,7 +81,7 @@ func (b *logQueryStatementBuilder) Build(
start = querybuilder.ToNanoSecs(start)
end = querybuilder.ToNanoSecs(end)
keySelectors, warnings := getKeySelectors(query)
keySelectors, warnings := getKeySelectors(ctx, query, b.fl)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
@@ -107,7 +112,7 @@ func (b *logQueryStatementBuilder) Build(
return stmt, nil
}
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([]*telemetrytypes.FieldKeySelector, []string) {
func getKeySelectors(ctx context.Context, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], flagger flagger.Flagger) ([]*telemetrytypes.FieldKeySelector, []string) {
var keySelectors []*telemetrytypes.FieldKeySelector
var warnings []string
@@ -159,7 +164,7 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([
// When the new JSON body experience is enabled, warn the user if they use the bare
// "body" key in the filter — queries on plain "body" default to body.message:string.
// TODO(Piyush): Setup better for coming FTS support.
if querybuilder.BodyJSONQueryEnabled {
if querybuilder.IsBodyJSONEnabled(ctx, flagger) {
for _, sel := range keySelectors {
if sel.Name == LogsV2BodyColumn {
warnings = append(warnings, bodySearchDefaultWarning)
@@ -279,7 +284,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
sb.SelectMore(LogsV2SeverityNumberColumn)
sb.SelectMore(LogsV2ScopeNameColumn)
sb.SelectMore(LogsV2ScopeVersionColumn)
sb.SelectMore(bodyAliasExpression())
sb.SelectMore(bodyAliasExpression(ctx, b.fl))
sb.SelectMore(LogsV2AttributesStringColumn)
sb.SelectMore(LogsV2AttributesNumberColumn)
sb.SelectMore(LogsV2AttributesBoolColumn)
@@ -379,7 +384,7 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
// Keep original column expressions so we can build the tuple
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey, b.fl)
if err != nil {
return nil, err
}
@@ -532,7 +537,7 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey, b.fl)
if err != nil {
return nil, err
}
@@ -644,6 +649,7 @@ func (b *logQueryStatementBuilder) addFilterCondition(
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
Flagger: b.fl,
SkipResourceFilter: true,
FullTextColumn: b.fullTextColumn,
JsonKeyToKey: b.jsonKeyToKey,

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -15,7 +16,6 @@ import (
)
func TestStatementBuilderTimeSeries(t *testing.T) {
// Create a test release time
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
releaseTimeNano := uint64(releaseTime.UnixNano())
@@ -191,16 +191,17 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
}
ctx := context.Background()
fl := flaggertest.New(t)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
keysMap := buildCompleteFieldKeyMap(releaseTime)
mockMetadataStore.KeysMap = keysMap
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
fm := NewFieldMapper(fl)
cb := NewConditionBuilder(fm, fl)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -210,6 +211,7 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
fl,
)
for _, c := range cases {
@@ -313,15 +315,16 @@ func TestStatementBuilderListQuery(t *testing.T) {
}
ctx := context.Background()
fl := flaggertest.New(t)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
fm := NewFieldMapper()
fm := NewFieldMapper(fl)
// Create a test release time
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, fl)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -331,6 +334,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
fl,
)
for _, c := range cases {
@@ -454,14 +458,15 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
}
ctx := context.Background()
fl := flaggertest.New(t)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
fm := NewFieldMapper()
fm := NewFieldMapper(fl)
// Create a test release time
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, fl)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -471,6 +476,7 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
fl,
)
for _, c := range cases {
@@ -528,14 +534,15 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
}
ctx := context.Background()
fl := flaggertest.New(t)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
fm := NewFieldMapper()
fm := NewFieldMapper(fl)
// Create a test release time
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, fl)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -545,6 +552,7 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
fl,
)
for _, c := range cases {
@@ -624,11 +632,12 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
ctx := context.Background()
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
fm := NewFieldMapper()
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, fl)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -638,6 +647,7 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
fl,
)
for _, c := range cases {
@@ -845,12 +855,13 @@ func TestAdjustKey(t *testing.T) {
},
}
fm := NewFieldMapper()
fl := flaggertest.New(t)
fm := NewFieldMapper(fl)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, fl)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -860,6 +871,7 @@ func TestAdjustKey(t *testing.T) {
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
fl,
)
for _, c := range cases {
@@ -984,25 +996,18 @@ func TestStmtBuilderBodyField(t *testing.T) {
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
enable, disable := jsonQueryTestUtil(t)
defer disable()
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if c.enableBodyJSONQuery {
enable()
} else {
disable()
}
// build the key map after enabling/disabling body JSON query
fl := flaggertest.WithBodyJSON(t, c.enableBodyJSONQuery)
fm := NewFieldMapper(fl)
cb := NewConditionBuilder(fm, fl)
// build the key map
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
for _, field := range IntrinsicFields {
f := field
mockMetadataStore.KeysMap[field.Name] = append(mockMetadataStore.KeysMap[field.Name], &f)
}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
@@ -1011,6 +1016,7 @@ func TestStmtBuilderBodyField(t *testing.T) {
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
fl,
)
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
@@ -1072,25 +1078,18 @@ func TestStmtBuilderBodyFullTextSearch(t *testing.T) {
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
enable, disable := jsonQueryTestUtil(t)
defer disable()
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if c.enableBodyJSONQuery {
enable()
} else {
disable()
}
// build the key map after enabling/disabling body JSON query
fl := flaggertest.WithBodyJSON(t, c.enableBodyJSONQuery)
fm := NewFieldMapper(fl)
cb := NewConditionBuilder(fm, fl)
// build the key map
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
for _, field := range IntrinsicFields {
f := field
mockMetadataStore.KeysMap[field.Name] = append(mockMetadataStore.KeysMap[field.Name], &f)
}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
@@ -1099,6 +1098,7 @@ func TestStmtBuilderBodyFullTextSearch(t *testing.T) {
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
fl,
)
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetryaudit"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
@@ -63,6 +64,7 @@ type telemetryMetaStore struct {
fm qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
fl flagger.Flagger
jsonColumnMetadata map[telemetrytypes.Signal]map[telemetrytypes.FieldContext]telemetrytypes.JSONColumnMetadata
}
@@ -94,8 +96,12 @@ func NewTelemetryMetaStore(
relatedMetadataDBName string,
relatedMetadataTblName string,
columnEvolutionMetadataTblName string,
fl flagger.Flagger,
) telemetrytypes.MetadataStore {
metadataSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetadata")
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
t := &telemetryMetaStore{
logger: metadataSettings.Logger(),
@@ -129,14 +135,11 @@ func NewTelemetryMetaStore(
},
},
},
fl: fl,
fm: fm,
conditionBuilder: conditionBuilder,
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
t.fm = fm
t.conditionBuilder = conditionBuilder
return t
}
@@ -416,7 +419,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
}
// body keys are gated behind the feature flag
queryBodyTable = queryBodyTable && querybuilder.BodyJSONQueryEnabled
queryBodyTable = queryBodyTable && querybuilder.IsBodyJSONEnabled(ctx, t.fl)
// requestedFieldKeySelectors is the set of names the user explicitly asked for.
// Used to ensure a name that is both a parent path AND a directly requested field still surfaces
@@ -676,7 +679,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
}
// enrich body keys with promoted paths, indexes, and JSON access plans
if querybuilder.BodyJSONQueryEnabled {
if querybuilder.IsBodyJSONEnabled(ctx, t.fl) {
if err := t.enrichJSONKeys(ctx, fieldKeySelectors, keys, parentTypes); err != nil {
return nil, false, err
}
@@ -1409,6 +1412,7 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
FieldMapper: t.fm,
ConditionBuilder: t.conditionBuilder,
FieldKeys: keys,
Flagger: t.fl,
})
if err != nil {
t.logger.WarnContext(ctx, "error parsing existing query for related values", errors.Attr(err))

View File

@@ -15,6 +15,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/assert"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/stretchr/testify/require"
)
@@ -46,6 +47,7 @@ func TestGetFirstSeenFromMetricMetadata(t *testing.T) {
DBName,
AttributesMetadataLocalTableName,
ColumnEvolutionMetadataTableName,
flaggertest.New(t),
)
lookupKeys := []telemetrytypes.MetricMetadataLookupKey{

View File

@@ -17,10 +17,12 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/assert"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/stretchr/testify/require"
)
func newTestTelemetryMetaStoreTestHelper(store telemetrystore.TelemetryStore) telemetrytypes.MetadataStore {
func newTestTelemetryMetaStoreTestHelper(t *testing.T, store telemetrystore.TelemetryStore) telemetrytypes.MetadataStore {
t.Helper()
return NewTelemetryMetaStore(
instrumentationtest.New().ToProviderSettings(),
store,
@@ -45,6 +47,7 @@ func newTestTelemetryMetaStoreTestHelper(store telemetrystore.TelemetryStore) te
DBName,
AttributesMetadataLocalTableName,
ColumnEvolutionMetadataTableName,
flaggertest.New(t),
)
}
@@ -66,7 +69,7 @@ func TestGetKeys(t *testing.T) {
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata := newTestTelemetryMetaStoreTestHelper(mockTelemetryStore)
metadata := newTestTelemetryMetaStoreTestHelper(t, mockTelemetryStore)
rows := cmock.NewRows([]cmock.ColumnType{
{Name: "statement", Type: "String"},
@@ -176,7 +179,7 @@ func TestApplyBackwardCompatibleKeys(t *testing.T) {
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata := newTestTelemetryMetaStoreTestHelper(mockTelemetryStore)
metadata := newTestTelemetryMetaStoreTestHelper(t, mockTelemetryStore)
hasTraces := false
hasLogs := false
@@ -340,7 +343,7 @@ func TestGetMetricFieldValuesIntrinsicMetricName(t *testing.T) {
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata := newTestTelemetryMetaStoreTestHelper(mockTelemetryStore)
metadata := newTestTelemetryMetaStoreTestHelper(t, mockTelemetryStore)
valueRows := cmock.NewRows([]cmock.ColumnType{
{Name: "metric_name", Type: "String"},
@@ -379,7 +382,7 @@ func TestGetMetricFieldValuesIntrinsicBoolReturnsEmpty(t *testing.T) {
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata := newTestTelemetryMetaStoreTestHelper(mockTelemetryStore)
metadata := newTestTelemetryMetaStoreTestHelper(t, mockTelemetryStore)
metadataRows := cmock.NewRows([]cmock.ColumnType{
{Name: "attr_string_value", Type: "String"},
@@ -411,7 +414,7 @@ func TestGetMetricFieldValuesAppliesMetricNamespace(t *testing.T) {
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata := newTestTelemetryMetaStoreTestHelper(mockTelemetryStore)
metadata := newTestTelemetryMetaStoreTestHelper(t, mockTelemetryStore)
valueRows := cmock.NewRows([]cmock.ColumnType{
{Name: "attr_string_value", Type: "String"},
@@ -443,7 +446,7 @@ func TestGetMetricFieldValuesIntrinsicMetricNameAppliesMetricNamespace(t *testin
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata := newTestTelemetryMetaStoreTestHelper(mockTelemetryStore)
metadata := newTestTelemetryMetaStoreTestHelper(t, mockTelemetryStore)
valueRows := cmock.NewRows([]cmock.ColumnType{
{Name: "metric_name", Type: "String"},
@@ -483,7 +486,7 @@ func TestGetMeterSourceMetricFieldValuesAppliesMetricNamespace(t *testing.T) {
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata := newTestTelemetryMetaStoreTestHelper(mockTelemetryStore)
metadata := newTestTelemetryMetaStoreTestHelper(t, mockTelemetryStore)
rows := cmock.NewRows([]cmock.ColumnType{
{Name: "attr", Type: "Array(String)"},
@@ -514,7 +517,7 @@ func TestGetMetricsKeysAppliesMetricNamespace(t *testing.T) {
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata := newTestTelemetryMetaStoreTestHelper(mockTelemetryStore)
metadata := newTestTelemetryMetaStoreTestHelper(t, mockTelemetryStore)
rows := cmock.NewRows([]cmock.ColumnType{
{Name: "name", Type: "String"},
@@ -549,7 +552,7 @@ func TestGetMeterSourceMetricKeysAppliesMetricNamespace(t *testing.T) {
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata := newTestTelemetryMetaStoreTestHelper(mockTelemetryStore)
metadata := newTestTelemetryMetaStoreTestHelper(t, mockTelemetryStore)
rows := cmock.NewRows([]cmock.ColumnType{
{Name: "attr_name", Type: "String"},

View File

@@ -7,6 +7,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
@@ -21,6 +22,7 @@ type meterQueryStatementBuilder struct {
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
metricsStatementBuilder *telemetrymetrics.MetricQueryStatementBuilder
fl flagger.Flagger
}
var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*meterQueryStatementBuilder)(nil)
@@ -31,6 +33,7 @@ func NewMeterQueryStatementBuilder(
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
metricsStatementBuilder *telemetrymetrics.MetricQueryStatementBuilder,
fl flagger.Flagger,
) *meterQueryStatementBuilder {
metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymeter")
@@ -40,6 +43,7 @@ func NewMeterQueryStatementBuilder(
fm: fieldMapper,
cb: conditionBuilder,
metricsStatementBuilder: metricsStatementBuilder,
fl: fl,
}
}
@@ -152,6 +156,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
Flagger: b.fl,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
StartNs: start,
@@ -241,6 +246,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
Flagger: b.fl,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
StartNs: start,
@@ -311,6 +317,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
Flagger: b.fl,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
StartNs: start,

View File

@@ -5,7 +5,7 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
@@ -166,10 +166,7 @@ func TestStatementBuilder(t *testing.T) {
}
mockMetadataStore.KeysMap = keys
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
if err != nil {
t.Fatalf("failed to create flagger: %v", err)
}
flagger := flaggertest.New(t)
metricStmtBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(instrumentationtest.New().ToProviderSettings(), mockMetadataStore, fm, cb, flagger)
@@ -179,6 +176,7 @@ func TestStatementBuilder(t *testing.T) {
fm,
cb,
metricStmtBuilder,
flagger,
)
for _, c := range cases {

View File

@@ -274,6 +274,7 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
Flagger: b.flagger,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
StartNs: start,

View File

@@ -6,6 +6,7 @@ import (
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -22,6 +23,7 @@ type resourceFilterStatementBuilder[T any] struct {
metadataStore telemetrytypes.MetadataStore
signal telemetrytypes.Signal
source telemetrytypes.Source
flagger flagger.Flagger
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
@@ -42,6 +44,7 @@ func New[T any](
metadataStore telemetrytypes.MetadataStore,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
fl flagger.Flagger,
) *resourceFilterStatementBuilder[T] {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetryresourcefilter")
fm := NewFieldMapper()
@@ -55,6 +58,7 @@ func New[T any](
metadataStore: metadataStore,
signal: signal,
source: source,
flagger: fl,
fullTextColumn: fullTextColumn,
jsonKeyToKey: jsonKeyToKey,
}
@@ -138,6 +142,7 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
FieldMapper: b.fieldMapper,
ConditionBuilder: b.conditionBuilder,
FieldKeys: keys,
Flagger: b.flagger,
FullTextColumn: b.fullTextColumn,
JsonKeyToKey: b.jsonKeyToKey,
SkipFullTextFilter: true,

View File

@@ -4,6 +4,7 @@ import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -361,6 +362,7 @@ func TestResourceFilterStatementBuilder_Traces(t *testing.T) {
mockMetadataStore,
nil,
nil,
flaggertest.New(t),
)
for _, c := range cases {
@@ -554,6 +556,7 @@ func TestResourceFilterStatementBuilder_Logs(t *testing.T) {
mockMetadataStore,
nil,
nil,
flaggertest.New(t),
)
for _, c := range cases {
@@ -621,6 +624,7 @@ func TestResourceFilterStatementBuilder_Variables(t *testing.T) {
mockMetadataStore,
nil,
nil,
flaggertest.New(t),
)
for _, c := range cases {

View File

@@ -8,6 +8,7 @@ import (
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -85,6 +86,7 @@ func TestSpanScopeFilterExpression(t *testing.T) {
Builder: sb,
StartNs: tt.startNs,
EndNs: 1761458708000000000,
Flagger: flaggertest.New(t),
})
if tt.expectError {
@@ -155,6 +157,7 @@ func TestSpanScopeWithResourceFilter(t *testing.T) {
SkipResourceFilter: false, // This would be set by the statement builder
StartNs: 1761437108000000000,
EndNs: 1761458708000000000,
Flagger: flaggertest.New(t),
})
assert.NoError(t, err)

View File

@@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetryresourcefilter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
@@ -30,6 +31,7 @@ type traceQueryStatementBuilder struct {
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
aggExprRewriter qbtypes.AggExprRewriter
telemetryStore telemetrystore.TelemetryStore
fl flagger.Flagger
}
var _ qbtypes.StatementBuilder[qbtypes.TraceAggregation] = (*traceQueryStatementBuilder)(nil)
@@ -41,6 +43,7 @@ func NewTraceQueryStatementBuilder(
conditionBuilder qbtypes.ConditionBuilder,
aggExprRewriter qbtypes.AggExprRewriter,
telemetryStore telemetrystore.TelemetryStore,
fl flagger.Flagger,
) *traceQueryStatementBuilder {
tracesSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrytraces")
@@ -53,6 +56,7 @@ func NewTraceQueryStatementBuilder(
metadataStore,
nil,
nil,
fl,
)
return &traceQueryStatementBuilder{
@@ -63,6 +67,7 @@ func NewTraceQueryStatementBuilder(
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
aggExprRewriter: aggExprRewriter,
telemetryStore: telemetryStore,
fl: fl,
}
}
@@ -510,7 +515,7 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
// Keep original column expressions so we can build the tuple
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil, b.fl)
if err != nil {
return nil, err
}
@@ -658,7 +663,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery(
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil, b.fl)
if err != nil {
return nil, err
}
@@ -769,6 +774,7 @@ func (b *traceQueryStatementBuilder) addFilterCondition(
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
Flagger: b.fl,
SkipResourceFilter: true,
Variables: variables,
StartNs: start,

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -355,7 +356,8 @@ func TestStatementBuilder(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -364,6 +366,7 @@ func TestStatementBuilder(t *testing.T) {
cb,
aggExprRewriter,
nil,
fl,
)
vars := map[string]qbtypes.VariableItem{
@@ -648,7 +651,8 @@ func TestStatementBuilderListQuery(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -657,6 +661,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
cb,
aggExprRewriter,
nil,
fl,
)
for _, c := range cases {
@@ -755,7 +760,8 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
if mockMetadataStore.KeysMap == nil {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -764,6 +770,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
cb,
aggExprRewriter,
nil,
fl,
)
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
@@ -905,7 +912,8 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -914,6 +922,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
cb,
aggExprRewriter,
nil,
fl,
)
for _, c := range cases {
@@ -1119,7 +1128,8 @@ func TestAdjustKey(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
@@ -1127,6 +1137,7 @@ func TestAdjustKey(t *testing.T) {
cb,
aggExprRewriter,
nil,
fl,
)
for _, c := range cases {
@@ -1391,7 +1402,8 @@ func TestAdjustKeys(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
@@ -1399,6 +1411,7 @@ func TestAdjustKeys(t *testing.T) {
cb,
aggExprRewriter,
nil,
fl,
)
for _, c := range cases {

View File

@@ -239,6 +239,7 @@ func (b *traceOperatorCTEBuilder) buildQueryCTE(ctx context.Context, queryName s
FieldMapper: b.stmtBuilder.fm,
ConditionBuilder: b.stmtBuilder.cb,
FieldKeys: keys,
Flagger: b.stmtBuilder.fl,
SkipResourceFilter: true,
StartNs: b.start,
EndNs: b.end,
@@ -560,6 +561,7 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
keys,
telemetrytypes.FieldDataTypeString,
nil,
b.stmtBuilder.fl,
)
if err != nil {
return nil, errors.NewInvalidInputf(
@@ -676,6 +678,7 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
keys,
telemetrytypes.FieldDataTypeString,
nil,
b.stmtBuilder.fl,
)
if err != nil {
return nil, errors.NewInvalidInputf(
@@ -822,6 +825,7 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
keys,
telemetrytypes.FieldDataTypeString,
nil,
b.stmtBuilder.fl,
)
if err != nil {
return nil, errors.NewInvalidInputf(

View File

@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -390,7 +391,8 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
traceStmtBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -399,6 +401,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
cb,
aggExprRewriter,
nil,
fl,
)
statementBuilder := NewTraceOperatorStatementBuilder(
@@ -408,6 +411,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
cb,
traceStmtBuilder,
aggExprRewriter,
fl,
)
for _, c := range cases {
@@ -503,7 +507,8 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
traceStmtBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -512,6 +517,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
cb,
aggExprRewriter,
nil,
fl,
)
statementBuilder := NewTraceOperatorStatementBuilder(
@@ -521,6 +527,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
cb,
traceStmtBuilder,
aggExprRewriter,
fl,
)
for _, c := range cases {

View File

@@ -6,6 +6,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetryresourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -20,6 +21,7 @@ type traceOperatorStatementBuilder struct {
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
aggExprRewriter qbtypes.AggExprRewriter
fl flagger.Flagger
}
var _ qbtypes.TraceOperatorStatementBuilder = (*traceOperatorStatementBuilder)(nil)
@@ -31,6 +33,7 @@ func NewTraceOperatorStatementBuilder(
conditionBuilder qbtypes.ConditionBuilder,
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
aggExprRewriter qbtypes.AggExprRewriter,
fl flagger.Flagger,
) *traceOperatorStatementBuilder {
tracesSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrytraces")
@@ -43,6 +46,7 @@ func NewTraceOperatorStatementBuilder(
metadataStore,
nil,
nil,
fl,
)
return &traceOperatorStatementBuilder{
@@ -53,6 +57,7 @@ func NewTraceOperatorStatementBuilder(
traceStmtBuilder: traceStmtBuilder,
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
aggExprRewriter: aggExprRewriter,
fl: fl,
}
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/assert"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/stretchr/testify/require"
)
@@ -34,7 +35,8 @@ func TestTraceTimeRangeOptimization(t *testing.T) {
Signal: telemetrytypes.SignalTraces,
}}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
@@ -43,6 +45,7 @@ func TestTraceTimeRangeOptimization(t *testing.T) {
cb,
aggExprRewriter,
nil, // telemetryStore is nil - optimization won't happen but code path is tested
fl,
)
tests := []struct {

View File

@@ -1,21 +0,0 @@
package inframonitoringtypes
import (
"github.com/SigNoz/signoz/pkg/valuer"
)
type ResponseType struct {
valuer.String
}
var (
ResponseTypeList = ResponseType{valuer.NewString("list")}
ResponseTypeGroupedList = ResponseType{valuer.NewString("grouped_list")}
)
func (ResponseType) Enum() []any {
return []any{
ResponseTypeList,
ResponseTypeGroupedList,
}
}

View File

@@ -1,117 +0,0 @@
package inframonitoringtypes
import (
"encoding/json"
"slices"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type Hosts struct {
Type ResponseType `json:"type" required:"true"`
Records []HostRecord `json:"records" required:"true"`
Total int `json:"total" required:"true"`
RequiredMetricsCheck RequiredMetricsCheck `json:"requiredMetricsCheck" required:"true"`
EndTimeBeforeRetention bool `json:"endTimeBeforeRetention" required:"true"`
Warning *qbtypes.QueryWarnData `json:"warning,omitempty"`
}
type HostRecord struct {
HostName string `json:"hostName" required:"true"`
Status HostStatus `json:"status" required:"true"`
ActiveHostCount int `json:"activeHostCount" required:"true"`
InactiveHostCount int `json:"inactiveHostCount" required:"true"`
CPU float64 `json:"cpu" required:"true"`
Memory float64 `json:"memory" required:"true"`
Wait float64 `json:"wait" required:"true"`
Load15 float64 `json:"load15" required:"true"`
DiskUsage float64 `json:"diskUsage" required:"true"`
Meta map[string]interface{} `json:"meta" required:"true"`
}
type RequiredMetricsCheck struct {
MissingMetrics []string `json:"missingMetrics" required:"true"`
}
type PostableHosts struct {
Start int64 `json:"start" required:"true"`
End int64 `json:"end" required:"true"`
Filter *HostFilter `json:"filter"`
GroupBy []qbtypes.GroupByKey `json:"groupBy"`
OrderBy *qbtypes.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit" required:"true"`
}
type HostFilter struct {
qbtypes.Filter `json:",inline"`
FilterByStatus HostStatus `json:"filterByStatus"`
}
// Validate ensures HostsListRequest contains acceptable values.
func (req *PostableHosts) Validate() error {
if req == nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
}
if req.Start <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid start time %d: start must be greater than 0",
req.Start,
)
}
if req.End <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid end time %d: end must be greater than 0",
req.End,
)
}
if req.Start >= req.End {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid time range: start (%d) must be less than end (%d)",
req.Start,
req.End,
)
}
if req.Limit < 1 || req.Limit > 5000 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
}
if req.Offset < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "offset cannot be negative")
}
if req.Filter != nil && !req.Filter.FilterByStatus.IsZero() &&
req.Filter.FilterByStatus != HostStatusActive && req.Filter.FilterByStatus != HostStatusInactive {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid filter by status: %s", req.Filter.FilterByStatus)
}
if req.OrderBy != nil {
if !slices.Contains(HostsValidOrderByKeys, req.OrderBy.Key.Name) {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by key: %s", req.OrderBy.Key.Name)
}
if req.OrderBy.Direction != qbtypes.OrderDirectionAsc && req.OrderBy.Direction != qbtypes.OrderDirectionDesc {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by direction: %s", req.OrderBy.Direction)
}
}
return nil
}
// UnmarshalJSON validates input immediately after decoding.
func (req *PostableHosts) UnmarshalJSON(data []byte) error {
type raw PostableHosts
var decoded raw
if err := json.Unmarshal(data, &decoded); err != nil {
return err
}
*req = PostableHosts(decoded)
return req.Validate()
}

View File

@@ -1,37 +0,0 @@
package inframonitoringtypes
import "github.com/SigNoz/signoz/pkg/valuer"
type HostStatus struct {
valuer.String
}
var (
HostStatusActive = HostStatus{valuer.NewString("active")}
HostStatusInactive = HostStatus{valuer.NewString("inactive")}
HostStatusNone = HostStatus{valuer.NewString("")}
)
func (HostStatus) Enum() []any {
return []any{
HostStatusActive,
HostStatusInactive,
HostStatusNone,
}
}
const (
HostsOrderByCPU = "cpu"
HostsOrderByMemory = "memory"
HostsOrderByWait = "wait"
HostsOrderByDiskUsage = "disk_usage"
HostsOrderByLoad15 = "load15"
)
var HostsValidOrderByKeys = []string{
HostsOrderByCPU,
HostsOrderByMemory,
HostsOrderByWait,
HostsOrderByDiskUsage,
HostsOrderByLoad15,
}

View File

@@ -1,244 +0,0 @@
package inframonitoringtypes
import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestHostsListRequest_Validate(t *testing.T) {
tests := []struct {
name string
req *PostableHosts
wantErr bool
}{
{
name: "valid request",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "nil request",
req: nil,
wantErr: true,
},
{
name: "start time zero",
req: &PostableHosts{
Start: 0,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time negative",
req: &PostableHosts{
Start: -1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "end time zero",
req: &PostableHosts{
Start: 1000,
End: 0,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time greater than end time",
req: &PostableHosts{
Start: 2000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time equal to end time",
req: &PostableHosts{
Start: 1000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "limit zero",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 0,
Offset: 0,
},
wantErr: true,
},
{
name: "limit negative",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: -10,
Offset: 0,
},
wantErr: true,
},
{
name: "limit exceeds max",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 5001,
Offset: 0,
},
wantErr: true,
},
{
name: "offset negative",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: -5,
},
wantErr: true,
},
{
name: "filter by status ACTIVE",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
Filter: &HostFilter{FilterByStatus: HostStatusActive},
},
wantErr: false,
},
{
name: "filter by status INACTIVE",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
Filter: &HostFilter{FilterByStatus: HostStatusInactive},
},
wantErr: false,
},
{
name: "filter by status empty (zero value)",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "filter by status invalid value",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
Filter: &HostFilter{FilterByStatus: HostStatus{valuer.NewString("UNKNOWN")}},
},
wantErr: true,
},
{
name: "orderBy nil is valid",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "orderBy with valid key cpu and direction asc",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: HostsOrderByCPU,
},
},
Direction: qbtypes.OrderDirectionAsc,
},
},
wantErr: false,
},
{
name: "orderBy with invalid key",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "unknown",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: true,
},
{
name: "orderBy with valid key but invalid direction",
req: &PostableHosts{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: HostsOrderByMemory,
},
},
Direction: qbtypes.OrderDirection{String: valuer.NewString("invalid")},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.req.Validate()
if tt.wantErr {
require.Error(t, err)
require.True(t, errors.Ast(err, errors.TypeInvalidInput), "expected error to be of type InvalidInput")
} else {
require.NoError(t, err)
}
})
}
}

View File

@@ -1,108 +0,0 @@
package inframonitoringtypes
import (
"encoding/json"
"slices"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type Pods struct {
Type ResponseType `json:"type" required:"true"`
Records []PodRecord `json:"records" required:"true"`
Total int `json:"total" required:"true"`
RequiredMetricsCheck RequiredMetricsCheck `json:"requiredMetricsCheck" required:"true"`
EndTimeBeforeRetention bool `json:"endTimeBeforeRetention" required:"true"`
Warning *qbtypes.QueryWarnData `json:"warning,omitempty"`
}
type PodRecord struct {
PodUID string `json:"podUID" required:"true"`
PodCPU float64 `json:"podCPU" required:"true"`
PodCPURequest float64 `json:"podCPURequest" required:"true"`
PodCPULimit float64 `json:"podCPULimit" required:"true"`
PodMemory float64 `json:"podMemory" required:"true"`
PodMemoryRequest float64 `json:"podMemoryRequest" required:"true"`
PodMemoryLimit float64 `json:"podMemoryLimit" required:"true"`
PodPhase PodPhase `json:"podPhase" required:"true"`
PendingPodCount int `json:"pendingPodCount" required:"true"`
RunningPodCount int `json:"runningPodCount" required:"true"`
SucceededPodCount int `json:"succeededPodCount" required:"true"`
FailedPodCount int `json:"failedPodCount" required:"true"`
PodAge int64 `json:"podAge" required:"true"`
Meta map[string]any `json:"meta" required:"true"`
}
// PostablePods is the request body for the v2 pods list API.
type PostablePods struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Filter *qbtypes.Filter `json:"filter"`
GroupBy []qbtypes.GroupByKey `json:"groupBy"`
OrderBy *qbtypes.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
// Validate ensures PostablePods contains acceptable values.
func (req *PostablePods) Validate() error {
if req == nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
}
if req.Start <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid start time %d: start must be greater than 0",
req.Start,
)
}
if req.End <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid end time %d: end must be greater than 0",
req.End,
)
}
if req.Start >= req.End {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid time range: start (%d) must be less than end (%d)",
req.Start,
req.End,
)
}
if req.Limit < 1 || req.Limit > 5000 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
}
if req.Offset < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "offset cannot be negative")
}
if req.OrderBy != nil {
if !slices.Contains(PodsValidOrderByKeys, req.OrderBy.Key.Name) {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by key: %s", req.OrderBy.Key.Name)
}
if req.OrderBy.Direction != qbtypes.OrderDirectionAsc && req.OrderBy.Direction != qbtypes.OrderDirectionDesc {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by direction: %s", req.OrderBy.Direction)
}
}
return nil
}
// UnmarshalJSON validates input immediately after decoding.
func (req *PostablePods) UnmarshalJSON(data []byte) error {
type raw PostablePods
var decoded raw
if err := json.Unmarshal(data, &decoded); err != nil {
return err
}
*req = PostablePods(decoded)
return req.Validate()
}

View File

@@ -1,43 +0,0 @@
package inframonitoringtypes
import "github.com/SigNoz/signoz/pkg/valuer"
type PodPhase struct {
valuer.String
}
var (
PodPhasePending = PodPhase{valuer.NewString("pending")}
PodPhaseRunning = PodPhase{valuer.NewString("running")}
PodPhaseSucceeded = PodPhase{valuer.NewString("succeeded")}
PodPhaseFailed = PodPhase{valuer.NewString("failed")}
PodPhaseNone = PodPhase{valuer.NewString("")}
)
func (PodPhase) Enum() []any {
return []any{
PodPhasePending,
PodPhaseRunning,
PodPhaseSucceeded,
PodPhaseFailed,
PodPhaseNone,
}
}
const (
PodsOrderByCPU = "cpu"
PodsOrderByCPURequest = "cpu_request"
PodsOrderByCPULimit = "cpu_limit"
PodsOrderByMemory = "memory"
PodsOrderByMemoryRequest = "memory_request"
PodsOrderByMemoryLimit = "memory_limit"
)
var PodsValidOrderByKeys = []string{
PodsOrderByCPU,
PodsOrderByCPURequest,
PodsOrderByCPULimit,
PodsOrderByMemory,
PodsOrderByMemoryRequest,
PodsOrderByMemoryLimit,
}

View File

@@ -1,219 +0,0 @@
package inframonitoringtypes
import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestPostablePods_Validate(t *testing.T) {
tests := []struct {
name string
req *PostablePods
wantErr bool
}{
{
name: "valid request",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "nil request",
req: nil,
wantErr: true,
},
{
name: "start time zero",
req: &PostablePods{
Start: 0,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time negative",
req: &PostablePods{
Start: -1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "end time zero",
req: &PostablePods{
Start: 1000,
End: 0,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time greater than end time",
req: &PostablePods{
Start: 2000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time equal to end time",
req: &PostablePods{
Start: 1000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "limit zero",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 0,
Offset: 0,
},
wantErr: true,
},
{
name: "limit negative",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: -10,
Offset: 0,
},
wantErr: true,
},
{
name: "limit exceeds max",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 5001,
Offset: 0,
},
wantErr: true,
},
{
name: "offset negative",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: -5,
},
wantErr: true,
},
{
name: "orderBy nil is valid",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "orderBy with valid key cpu and direction asc",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: PodsOrderByCPU,
},
},
Direction: qbtypes.OrderDirectionAsc,
},
},
wantErr: false,
},
{
name: "orderBy with phase key is rejected",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "phase",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: true,
},
{
name: "orderBy with invalid key",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "unknown",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: true,
},
{
name: "orderBy with valid key but invalid direction",
req: &PostablePods{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: PodsOrderByMemory,
},
},
Direction: qbtypes.OrderDirection{String: valuer.NewString("invalid")},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.req.Validate()
if tt.wantErr {
require.Error(t, err)
require.True(t, errors.Ast(err, errors.TypeInvalidInput), "expected error to be of type InvalidInput")
} else {
require.NoError(t, err)
}
})
}
}