Compare commits

143 Commits

Author SHA1 Message Date
Nikhil Mantri
9a53f6ad39 Merge branch 'main' into infraM/v2_namespaces_list_api 2026-05-08 11:38:27 +05:30
nikhilmantri0902
1915a905bb chore: namespace conflicts resolved and main merged 2026-05-07 17:29:28 +05:30
nikhilmantri0902
2d2c9b4d2a chore: namespace record uses PodCountsByPhase 2026-05-05 18:21:59 +05:30
nikhilmantri0902
4765bba05c Merge remote-tracking branch 'origin/infraM/v2_nodes_list_api' into infraM/v2_namespaces_list_api
# Conflicts:
#	frontend/src/api/generated/services/inframonitoring/index.ts
2026-05-05 18:13:38 +05:30
nikhilmantri0902
db79e6492e chore: node and pod counts structs added 2026-05-05 15:31:25 +05:30
nikhilmantri0902
78da4ee819 chore: for pods and nodes, replace none with no_data 2026-05-05 15:07:34 +05:30
Nikhil Mantri
7e812277ec Merge branch 'main' into infraM/v2_nodes_list_api 2026-05-05 13:53:21 +05:30
nikhilmantri0902
48a1e90402 chore: added pod phase counts 2026-05-05 13:32:52 +05:30
Nikhil Mantri
e33c14a70b Merge branch 'main' into infraM/v2_nodes_list_api 2026-05-05 12:40:34 +05:30
Nikhil Mantri
ba0d3ea484 Merge branch 'main' into infraM/v2_nodes_list_api 2026-04-30 10:47:46 +05:30
nikhilmantri0902
a55e65ae43 chore: rename 2026-04-29 11:52:00 +05:30
nikhilmantri0902
7f14bc0e0a chore: namespaces code 2026-04-29 11:52:00 +05:30
Nikhil Mantri
1c9e31ad00 Merge branch 'main' into infraM/v2_nodes_list_api 2026-04-29 11:50:36 +05:30
nikhilmantri0902
8b0e8f666e chore: v2 nodes api 2026-04-28 17:02:19 +05:30
nikhilmantri0902
1d89b03f10 chore: merged main and resolved conflicts 2026-04-28 16:38:57 +05:30
nikhilmantri0902
3a61a78986 chore: merged base branch 2026-04-28 12:48:20 +05:30
Nikhil Mantri
46e833faba Merge branch 'main' into infraM/v2_pods_list_api 2026-04-28 12:15:56 +05:30
nikhilmantri0902
4bd7492629 chore: updated comment 2026-04-28 12:07:04 +05:30
nikhilmantri0902
401701e036 chore: metadata fix 2026-04-27 19:23:11 +05:30
nikhilmantri0902
3bec0df0ad chore: nodes list v2 full blown 2026-04-27 16:17:39 +05:30
Nikhil Mantri
24fe9a986d feat(infra-monitoring): v2 pods list apis - phase counts when custom grouping (#11088)
* chore: added phase counts feature

* chore: added queries for pod phase counts in custom group by

* chore: added unknown phase count

* fix: isPodUIDInGroupBy in buildPodRecords

* chore: 3 cte --> 2 cte

* chore: pod phase with local table of time series as counts

* chore: comment correction

* chore: corrected comment

* chore: value column for samples table added

* chore: removed query G for phase counts

* chore: rename variable

* chore: added PodPhaseNum constants to types
2026-04-27 16:04:55 +05:30
nikhilmantri0902
520e92049c Merge branch 'feat/v2_pods_list_api_phase_counts' of github.com:SigNoz/signoz into infraM/v2_nodes_list_api 2026-04-27 15:43:09 +05:30
Nikhil Mantri
92d297ac9d Merge branch 'main' into infraM/v2_pods_list_api 2026-04-27 15:05:04 +05:30
nikhilmantri0902
eff29aefba chore: added PodPhaseNum constants to types 2026-04-27 13:59:08 +05:30
nikhilmantri0902
ca73453c9e chore: rename variable 2026-04-27 13:53:17 +05:30
nikhilmantri0902
1d836d674f chore: removed query G for phase counts 2026-04-27 13:46:03 +05:30
nikhilmantri0902
83724b0cde chore: value column for samples table added 2026-04-27 13:09:46 +05:30
nikhilmantri0902
3050e37ec7 chore: corrected comment 2026-04-27 12:43:17 +05:30
Ashwin Bhatkal
bdbaa32485 Merge branch 'main' into infraM/v2_pods_list_api 2026-04-27 11:44:13 +05:30
nikhilmantri0902
55b2215025 chore: comment correction 2026-04-24 17:14:56 +05:30
nikhilmantri0902
e90378e618 Merge branch 'infraM/v2_pods_list_api' into feat/v2_pods_list_api_phase_counts 2026-04-24 16:47:15 +05:30
nikhilmantri0902
4592b78f48 chore: pod phase with local table of time series as counts 2026-04-24 15:27:38 +05:30
nikhilmantri0902
d81c99feae chore: 3 cte --> 2 cte 2026-04-24 14:58:26 +05:30
nikhilmantri0902
65a456ff9e fix: isPodUIDInGroupBy in buildPodRecords 2026-04-24 14:36:43 +05:30
nikhilmantri0902
264577b673 chore: added unknown phase count 2026-04-24 13:29:09 +05:30
Ashwin Bhatkal
b35c6676f9 fix: rebase fixes 2026-04-24 13:17:02 +05:30
nikhilmantri0902
c78c9a42db chore: merged base 2026-04-24 13:03:21 +05:30
nikhilmantri0902
1095caa123 chore: improved api description to document -1 as no data in numeric fields 2026-04-24 12:11:45 +05:30
nikhilmantri0902
9043b49762 chore: removed pods - order by phase 2026-04-24 12:04:51 +05:30
nikhilmantri0902
d4084a7494 chore: added support for pod phase unknown 2026-04-24 11:46:26 +05:30
nikhilmantri0902
27c564b3bf chore: added required tags 2026-04-24 11:21:20 +05:30
Nikhil Mantri
f02c491828 Merge branch 'main' into infraM/v2_pods_list_api 2026-04-24 10:47:13 +05:30
Nikhil Mantri
3d53b8f77f Merge branch 'main' into infraM/v2_pods_list_api 2026-04-23 18:44:33 +05:30
nikhilmantri0902
dffe94fec4 chore: conflicts resolved 2026-04-23 18:39:39 +05:30
nikhilmantri0902
b7d4f18aae chore: added queries for pod phase counts in custom group by 2026-04-23 18:01:26 +05:30
nikhilmantri0902
9ad2ec428a chore: added phase counts feature 2026-04-23 16:50:39 +05:30
nikhilmantri0902
c9360fcf13 Merge branch 'infraM/v2_hosts_list_api' into infraM/v2_pods_list_api 2026-04-23 11:23:49 +05:30
nikhilmantri0902
b5ab45db20 chore: regen api client for inframonitoring
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 10:51:35 +05:30
Nikhil Mantri
08f76aca78 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-23 09:51:01 +05:30
nikhilmantri0902
983d4fe4f2 Merge branch 'infraM/v2_hosts_list_api' into infraM/v2_pods_list_api 2026-04-22 15:37:21 +05:30
nikhilmantri0902
833af794c3 chore: make sort stable in case of tiebreaker by comparing composite group by keys 2026-04-22 15:26:28 +05:30
nikhilmantri0902
21b51d1fcc chore: cleanup and rename 2026-04-22 15:13:00 +05:30
nikhilmantri0902
56f22682c8 Merge branch 'infraM/v2_hosts_list_api' into infraM/v2_pods_list_api 2026-04-22 14:29:17 +05:30
nikhilmantri0902
9c8359940c chore: remove a defensive nil map check, the function ensure non-nil map when err nil 2026-04-22 11:59:01 +05:30
Nikhil Mantri
4050880275 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-22 11:35:57 +05:30
nikhilmantri0902
5e775f64f2 chore: added status unauthorized 2026-04-21 21:30:44 +05:30
nikhilmantri0902
0189f23f46 chore: removed internal server error 2026-04-21 21:30:01 +05:30
nikhilmantri0902
49a36d4e3d chore: removed pod metric temporalities 2026-04-21 21:24:49 +05:30
nikhilmantri0902
9407d658ab chore: merge base hosts v2 branch 2026-04-21 21:17:28 +05:30
nikhilmantri0902
5035712485 chore: added json tag required: true 2026-04-21 18:50:25 +05:30
nikhilmantri0902
bab17c3615 chore: comments resolve 2026-04-21 18:33:56 +05:30
Nikhil Mantri
37b44f4db9 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-21 17:40:06 +05:30
nikhilmantri0902
99dd6e5f1e chore: pods code restructuring 2026-04-21 17:03:13 +05:30
nikhilmantri0902
9c7131fa6a chore: merge base branch 2026-04-21 16:22:55 +05:30
Nikhil Mantri
ad889a2e1d Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-21 13:48:53 +05:30
nikhilmantri0902
a4f6d0cbf5 chore: removed temporalities 2026-04-21 13:44:06 +05:30
nikhilmantri0902
589bed7c16 chore: comments correction 2026-04-21 12:50:51 +05:30
nikhilmantri0902
93843a1f48 chore: file structure further breakdown for clarity 2026-04-21 12:36:07 +05:30
nikhilmantri0902
88c43108fc chore: added types package 2026-04-20 18:52:43 +05:30
nikhilmantri0902
ed4cf540e8 chore: inframonitoring types renaming 2026-04-20 18:47:28 +05:30
nikhilmantri0902
9e2dfa9033 chore: rearrangement 2026-04-20 17:51:03 +05:30
nikhilmantri0902
d98d5d68ee chore: rename PodsList -> ListPods 2026-04-20 16:57:21 +05:30
nikhilmantri0902
2cb1c3b73b chore: rename HostsList -> ListHosts 2026-04-20 16:42:19 +05:30
nikhilmantri0902
ae7ca497ad chore: merged base hosts branch and reorganized code 2026-04-20 13:38:25 +05:30
Nikhil Mantri
a579916961 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-20 11:05:36 +05:30
Nikhil Mantri
4a16d56abf feat(infra-monitoring): v2 hosts list - return counts of active & inactive hosts for custom group by attributes (#10956)
* chore: add functionality for showing active and inactive counts in custom group by

* chore: bug fix

* chore: added subquery for active and total count

* chore: ignore empty string hosts in get active hosts

* fix: sinceUnixMilli for determining active hosts compute once per request

* chore: refactor code
2026-04-20 10:41:15 +05:30
Nikhil Mantri
642b5ac3f0 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-16 16:32:39 +05:30
Nikhil Mantri
a12112619c Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-16 15:41:35 +05:30
nikhilmantri0902
014785f1bc chore: ignore empty string hosts in get active hosts 2026-04-16 13:17:15 +05:30
Nikhil Mantri
58ee797b10 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-15 14:18:29 +05:30
Nikhil Mantri
82d236742f Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-15 11:21:33 +05:30
nikhilmantri0902
397e1ad5be chore: added TODOs and made filterByStatus a part of filter struct 2026-04-14 18:32:48 +05:30
nikhilmantri0902
8d6b25ca9b chore: resolved conflicts 2026-04-14 17:09:17 +05:30
nikhilmantri0902
5fa6bd8b8d Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-13 11:02:14 +05:30
nikhilmantri0902
bd9977483b chore: improved description 2026-04-11 11:31:35 +05:30
nikhilmantri0902
50fbdfeeef chore: validate order by to validate function 2026-04-10 19:01:45 +05:30
nikhilmantri0902
e2b1b73e87 chore: improvements 2026-04-10 13:23:33 +05:30
nikhilmantri0902
cb9f3fd3e5 chore: rearrage 2026-04-10 00:39:23 +05:30
nikhilmantri0902
232acc343d chore: escape backtick to prevent sql injection 2026-04-10 00:01:01 +05:30
nikhilmantri0902
2025afdccc chore: endpoint modification openapi 2026-04-09 23:25:59 +05:30
nikhilmantri0902
d2f4d4af93 chore: endpoint correction 2026-04-09 23:21:57 +05:30
Nikhil Mantri
47ff7bbb8e Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-09 23:20:39 +05:30
Nikhil Mantri
724071c5dc Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-09 18:30:15 +05:30
nikhilmantri0902
4d24979358 chore: frontend fix 2026-04-09 18:26:42 +05:30
nikhilmantri0902
042943b10a chore: distributed samples table to local table change for get metadata 2026-04-09 18:24:45 +05:30
nikhilmantri0902
48a9be7ec8 chore: added required metrics check 2026-04-09 17:38:48 +05:30
nikhilmantri0902
a9504b2120 chore: added a TODO remark 2026-04-09 16:08:34 +05:30
nikhilmantri0902
8755887c4a chore: added better metrics existence check 2026-04-09 16:01:35 +05:30
Nikhil Mantri
4cb4662b3a Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-09 15:14:25 +05:30
nikhilmantri0902
e6900dabc8 chore: warnings added passing from queryResponse warning to host lists response struct 2026-04-09 00:09:38 +05:30
nikhilmantri0902
c1ba389b63 chore: add type for response and files rearrange 2026-04-08 23:35:53 +05:30
nikhilmantri0902
3a1f40234f Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-08 23:03:50 +05:30
Nikhil Mantri
2e4891fa63 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-08 16:07:57 +05:30
Nikhil Mantri
04ebc0bec7 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-08 11:08:10 +05:30
nikhilmantri0902
271f9b81ed Merge branch 'infraM/v2_hosts_list_api' into infraM/v2_pods_list_api 2026-04-07 21:55:47 +05:30
nikhilmantri0902
6fa815c294 chore: modified getMetadata query 2026-04-07 18:55:57 +05:30
nikhilmantri0902
63ec518efb chore: added hostName logic 2026-04-07 17:36:15 +05:30
nikhilmantri0902
c4ca20dd90 chore: return errors from getMetadata and lint fix 2026-04-07 17:01:13 +05:30
nikhilmantri0902
e56cc4222b chore: return errors from getMetadata and lint fix 2026-04-07 16:57:35 +05:30
nikhilmantri0902
07d2944d7c chore: yarn generate api 2026-04-07 16:44:06 +05:30
nikhilmantri0902
dea01ae36a chore: hostStatusNone added for clarity that this field can be left empty as well in payload 2026-04-07 16:32:25 +05:30
nikhilmantri0902
62ea5b54e2 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-07 14:09:48 +05:30
nikhilmantri0902
e549a7e42f chore: added pods list api updates 2026-04-07 13:58:10 +05:30
nikhilmantri0902
90e2ebb11f Merge branch 'infraM/v2_hosts_list_api' into infraM/v2_pods_list_api 2026-04-07 13:51:35 +05:30
nikhilmantri0902
61baa1be7a chore: code improvements 2026-04-07 13:49:00 +05:30
nikhilmantri0902
b946fa665f Merge branch 'infraM/v2_hosts_list_api' into infraM/v2_pods_list_api 2026-04-07 11:15:35 +05:30
nikhilmantri0902
2e049556e4 chore: unified composite key function 2026-04-07 11:15:03 +05:30
nikhilmantri0902
492a5e70d7 chore: added pods metrics temporality 2026-04-06 17:33:44 +05:30
nikhilmantri0902
ba1f2771e8 Merge branch 'infraM/v2_hosts_list_api' into infraM/v2_pods_list_api 2026-04-06 17:18:44 +05:30
nikhilmantri0902
7458fb4855 Merge branch 'main' into infraM/v2_hosts_list_api 2026-04-06 17:18:01 +05:30
nikhilmantri0902
5f55f3938b chore: added temporalities of metrics 2026-04-06 17:17:15 +05:30
nikhilmantri0902
3e8102485c Merge branch 'infraM/v2_hosts_list_api' into infraM/v2_pods_list_api 2026-04-04 20:52:50 +05:30
nikhilmantri0902
861c682ea5 chore: nil pointer dereference fix in req.Filter 2026-04-04 20:52:08 +05:30
nikhilmantri0902
c8e5895dff chore: nil pointer check 2026-04-04 20:45:04 +05:30
nikhilmantri0902
82d72e7edb chore: pods api meta start time 2026-04-04 17:18:04 +05:30
nikhilmantri0902
a3f8ecaaf1 chore: merged base branch 2026-04-04 16:47:10 +05:30
nikhilmantri0902
19aada656c chore: updated spec 2026-04-04 16:44:15 +05:30
nikhilmantri0902
b21bb4280f chore: updated openapi yml 2026-04-04 16:38:22 +05:30
nikhilmantri0902
bc0a4fdb5c chore: added pods list logic 2026-04-04 13:24:46 +05:30
nikhilmantri0902
37fb0e9254 Merge branch 'infraM/base_dependencies' into infraM/v2_hosts_list_api 2026-04-03 17:49:00 +05:30
nikhilmantri0902
aecfa1a174 chore: added validation on order by 2026-04-02 20:13:30 +05:30
nikhilmantri0902
b869d23d94 chore: moved funcs 2026-04-02 20:02:22 +05:30
nikhilmantri0902
6ee3d44f76 chore: removed isSendingK8sAgentsMetricsCode 2026-04-02 19:58:30 +05:30
nikhilmantri0902
462e554107 chore: yarn generate api 2026-04-02 14:49:15 +05:30
nikhilmantri0902
66afa73e6f chore: return status as a string 2026-04-02 14:39:02 +05:30
nikhilmantri0902
54c604bcf4 chore: added some unit tests 2026-04-02 14:20:27 +05:30
nikhilmantri0902
c1be02ba54 chore: added validate function 2026-04-02 14:14:34 +05:30
nikhilmantri0902
d3c7ba8f45 chore: disk usage 2026-04-02 14:01:18 +05:30
nikhilmantri0902
039c4a0496 fix: bug fix 2026-04-02 11:32:49 +05:30
nikhilmantri0902
51a94b6bbc chore: added logic for hosts v3 api 2026-04-02 02:52:28 +05:30
nikhilmantri0902
bbfbb94f52 chore: merged main 2026-04-01 00:45:40 +05:30
nikhilmantri0902
d1eb9ef16f chore: endpoint detail update 2026-03-31 16:16:31 +05:30
nikhilmantri0902
3db00f8bc3 chore: baseline setup 2026-03-31 15:27:18 +05:30
29 changed files with 1217 additions and 771 deletions

View File

@@ -2598,6 +2598,54 @@ components:
- requiredMetricsCheck
- endTimeBeforeRetention
type: object
InframonitoringtypesNamespaceRecord:
properties:
meta:
additionalProperties:
type: string
nullable: true
type: object
namespaceCPU:
format: double
type: number
namespaceMemory:
format: double
type: number
namespaceName:
type: string
podCountsByPhase:
$ref: '#/components/schemas/InframonitoringtypesPodCountsByPhase'
required:
- namespaceName
- namespaceCPU
- namespaceMemory
- podCountsByPhase
- meta
type: object
InframonitoringtypesNamespaces:
properties:
endTimeBeforeRetention:
type: boolean
records:
items:
$ref: '#/components/schemas/InframonitoringtypesNamespaceRecord'
nullable: true
type: array
requiredMetricsCheck:
$ref: '#/components/schemas/InframonitoringtypesRequiredMetricsCheck'
total:
type: integer
type:
$ref: '#/components/schemas/InframonitoringtypesResponseType'
warning:
$ref: '#/components/schemas/Querybuildertypesv5QueryWarnData'
required:
- type
- records
- total
- requiredMetricsCheck
- endTimeBeforeRetention
type: object
InframonitoringtypesNodeCondition:
enum:
- ready
@@ -2801,6 +2849,32 @@ components:
- end
- limit
type: object
InframonitoringtypesPostableNamespaces:
properties:
end:
format: int64
type: integer
filter:
$ref: '#/components/schemas/Querybuildertypesv5Filter'
groupBy:
items:
$ref: '#/components/schemas/Querybuildertypesv5GroupByKey'
nullable: true
type: array
limit:
type: integer
offset:
type: integer
orderBy:
$ref: '#/components/schemas/Querybuildertypesv5OrderBy'
start:
format: int64
type: integer
required:
- start
- end
- limit
type: object
InframonitoringtypesPostableNodes:
properties:
end:
@@ -11731,6 +11805,74 @@ paths:
summary: List Hosts for Infra Monitoring
tags:
- inframonitoring
/api/v2/infra_monitoring/namespaces:
post:
deprecated: false
description: 'Returns a paginated list of Kubernetes namespaces with key aggregated
pod metrics: CPU usage and memory working set (summed across pods in the group),
plus per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown
} from each pod''s latest k8s.pod.phase value in the window). Each namespace
includes metadata attributes (k8s.namespace.name, k8s.cluster.name). The response
type is ''list'' for the default k8s.namespace.name grouping or ''grouped_list''
for custom groupBy keys; in both modes every row aggregates pods in the group.
Supports filtering via a filter expression, custom groupBy, ordering by cpu
/ memory, and pagination via offset/limit. Also reports missing required metrics
and whether the requested time range falls before the data retention boundary.
Numeric metric fields (namespaceCPU, namespaceMemory) return -1 as a sentinel
when no data is available for that field.'
operationId: ListNamespaces
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/InframonitoringtypesPostableNamespaces'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/InframonitoringtypesNamespaces'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: List Namespaces for Infra Monitoring
tags:
- inframonitoring
/api/v2/infra_monitoring/nodes:
post:
deprecated: false

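As a reference for consumers of the new endpoint, a minimal request could look like the Go sketch below. This is an illustration only: the host and the SIGNOZ-API-KEY header name are placeholder assumptions, and the body carries just the three required PostableNamespaces fields (start, end, limit), with timestamps in Unix milliseconds per the int64 format above.

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Only the required fields; filter, groupBy, offset, and orderBy are optional.
	body := []byte(`{"start": 1746400000000, "end": 1746403600000, "limit": 10}`)

	req, err := http.NewRequest(http.MethodPost,
		"https://signoz.example.com/api/v2/infra_monitoring/namespaces", // placeholder host
		bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("SIGNOZ-API-KEY", "<viewer-api-key>") // header name is an assumption

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// A 200 wraps the Namespaces payload as {"status": ..., "data": {...}}.
	fmt.Println(resp.Status)
}
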
View File

@@ -13,9 +13,11 @@ import type {
import type {
InframonitoringtypesPostableHostsDTO,
InframonitoringtypesPostableNamespacesDTO,
InframonitoringtypesPostableNodesDTO,
InframonitoringtypesPostablePodsDTO,
ListHosts200,
ListNamespaces200,
ListNodes200,
ListPods200,
RenderErrorResponseDTO,
@@ -108,6 +110,90 @@ export const useListHosts = <
return useMutation(mutationOptions);
};
/**
* Returns a paginated list of Kubernetes namespaces with key aggregated pod metrics: CPU usage and memory working set (summed across pods in the group), plus per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown } from each pod's latest k8s.pod.phase value in the window). Each namespace includes metadata attributes (k8s.namespace.name, k8s.cluster.name). The response type is 'list' for the default k8s.namespace.name grouping or 'grouped_list' for custom groupBy keys; in both modes every row aggregates pods in the group. Supports filtering via a filter expression, custom groupBy, ordering by cpu / memory, and pagination via offset/limit. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (namespaceCPU, namespaceMemory) return -1 as a sentinel when no data is available for that field.
* @summary List Namespaces for Infra Monitoring
*/
export const listNamespaces = (
inframonitoringtypesPostableNamespacesDTO: BodyType<InframonitoringtypesPostableNamespacesDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<ListNamespaces200>({
url: `/api/v2/infra_monitoring/namespaces`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: inframonitoringtypesPostableNamespacesDTO,
signal,
});
};
export const getListNamespacesMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listNamespaces>>,
TError,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof listNamespaces>>,
TError,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
TContext
> => {
const mutationKey = ['listNamespaces'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof listNamespaces>>,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> }
> = (props) => {
const { data } = props ?? {};
return listNamespaces(data);
};
return { mutationFn, ...mutationOptions };
};
export type ListNamespacesMutationResult = NonNullable<
Awaited<ReturnType<typeof listNamespaces>>
>;
export type ListNamespacesMutationBody =
BodyType<InframonitoringtypesPostableNamespacesDTO>;
export type ListNamespacesMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary List Namespaces for Infra Monitoring
*/
export const useListNamespaces = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof listNamespaces>>,
TError,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof listNamespaces>>,
TError,
{ data: BodyType<InframonitoringtypesPostableNamespacesDTO> },
TContext
> => {
const mutationOptions = getListNamespacesMutationOptions(options);
return useMutation(mutationOptions);
};
/**
* Returns a paginated list of Kubernetes nodes with key metrics: CPU usage, CPU allocatable, memory working set, memory allocatable, per-group nodeCountsByReadiness ({ ready, notReady } from each node's latest k8s.node.condition_ready in the window) and per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown } for pods scheduled on the listed nodes). Each node includes metadata attributes (k8s.node.uid, k8s.cluster.name). The response type is 'list' for the default k8s.node.name grouping (each row is one node with its current condition string: ready / not_ready / no_data) or 'grouped_list' for custom groupBy keys (each row aggregates nodes in the group; condition stays no_data). Supports filtering via a filter expression, custom groupBy, ordering by cpu / cpu_allocatable / memory / memory_allocatable, and pagination via offset/limit. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (nodeCPU, nodeCPUAllocatable, nodeMemory, nodeMemoryAllocatable) return -1 as a sentinel when no data is available for that field.
* @summary List Nodes for Infra Monitoring

View File

@@ -4652,6 +4652,55 @@ export interface InframonitoringtypesHostsDTO {
warning?: Querybuildertypesv5QueryWarnDataDTO;
}
/**
* @nullable
*/
export type InframonitoringtypesNamespaceRecordDTOMeta = {
[key: string]: string;
} | null;
export interface InframonitoringtypesNamespaceRecordDTO {
/**
* @type object
* @nullable true
*/
meta: InframonitoringtypesNamespaceRecordDTOMeta;
/**
* @type number
* @format double
*/
namespaceCPU: number;
/**
* @type number
* @format double
*/
namespaceMemory: number;
/**
* @type string
*/
namespaceName: string;
podCountsByPhase: InframonitoringtypesPodCountsByPhaseDTO;
}
export interface InframonitoringtypesNamespacesDTO {
/**
* @type boolean
*/
endTimeBeforeRetention: boolean;
/**
* @type array
* @nullable true
*/
records: InframonitoringtypesNamespaceRecordDTO[] | null;
requiredMetricsCheck: InframonitoringtypesRequiredMetricsCheckDTO;
/**
* @type integer
*/
total: number;
type: InframonitoringtypesResponseTypeDTO;
warning?: Querybuildertypesv5QueryWarnDataDTO;
}
export enum InframonitoringtypesNodeConditionDTO {
ready = 'ready',
not_ready = 'not_ready',
@@ -4863,6 +4912,34 @@ export interface InframonitoringtypesPostableHostsDTO {
start: number;
}
export interface InframonitoringtypesPostableNamespacesDTO {
/**
* @type integer
* @format int64
*/
end: number;
filter?: Querybuildertypesv5FilterDTO;
/**
* @type array
* @nullable true
*/
groupBy?: Querybuildertypesv5GroupByKeyDTO[] | null;
/**
* @type integer
*/
limit: number;
/**
* @type integer
*/
offset?: number;
orderBy?: Querybuildertypesv5OrderByDTO;
/**
* @type integer
* @format int64
*/
start: number;
}
export interface InframonitoringtypesPostableNodesDTO {
/**
* @type integer
@@ -9242,6 +9319,14 @@ export type ListHosts200 = {
status: string;
};
export type ListNamespaces200 = {
data: InframonitoringtypesNamespacesDTO;
/**
* @type string
*/
status: string;
};
export type ListNodes200 = {
data: InframonitoringtypesNodesDTO;
/**

View File

@@ -398,7 +398,7 @@ describe('useTableParams (selective URL mode — partial config object)', () =>
.filter(Boolean)
.pop();
expect(lastExpanded).toBeDefined();
-expect(JSON.parse(lastExpanded!)).toEqual(
+expect(JSON.parse(lastExpanded!)).toStrictEqual(
expect.arrayContaining(['row-1', 'row-2']),
);

View File

@@ -67,5 +67,24 @@ func (provider *provider) addInfraMonitoringRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v2/infra_monitoring/namespaces", handler.New(
provider.authZ.ViewAccess(provider.infraMonitoringHandler.ListNamespaces),
handler.OpenAPIDef{
ID: "ListNamespaces",
Tags: []string{"inframonitoring"},
Summary: "List Namespaces for Infra Monitoring",
Description: "Returns a paginated list of Kubernetes namespaces with key aggregated pod metrics: CPU usage and memory working set (summed across pods in the group), plus per-group podCountsByPhase ({ pending, running, succeeded, failed, unknown } from each pod's latest k8s.pod.phase value in the window). Each namespace includes metadata attributes (k8s.namespace.name, k8s.cluster.name). The response type is 'list' for the default k8s.namespace.name grouping or 'grouped_list' for custom groupBy keys; in both modes every row aggregates pods in the group. Supports filtering via a filter expression, custom groupBy, ordering by cpu / memory, and pagination via offset/limit. Also reports missing required metrics and whether the requested time range falls before the data retention boundary. Numeric metric fields (namespaceCPU, namespaceMemory) return -1 as a sentinel when no data is available for that field.",
Request: new(inframonitoringtypes.PostableNamespaces),
RequestContentType: "application/json",
Response: new(inframonitoringtypes.Namespaces),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -93,3 +93,27 @@ func (h *handler) ListNodes(rw http.ResponseWriter, req *http.Request) {
render.Success(rw, http.StatusOK, result)
}
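// ListNamespaces mirrors ListNodes above: resolve the org from the auth
// claims, bind the JSON body into PostableNamespaces, and delegate to the module.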
func (h *handler) ListNamespaces(rw http.ResponseWriter, req *http.Request) {
claims, err := authtypes.ClaimsFromContext(req.Context())
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
var parsedReq inframonitoringtypes.PostableNamespaces
if err := binding.JSON.BindBody(req.Body, &parsedReq); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.ListNamespaces(req.Context(), orgID, &parsedReq)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}

View File

@@ -337,3 +337,92 @@ func (m *module) ListNodes(ctx context.Context, orgID valuer.UUID, req *inframon
return resp, nil
}
func (m *module) ListNamespaces(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableNamespaces) (*inframonitoringtypes.Namespaces, error) {
if err := req.Validate(); err != nil {
return nil, err
}
resp := &inframonitoringtypes.Namespaces{}
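// Default ordering when the caller omits orderBy: CPU, descending.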
if req.OrderBy == nil {
req.OrderBy = &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: inframonitoringtypes.NamespacesOrderByCPU,
},
},
Direction: qbtypes.OrderDirectionDesc,
}
}
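// An empty groupBy falls back to the default k8s.namespace.name grouping and a
// "list" response; any custom keys switch the response type to "grouped_list".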
if len(req.GroupBy) == 0 {
req.GroupBy = []qbtypes.GroupByKey{namespaceNameGroupByKey}
resp.Type = inframonitoringtypes.ResponseTypeList
} else {
resp.Type = inframonitoringtypes.ResponseTypeGroupedList
}
missingMetrics, minFirstReportedUnixMilli, err := m.getMetricsExistenceAndEarliestTime(ctx, namespacesTableMetricNamesList)
if err != nil {
return nil, err
}
if len(missingMetrics) > 0 {
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: missingMetrics}
resp.Records = []inframonitoringtypes.NamespaceRecord{}
resp.Total = 0
return resp, nil
}
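// The whole requested window ends before the earliest reported sample, i.e. it
// falls before the retention boundary, so return empty records with the flag set.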
if req.End < int64(minFirstReportedUnixMilli) {
resp.EndTimeBeforeRetention = true
resp.Records = []inframonitoringtypes.NamespaceRecord{}
resp.Total = 0
return resp, nil
}
resp.RequiredMetricsCheck = inframonitoringtypes.RequiredMetricsCheck{MissingMetrics: []string{}}
metadataMap, err := m.getNamespacesTableMetadata(ctx, req)
if err != nil {
return nil, err
}
resp.Total = len(metadataMap)
pageGroups, err := m.getTopNamespaceGroups(ctx, orgID, req, metadataMap)
if err != nil {
return nil, err
}
if len(pageGroups) == 0 {
resp.Records = []inframonitoringtypes.NamespaceRecord{}
return resp, nil
}
filterExpr := ""
if req.Filter != nil {
filterExpr = req.Filter.Expression
}
fullQueryReq := buildFullQueryRequest(req.Start, req.End, filterExpr, req.GroupBy, pageGroups, m.newNamespacesTableListQuery())
queryResp, err := m.querier.QueryRange(ctx, orgID, fullQueryReq)
if err != nil {
return nil, err
}
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
// Start/End/Filter/GroupBy from PostablePods.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
if err != nil {
return nil, err
}
resp.Records = buildNamespaceRecords(queryResp, pageGroups, req.GroupBy, metadataMap, phaseCounts)
resp.Warning = queryResp.Warning
return resp, nil
}

View File

@@ -0,0 +1,123 @@
package implinframonitoring
import (
"context"
"slices"
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
// buildNamespaceRecords assembles the page records. Pod phase counts come from
// phaseCounts in both modes; every row is a group of pods, so there's no
// per-row "current phase" concept (unlike pods/nodes list mode).
func buildNamespaceRecords(
resp *qbtypes.QueryRangeResponse,
pageGroups []map[string]string,
groupBy []qbtypes.GroupByKey,
metadataMap map[string]map[string]string,
phaseCounts map[string]podPhaseCounts,
) []inframonitoringtypes.NamespaceRecord {
metricsMap := parseFullQueryResponse(resp, groupBy)
records := make([]inframonitoringtypes.NamespaceRecord, 0, len(pageGroups))
for _, labels := range pageGroups {
compositeKey := compositeKeyFromLabels(labels, groupBy)
namespaceName := labels[namespaceNameAttrKey]
record := inframonitoringtypes.NamespaceRecord{ // initialize with default values
NamespaceName: namespaceName,
NamespaceCPU: -1,
NamespaceMemory: -1,
Meta: map[string]string{},
}
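// Query names "A" (CPU usage) and "D" (memory working set) match
// newNamespacesTableListQuery.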
if metrics, ok := metricsMap[compositeKey]; ok {
if v, exists := metrics["A"]; exists {
record.NamespaceCPU = v
}
if v, exists := metrics["D"]; exists {
record.NamespaceMemory = v
}
}
if phaseCountsForGroup, ok := phaseCounts[compositeKey]; ok {
record.PodCountsByPhase = inframonitoringtypes.PodCountsByPhase{
Pending: phaseCountsForGroup.Pending,
Running: phaseCountsForGroup.Running,
Succeeded: phaseCountsForGroup.Succeeded,
Failed: phaseCountsForGroup.Failed,
Unknown: phaseCountsForGroup.Unknown,
}
}
if attrs, ok := metadataMap[compositeKey]; ok {
for k, v := range attrs {
record.Meta[k] = v
}
}
records = append(records, record)
}
return records
}
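// getTopNamespaceGroups runs only the queries needed for the requested
// order-by ("A" for cpu, "D" for memory), merging the caller's filter and
// groupBy into the base query, ranks the resulting groups, and returns the
// requested offset/limit page via paginateWithBackfill.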
func (m *module) getTopNamespaceGroups(
ctx context.Context,
orgID valuer.UUID,
req *inframonitoringtypes.PostableNamespaces,
metadataMap map[string]map[string]string,
) ([]map[string]string, error) {
orderByKey := req.OrderBy.Key.Name
queryNamesForOrderBy := orderByToNamespacesQueryNames[orderByKey]
rankingQueryName := queryNamesForOrderBy[len(queryNamesForOrderBy)-1]
topReq := &qbtypes.QueryRangeRequest{
Start: uint64(req.Start),
End: uint64(req.End),
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: make([]qbtypes.QueryEnvelope, 0, len(queryNamesForOrderBy)),
},
}
for _, envelope := range m.newNamespacesTableListQuery().CompositeQuery.Queries {
if !slices.Contains(queryNamesForOrderBy, envelope.GetQueryName()) {
continue
}
copied := envelope
if copied.Type == qbtypes.QueryTypeBuilder {
existingExpr := ""
if f := copied.GetFilter(); f != nil {
existingExpr = f.Expression
}
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
}
merged := mergeFilterExpressions(existingExpr, reqFilterExpr)
copied.SetFilter(&qbtypes.Filter{Expression: merged})
copied.SetGroupBy(req.GroupBy)
}
topReq.CompositeQuery.Queries = append(topReq.CompositeQuery.Queries, copied)
}
resp, err := m.querier.QueryRange(ctx, orgID, topReq)
if err != nil {
return nil, err
}
allMetricGroups := parseAndSortGroups(resp, rankingQueryName, req.GroupBy, req.OrderBy.Direction)
return paginateWithBackfill(allMetricGroups, metadataMap, req.GroupBy, req.Offset, req.Limit), nil
}
func (m *module) getNamespacesTableMetadata(ctx context.Context, req *inframonitoringtypes.PostableNamespaces) (map[string]map[string]string, error) {
var nonGroupByAttrs []string
for _, key := range namespaceAttrKeysForMetadata {
if !isKeyInGroupByAttrs(req.GroupBy, key) {
nonGroupByAttrs = append(nonGroupByAttrs, key)
}
}
return m.getMetadata(ctx, namespacesTableMetricNamesList, req.GroupBy, nonGroupByAttrs, req.Filter, req.Start, req.End)
}

View File

@@ -0,0 +1,92 @@
package implinframonitoring
import (
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const (
namespaceNameAttrKey = "k8s.namespace.name"
)
var namespaceNameGroupByKey = qbtypes.GroupByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: namespaceNameAttrKey,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
}
// namespacesTableMetricNamesList drives the existence/retention check.
// Includes k8s.pod.phase so the response short-circuits cleanly when a
// cluster doesn't ship the metric — even though phase isn't part of the
// QB composite query (it's queried separately via getPerGroupPodPhaseCounts).
var namespacesTableMetricNamesList = []string{
"k8s.pod.cpu.usage",
"k8s.pod.memory.working_set",
"k8s.pod.phase",
}
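// namespaceAttrKeysForMetadata lists the attributes surfaced in each record's
// meta map; keys not already part of the groupBy are fetched separately.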
var namespaceAttrKeysForMetadata = []string{
"k8s.namespace.name",
"k8s.cluster.name",
}
var orderByToNamespacesQueryNames = map[string][]string{
inframonitoringtypes.NamespacesOrderByCPU: {"A"},
inframonitoringtypes.NamespacesOrderByMemory: {"D"},
}
// newNamespacesTableListQuery builds the composite QB v5 request for the namespaces list.
// Pod phase counts are derived separately via getPerGroupPodPhaseCounts (works for both
// list and grouped_list modes), so no phase query is included here.
// Query letters A and D are kept aligned with the v1 implementation.
func (m *module) newNamespacesTableListQuery() *qbtypes.QueryRangeRequest {
queries := []qbtypes.QueryEnvelope{
// Query A: CPU usage — sum of pod CPU within the group.
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.cpu.usage",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{namespaceNameGroupByKey},
Disabled: false,
},
},
// Query D: Memory working set — sum of pod memory within the group.
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "D",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "k8s.pod.memory.working_set",
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
GroupBy: []qbtypes.GroupByKey{namespaceNameGroupByKey},
Disabled: false,
},
},
}
return &qbtypes.QueryRangeRequest{
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
}
}

View File

@@ -12,10 +12,12 @@ type Handler interface {
ListHosts(http.ResponseWriter, *http.Request)
ListPods(http.ResponseWriter, *http.Request)
ListNodes(http.ResponseWriter, *http.Request)
ListNamespaces(http.ResponseWriter, *http.Request)
}
type Module interface {
ListHosts(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableHosts) (*inframonitoringtypes.Hosts, error)
ListPods(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostablePods) (*inframonitoringtypes.Pods, error)
ListNodes(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableNodes) (*inframonitoringtypes.Nodes, error)
ListNamespaces(ctx context.Context, orgID valuer.UUID, req *inframonitoringtypes.PostableNamespaces) (*inframonitoringtypes.Namespaces, error)
}

View File

@@ -186,7 +186,7 @@ func (c *conditionBuilder) conditionFor(
column := columns[0]
if len(key.Evolutions) > 0 {
// we will use the corresponding column and its evolution entry for the query
-newColumns, _, err := qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
+newColumns, _, err := selectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
if err != nil {
return "", err
}

View File

@@ -3,7 +3,11 @@ package telemetrylogs
import (
"context"
"fmt"
"slices"
"sort"
"strconv"
"strings"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz-otel-collector/utils"
@@ -133,6 +137,113 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
return nil, qbtypes.ErrColumnNotFound
}
// selectEvolutionsForColumns selects the appropriate evolution entries for each column based on the time range.
// Logic:
// - Finds the latest base evolution (<= tsStartTime) across ALL columns
// - Rejects all evolutions before this latest base evolution
// - For duplicate evolutions, it keeps the oldest one (first by ReleaseTime)
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
// - Results are sorted by ReleaseTime descending (newest first)
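// Example (hypothetical data): columns {resources_string, resource} with
// evolutions resources_string@t0 and resource@t1, t0 < t1. A window starting
// after t1 makes resource@t1 the latest base evolution, rejects
// resources_string@t0, and returns only the resource column; a window
// straddling t1 keeps both, newest first.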
func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
copy(sortedEvolutions, evolutions)
// sort the evolutions by ReleaseTime ascending
sort.Slice(sortedEvolutions, func(i, j int) bool {
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
})
tsStartTime := time.Unix(0, int64(tsStart))
tsEndTime := time.Unix(0, int64(tsEnd))
// Build evolution map: column name -> evolution
evolutionMap := make(map[string]*telemetrytypes.EvolutionEntry)
for _, evolution := range sortedEvolutions {
if _, exists := evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))]; exists {
// A duplicate key was already stored from an older entry; keep the oldest one and skip this.
continue
}
evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))] = evolution
}
// Find the latest base evolution (<= tsStartTime) across ALL columns
// Evolutions are sorted, so we can break early
var latestBaseEvolutionAcrossAll *telemetrytypes.EvolutionEntry
for _, evolution := range sortedEvolutions {
if evolution.ReleaseTime.After(tsStartTime) {
break
}
latestBaseEvolutionAcrossAll = evolution
}
// We shouldn't reach this; hitting it means the evolutions data itself is inconsistent.
if latestBaseEvolutionAcrossAll == nil {
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "no base evolution found for columns %v", columns)
}
columnLookUpMap := make(map[string]*schema.Column)
for _, column := range columns {
columnLookUpMap[column.Name] = column
}
// Collect column-evolution pairs
type colEvoPair struct {
column *schema.Column
evolution *telemetrytypes.EvolutionEntry
}
pairs := []colEvoPair{}
for _, evolution := range evolutionMap {
// Reject evolutions before the latest base evolution
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
continue
}
// skip evolutions released at or after tsEndTime
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
continue
}
if _, exists := columnLookUpMap[evolution.ColumnName]; !exists {
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "evolution column %s not found in columns %v", evolution.ColumnName, columns)
}
pairs = append(pairs, colEvoPair{columnLookUpMap[evolution.ColumnName], evolution})
}
// If no pairs found, fall back to latestBaseEvolutionAcrossAll for matching columns
if len(pairs) == 0 {
for _, column := range columns {
// Use latestBaseEvolutionAcrossAll if this column name matches its column name
if column.Name == latestBaseEvolutionAcrossAll.ColumnName {
pairs = append(pairs, colEvoPair{column, latestBaseEvolutionAcrossAll})
}
}
}
// Sort by ReleaseTime descending (newest first)
slices.SortFunc(pairs, func(a, b colEvoPair) int {
// Sort by ReleaseTime descending (newest first)
if a.evolution.ReleaseTime.After(b.evolution.ReleaseTime) {
return -1
}
if a.evolution.ReleaseTime.Before(b.evolution.ReleaseTime) {
return 1
}
return 0
})
// Extract results
newColumns := make([]*schema.Column, len(pairs))
evolutionsEntries := make([]*telemetrytypes.EvolutionEntry, len(pairs))
for i, pair := range pairs {
newColumns[i] = pair.column
evolutionsEntries[i] = pair.evolution
}
return newColumns, evolutionsEntries, nil
}
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
columns, err := m.getColumn(ctx, key)
if err != nil {
@@ -143,7 +254,7 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
var evolutionsEntries []*telemetrytypes.EvolutionEntry
if len(key.Evolutions) > 0 {
// we will use the corresponding column and its evolution entry for the query
-newColumns, evolutionsEntries, err = qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, tsStart, tsEnd)
+newColumns, evolutionsEntries, err = selectEvolutionsForColumns(columns, key.Evolutions, tsStart, tsEnd)
if err != nil {
return "", err
}

View File

@@ -886,7 +886,7 @@ func TestSelectEvolutionsForColumns(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
-resultColumns, resultEvols, err := qbtypes.SelectEvolutionsForColumns(tc.columns, tc.evolutions, tc.tsStart, tc.tsEnd)
+resultColumns, resultEvols, err := selectEvolutionsForColumns(tc.columns, tc.evolutions, tc.tsStart, tc.tsEnd)
if tc.expectedError {
assert.Contains(t, err.Error(), tc.errorStr)

View File

@@ -344,11 +344,6 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
})
}
}
if _, err := t.updateColumnEvolutionMetadataForKeys(ctx, keys); err != nil {
return nil, false, err
}
return keys, complete, nil
}

View File

@@ -89,20 +89,6 @@ func TestGetKeys(t *testing.T) {
{Name: "tag_data_type", Type: "String"},
{Name: "priority", Type: "UInt8"},
}, [][]any{{"http.method", "tag", "String", 1}, {"http.method", "tag", "String", 1}}))
// Two rows above produce two evolution selectors (each contributing 4 bound args).
mock.ExpectQuery(`FROM signoz_metadata\.distributed_column_evolution_metadata`).
WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
{Name: "signal", Type: "String"},
{Name: "column_name", Type: "String"},
{Name: "column_type", Type: "String"},
{Name: "field_context", Type: "String"},
{Name: "field_name", Type: "String"},
{Name: "version", Type: "UInt32"},
{Name: "release_time", Type: "Float64"},
}, [][]any{}))
keys, _, err := metadata.GetKeys(context.Background(), &telemetrytypes.FieldKeySelector{
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
@@ -261,27 +247,6 @@ func TestApplyBackwardCompatibleKeys(t *testing.T) {
}, rows))
}
// getTracesKeys / getLogsKeys both fetch evolution metadata; return an empty
// result so the existing test data flows through unchanged. Each input key
// becomes one selector contributing four bound args.
if hasTraces || hasLogs {
evoArgs := make([]any, 0, len(tt.inputKeys)*4)
for range tt.inputKeys {
evoArgs = append(evoArgs, nil, nil, nil, nil)
}
mock.ExpectQuery(`FROM signoz_metadata\.distributed_column_evolution_metadata`).
WithArgs(evoArgs...).
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
{Name: "signal", Type: "String"},
{Name: "column_name", Type: "String"},
{Name: "column_type", Type: "String"},
{Name: "field_context", Type: "String"},
{Name: "field_name", Type: "String"},
{Name: "version", Type: "UInt32"},
{Name: "release_time", Type: "Float64"},
}, [][]any{}))
}
selectors := []*telemetrytypes.FieldKeySelector{}
for _, key := range tt.inputKeys {
selectors = append(selectors, &telemetrytypes.FieldKeySelector{

View File

@@ -161,41 +161,7 @@ func (c *conditionBuilder) conditionFor(
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
var value any
column := columns[0]
if len(key.Evolutions) > 0 {
// we will use the corresponding column and its evolution entry for the query
newColumns, _, err := qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
if err != nil {
return "", err
}
if len(newColumns) == 0 {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "no valid evolution found for field %s in the given time range", key.Name)
}
// Multiple columns means fieldExpression is a multiIf returning NULL when none match,
// so a simple null check is sufficient.
if len(newColumns) > 1 {
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(fieldExpression), nil
} else {
return sb.IsNull(fieldExpression), nil
}
}
// otherwise we have to find the correct exist operator based on the column type
column = newColumns[0]
} else if len(columns) > 1 {
// Resource fields without evolution data still produce a multiIf in FieldFor;
// fall back to a null check on the multiIf result.
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(fieldExpression), nil
} else {
return sb.IsNull(fieldExpression), nil
}
}
-switch column.Type.GetType() {
+switch columns[0].Type.GetType() {
case schema.ColumnTypeEnumJSON:
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(fieldExpression), nil
@@ -212,7 +178,7 @@ func (c *conditionBuilder) conditionFor(
return sb.E(fieldExpression, value), nil
}
case schema.ColumnTypeEnumLowCardinality:
-switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
+switch elementType := columns[0].Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
value = ""
if operator == qbtypes.FilterOperatorExists {
@@ -236,14 +202,14 @@ func (c *conditionBuilder) conditionFor(
return sb.E(fieldExpression, value), nil
}
case schema.ColumnTypeEnumMap:
-keyType := column.Type.(schema.MapColumnType).KeyType
+keyType := columns[0].Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, columns[0].Type)
}
-switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
+switch valueType := columns[0].Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
-leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
+leftOperand := fmt.Sprintf("mapContains(%s, '%s')", columns[0].Name, key.Name)
if key.Materialized {
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
}
@@ -256,7 +222,7 @@ func (c *conditionBuilder) conditionFor(
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
}
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", columns[0].Type)
}
}
return "", nil

View File

@@ -3,7 +3,6 @@ package telemetrytraces
import (
"context"
"testing"
"time"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -217,7 +216,7 @@ func TestConditionFor(t *testing.T) {
},
operator: qbtypes.FilterOperatorExists,
value: nil,
expectedSQL: "WHERE multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) IS NOT NULL",
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL",
expectedError: nil,
},
{
@@ -229,7 +228,7 @@ func TestConditionFor(t *testing.T) {
},
operator: qbtypes.FilterOperatorNotExists,
value: nil,
expectedSQL: "WHERE multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) IS NULL",
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NULL",
expectedError: nil,
},
{
@@ -303,85 +302,3 @@ func TestConditionFor(t *testing.T) {
})
}
}
func TestConditionForResourceWithEvolution(t *testing.T) {
ctx := context.Background()
releaseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
evolutions := mockEvolutionData(releaseTime)
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
operator qbtypes.FilterOperator
tsStart uint64
tsEnd uint64
expectedSQL string
}{
{
name: "Exists - window after release - JSON only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: evolutions,
},
operator: qbtypes.FilterOperatorExists,
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedSQL: "WHERE resource.`service.name`::String IS NOT NULL",
},
{
name: "NotExists - window after release - JSON only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: evolutions,
},
operator: qbtypes.FilterOperatorNotExists,
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedSQL: "WHERE resource.`service.name`::String IS NULL",
},
{
name: "Exists - window before release - map only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: evolutions,
},
operator: qbtypes.FilterOperatorExists,
tsStart: uint64(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedSQL: "WHERE mapContains(resources_string, 'service.name') = ?",
},
{
name: "Exists - window straddles release - multiIf null check",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: evolutions,
},
operator: qbtypes.FilterOperatorExists,
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL",
},
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
t.Run(tc.name, func(t *testing.T) {
cond, err := conditionBuilder.ConditionFor(ctx, tc.tsStart, tc.tsEnd, &tc.key, tc.operator, nil, sb)
require.NoError(t, err)
sb.Where(cond)
sql, _ := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
assert.Contains(t, sql, tc.expectedSQL)
})
}
}

View File

@@ -174,7 +174,7 @@ func (m *defaultFieldMapper) getColumn(
) ([]*schema.Column, error) {
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
return []*schema.Column{indexV3Columns["resources_string"], indexV3Columns["resource"]}, nil
return []*schema.Column{indexV3Columns["resource"]}, nil
case telemetrytypes.FieldContextScope:
return []*schema.Column{}, qbtypes.ErrColumnNotFound
case telemetrytypes.FieldContextAttribute:
@@ -254,92 +254,63 @@ func (m *defaultFieldMapper) FieldFor(
if err != nil {
return "", err
}
var newColumns []*schema.Column
var evolutionsEntries []*telemetrytypes.EvolutionEntry
if len(key.Evolutions) > 0 {
// we will use the corresponding column and its evolution entry for the query
newColumns, evolutionsEntries, err = qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
if err != nil {
return "", err
}
} else {
newColumns = columns
if len(columns) != 1 {
return "", errors.Newf(errors.TypeInternal, errors.CodeInternal, "expected exactly 1 column, got %d", len(columns))
}
column := columns[0]
exprs := []string{}
existExpr := []string{}
for i, column := range newColumns {
// Use evolution column name if available, otherwise use the column name
columnName := column.Name
if evolutionsEntries != nil && evolutionsEntries[i] != nil {
columnName = evolutionsEntries[i].ColumnName
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
// json is only supported for resource context as of now
if key.FieldContext != telemetrytypes.FieldContextResource {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
}
oldColumn := indexV3Columns["resources_string"]
oldKeyName := fmt.Sprintf("%s['%s']", oldColumn.Name, key.Name)
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
// once clickHouse dependency is updated, we need to check if we can remove it.
if key.Materialized {
oldKeyName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
oldKeyNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, %s==true, %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldKeyNameExists, oldKeyName), nil
} else {
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
}
case schema.ColumnTypeEnumString,
schema.ColumnTypeEnumUInt64,
schema.ColumnTypeEnumUInt32,
schema.ColumnTypeEnumInt8,
schema.ColumnTypeEnumInt16,
schema.ColumnTypeEnumBool,
schema.ColumnTypeEnumDateTime64,
schema.ColumnTypeEnumFixedString:
return column.Name, nil
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
return column.Name, nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
}
case schema.ColumnTypeEnumMap:
keyType := column.Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
}
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
// json is only supported for resource context as of now
if key.FieldContext != telemetrytypes.FieldContextResource {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
}
// have to add ::String as ClickHouse throws an error: data types Variant/Dynamic are not allowed in GROUP BY
// once the ClickHouse dependency is updated, check whether this cast can be removed.
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", columnName, key.Name))
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", columnName, key.Name))
case schema.ColumnTypeEnumString,
schema.ColumnTypeEnumUInt64,
schema.ColumnTypeEnumUInt32,
schema.ColumnTypeEnumInt8,
schema.ColumnTypeEnumInt16,
schema.ColumnTypeEnumBool,
schema.ColumnTypeEnumDateTime64,
schema.ColumnTypeEnumFixedString:
exprs = append(exprs, column.Name)
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
exprs = append(exprs, column.Name)
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
}
case schema.ColumnTypeEnumMap:
keyType := column.Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
}
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
// a key could have been materialized; if so, return the materialized column name
if key.Materialized {
exprs = append(exprs, telemetrytypes.FieldKeyToMaterializedColumnName(key))
existExpr = append(existExpr, fmt.Sprintf("%s==true", telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)))
} else {
exprs = append(exprs, fmt.Sprintf("%s['%s']", columnName, key.Name))
existExpr = append(existExpr, fmt.Sprintf("mapContains(%s, '%s')", columnName, key.Name))
}
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
// a key could have been materialized; if so, return the materialized column name
if key.Materialized {
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
}
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
}
}
if len(exprs) == 1 {
return exprs[0], nil
} else if len(exprs) > 1 {
// Ensure existExpr has the same length as exprs
if len(existExpr) != len(exprs) {
return "", errors.New(errors.TypeInternal, errors.CodeInternal, "length of exist exprs doesn't match to that of exprs")
}
finalExprs := []string{}
for i, expr := range exprs {
finalExprs = append(finalExprs, fmt.Sprintf("%s, %s", existExpr[i], expr))
}
return "multiIf(" + strings.Join(finalExprs, ", ") + ", NULL)", nil
}
// should not reach here
return columns[0].Name, nil
return column.Name, nil
}
// ColumnExpressionFor returns the column expression for the given field
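
The hunk above removes the loop that stitched one expression per surviving evolution column into a single coalescing fragment. A simplified sketch of that assembly step, with an illustrative helper name (buildCoalescedField is not a real function in the codebase):

package main

import (
	"fmt"
	"strings"
)

// buildCoalescedField mirrors the removed loop: given parallel slices of value
// expressions and their existence predicates, return the lone expression, or a
// multiIf that tries each (exists, value) pair in order and ends with NULL.
func buildCoalescedField(exprs, existExprs []string) (string, error) {
	if len(exprs) == 1 {
		return exprs[0], nil
	}
	if len(existExprs) != len(exprs) {
		return "", fmt.Errorf("length of existExprs doesn't match that of exprs")
	}
	pairs := make([]string, 0, len(exprs))
	for i, expr := range exprs {
		pairs = append(pairs, fmt.Sprintf("%s, %s", existExprs[i], expr))
	}
	return "multiIf(" + strings.Join(pairs, ", ") + ", NULL)", nil
}

func main() {
	sql, _ := buildCoalescedField(
		[]string{"resource.`service.name`::String", "resources_string['service.name']"},
		[]string{"resource.`service.name` IS NOT NULL", "mapContains(resources_string, 'service.name')"},
	)
	fmt.Println(sql)
	// multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String,
	//         mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)
}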

View File

@@ -3,7 +3,6 @@ package telemetrytraces
import (
"context"
"testing"
"time"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -65,7 +64,7 @@ func TestGetFieldKeyName(t *testing.T) {
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
},
expectedResult: "multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL)",
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
expectedError: nil,
},
{
@@ -76,7 +75,7 @@ func TestGetFieldKeyName(t *testing.T) {
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
expectedResult: "multiIf(`resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, NULL)",
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
expectedError: nil,
},
{
@@ -104,86 +103,3 @@ func TestGetFieldKeyName(t *testing.T) {
})
}
}
func TestFieldForResourceWithEvolution(t *testing.T) {
ctx := context.Background()
releaseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
evolutions := mockEvolutionData(releaseTime)
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
tsStart uint64
tsEnd uint64
expectedResult string
}{
{
name: "Window straddles release - both columns",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
},
{
name: "Window fully after release - JSON column only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "resource.`service.name`::String",
},
{
name: "Window fully before release - map column only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "resources_string['service.name']",
},
{
name: "Window fully after release - materialized resource",
key: telemetrytypes.TelemetryFieldKey{
Name: "deployment.environment",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "resource.`deployment.environment`::String",
},
{
name: "Window straddles release - materialized resource",
key: telemetrytypes.TelemetryFieldKey{
Name: "deployment.environment",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
require.NoError(t, err)
assert.Equal(t, tc.expectedResult, result)
})
}
}
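
The materialized expectations in these cases follow the naming visible in the expected strings: context, data type, and the field name with dots replaced by $$. A hedged sketch of that mapping, inferred from those strings (the real helper is telemetrytypes.FieldKeyToMaterializedColumnName; its exact rules may differ):

package main

import (
	"fmt"
	"strings"
)

// materializedColumnName reproduces the naming seen in the test expectations:
// <context>_<datatype>_<name with "." replaced by "$$">.
func materializedColumnName(context, dataType, name string) string {
	return fmt.Sprintf("%s_%s_%s", context, dataType, strings.ReplaceAll(name, ".", "$$"))
}

func main() {
	fmt.Println(materializedColumnName("resource", "string", "deployment.environment"))
	// resource_string_deployment$$environment
}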

View File

@@ -16,9 +16,6 @@ import (
)
func TestStatementBuilder(t *testing.T) {
// releaseTime is chosen so it lands inside the standard [1747947419000, 1747983448000]ms
// test window, keeping the multiIf SQL form for resource fields.
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -358,7 +355,7 @@ func TestStatementBuilder(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -397,7 +394,6 @@ func TestStatementBuilder(t *testing.T) {
}
func TestStatementBuilderListQuery(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -654,7 +650,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -687,7 +683,6 @@ func TestStatementBuilderListQuery(t *testing.T) {
}
func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -716,7 +711,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
@@ -749,7 +744,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
}},
},
expected: qbtypes.Statement{
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
@@ -763,7 +758,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = c.keysMap
if mockMetadataStore.KeysMap == nil {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
}
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -794,7 +789,6 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
}
func TestStatementBuilderTraceQuery(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -917,7 +911,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -950,7 +944,6 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
}
func TestAdjustKey(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
inputKey telemetrytypes.TelemetryFieldKey
@@ -964,7 +957,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedKey: IntrinsicFields["trace_id"],
},
{
@@ -974,7 +967,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextBody, // incorrect context
FieldDataType: telemetrytypes.FieldDataTypeInt64,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "duration_nano",
FieldContext: telemetrytypes.FieldContextSpan, // should be corrected
@@ -988,7 +981,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextSpan, // correct context
FieldDataType: telemetrytypes.FieldDataTypeInt64,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "duration_nano",
FieldContext: telemetrytypes.FieldContextSpan, // should be corrected
@@ -1002,8 +995,8 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: *buildCompleteFieldKeyMap(releaseTime)["service.name"][0],
keysMap: buildCompleteFieldKeyMap(),
expectedKey: *buildCompleteFieldKeyMap()["service.name"][0],
},
{
name: "single matching key with context specified - override",
@@ -1012,8 +1005,8 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: *buildCompleteFieldKeyMap(releaseTime)["cart.items_count"][0],
keysMap: buildCompleteFieldKeyMap(),
expectedKey: *buildCompleteFieldKeyMap()["cart.items_count"][0],
},
{
name: "multiple matching keys - all materialized",
@@ -1050,7 +1043,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "mixed.materialization.key",
FieldDataType: telemetrytypes.FieldDataTypeString,
@@ -1064,7 +1057,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "mixed.materialization.key",
FieldContext: telemetrytypes.FieldContextAttribute,
@@ -1079,7 +1072,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "unknown.field",
Materialized: false,
@@ -1092,7 +1085,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextAttribute,
@@ -1107,7 +1100,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "cart.items_count",
FieldContext: telemetrytypes.FieldContextAttribute,
@@ -1122,7 +1115,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
@@ -1165,7 +1158,6 @@ func TestAdjustKey(t *testing.T) {
}
func TestAdjustKeys(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
@@ -1191,7 +1183,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedSelectFields: []telemetrytypes.TelemetryFieldKey{
{
Name: "service.name",
@@ -1228,7 +1220,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedGroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
@@ -1275,7 +1267,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedOrder: []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{
@@ -1334,7 +1326,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
expectedSelectFields: []telemetrytypes.TelemetryFieldKey{
{
Name: "trace_id",
@@ -1389,7 +1381,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(releaseTime),
keysMap: buildCompleteFieldKeyMap(),
// After alias adjustment, name becomes "span.duration" with FieldContextUnspecified
// "span.duration" is not in keysMap, so context stays unspecified
expectedOrder: []qbtypes.OrderBy{

View File

@@ -1,12 +1,10 @@
package telemetrytraces
import (
"time"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
func buildCompleteFieldKeyMap(releaseTime time.Time) map[string][]*telemetrytypes.TelemetryFieldKey {
func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
keysMap := map[string][]*telemetrytypes.TelemetryFieldKey{
"service.name": {
{
@@ -117,33 +115,7 @@ func buildCompleteFieldKeyMap(releaseTime time.Time) map[string][]*telemetrytype
for _, keys := range keysMap {
for _, key := range keys {
key.Signal = telemetrytypes.SignalTraces
if key.FieldContext == telemetrytypes.FieldContextResource {
key.Evolutions = mockEvolutionData(releaseTime)
}
}
}
return keysMap
}
// mockEvolutionData returns the canonical resource-column evolution timeline used in tests:
// the legacy resources_string map at epoch 0 and the JSON resource column released at releaseTime.
func mockEvolutionData(releaseTime time.Time) []*telemetrytypes.EvolutionEntry {
return []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalTraces,
ColumnName: "resources_string",
FieldContext: telemetrytypes.FieldContextResource,
ColumnType: "Map(LowCardinality(String), String)",
FieldName: "__all__",
ReleaseTime: time.Unix(0, 0),
},
{
Signal: telemetrytypes.SignalTraces,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
ReleaseTime: releaseTime,
},
}
}

View File

@@ -15,7 +15,6 @@ import (
)
func TestTraceOperatorStatementBuilder(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -68,7 +67,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) AS `service.name` FROM A_DIR_DESC_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name` FROM A_DIR_DESC_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
@@ -391,7 +390,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -444,7 +443,6 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
}
func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
operator qbtypes.QueryBuilderTraceOperator
@@ -508,7 +506,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)

View File

@@ -4,7 +4,6 @@ import (
"context"
"strings"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
@@ -17,13 +16,12 @@ import (
)
func TestTraceTimeRangeOptimization(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
mockMetadataStore.KeysMap["trace_id"] = []*telemetrytypes.TelemetryFieldKey{{
Name: "trace_id",
FieldContext: telemetrytypes.FieldContextSpan,

View File

@@ -0,0 +1,99 @@
package inframonitoringtypes
import (
"encoding/json"
"slices"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type Namespaces struct {
Type ResponseType `json:"type" required:"true"`
Records []NamespaceRecord `json:"records" required:"true"`
Total int `json:"total" required:"true"`
RequiredMetricsCheck RequiredMetricsCheck `json:"requiredMetricsCheck" required:"true"`
EndTimeBeforeRetention bool `json:"endTimeBeforeRetention" required:"true"`
Warning *qbtypes.QueryWarnData `json:"warning,omitempty"`
}
type NamespaceRecord struct {
NamespaceName string `json:"namespaceName" required:"true"`
NamespaceCPU float64 `json:"namespaceCPU" required:"true"`
NamespaceMemory float64 `json:"namespaceMemory" required:"true"`
PodCountsByPhase PodCountsByPhase `json:"podCountsByPhase" required:"true"`
Meta map[string]string `json:"meta" required:"true"`
}
// PostableNamespaces is the request body for the v2 namespaces list API.
type PostableNamespaces struct {
Start int64 `json:"start" required:"true"`
End int64 `json:"end" required:"true"`
Filter *qbtypes.Filter `json:"filter"`
GroupBy []qbtypes.GroupByKey `json:"groupBy"`
OrderBy *qbtypes.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit" required:"true"`
}
// Validate ensures PostableNamespaces contains acceptable values.
func (req *PostableNamespaces) Validate() error {
if req == nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
}
if req.Start <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid start time %d: start must be greater than 0",
req.Start,
)
}
if req.End <= 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid end time %d: end must be greater than 0",
req.End,
)
}
if req.Start >= req.End {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid time range: start (%d) must be less than end (%d)",
req.Start,
req.End,
)
}
if req.Limit < 1 || req.Limit > 5000 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
}
if req.Offset < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "offset cannot be negative")
}
if req.OrderBy != nil {
if !slices.Contains(NamespacesValidOrderByKeys, req.OrderBy.Key.Name) {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by key: %s", req.OrderBy.Key.Name)
}
if req.OrderBy.Direction != qbtypes.OrderDirectionAsc && req.OrderBy.Direction != qbtypes.OrderDirectionDesc {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid order by direction: %s", req.OrderBy.Direction)
}
}
return nil
}
// UnmarshalJSON validates input immediately after decoding.
func (req *PostableNamespaces) UnmarshalJSON(data []byte) error {
type raw PostableNamespaces
var decoded raw
if err := json.Unmarshal(data, &decoded); err != nil {
return err
}
*req = PostableNamespaces(decoded)
return req.Validate()
}
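
Because UnmarshalJSON validates immediately after decoding, a malformed payload fails at decode time rather than deeper in the handler. A minimal sketch of the decode-then-validate pattern with a cut-down stand-in type (the real struct is PostableNamespaces above):

package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// postable is a simplified stand-in for PostableNamespaces.
type postable struct {
	Start int64 `json:"start"`
	End   int64 `json:"end"`
	Limit int   `json:"limit"`
}

func (p *postable) validate() error {
	if p.Start <= 0 || p.End <= 0 || p.Start >= p.End {
		return errors.New("invalid time range")
	}
	if p.Limit < 1 || p.Limit > 5000 {
		return errors.New("limit must be between 1 and 5000")
	}
	return nil
}

// UnmarshalJSON decodes into an alias type (so the method doesn't recurse),
// then validates immediately, matching the pattern above.
func (p *postable) UnmarshalJSON(data []byte) error {
	type raw postable
	var decoded raw
	if err := json.Unmarshal(data, &decoded); err != nil {
		return err
	}
	*p = postable(decoded)
	return p.validate()
}

func main() {
	var req postable
	err := json.Unmarshal([]byte(`{"start":1000,"end":2000,"limit":6000}`), &req)
	fmt.Println(err) // limit must be between 1 and 5000
}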

View File

@@ -0,0 +1,11 @@
package inframonitoringtypes
const (
NamespacesOrderByCPU = "cpu"
NamespacesOrderByMemory = "memory"
)
var NamespacesValidOrderByKeys = []string{
NamespacesOrderByCPU,
NamespacesOrderByMemory,
}
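
Validate checks the requested order-by key against this slice with slices.Contains; a standalone illustration:

package main

import (
	"fmt"
	"slices"
)

func main() {
	// Stand-in for NamespacesValidOrderByKeys defined above.
	valid := []string{"cpu", "memory"}
	fmt.Println(slices.Contains(valid, "cpu"))       // true: accepted
	fmt.Println(slices.Contains(valid, "pod_phase")) // false: rejected by Validate
}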

View File

@@ -0,0 +1,237 @@
package inframonitoringtypes
import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestPostableNamespaces_Validate(t *testing.T) {
tests := []struct {
name string
req *PostableNamespaces
wantErr bool
}{
{
name: "valid request",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "nil request",
req: nil,
wantErr: true,
},
{
name: "start time zero",
req: &PostableNamespaces{
Start: 0,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time negative",
req: &PostableNamespaces{
Start: -1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "end time zero",
req: &PostableNamespaces{
Start: 1000,
End: 0,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time greater than end time",
req: &PostableNamespaces{
Start: 2000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "start time equal to end time",
req: &PostableNamespaces{
Start: 1000,
End: 1000,
Limit: 100,
Offset: 0,
},
wantErr: true,
},
{
name: "limit zero",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 0,
Offset: 0,
},
wantErr: true,
},
{
name: "limit negative",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: -10,
Offset: 0,
},
wantErr: true,
},
{
name: "limit exceeds max",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 5001,
Offset: 0,
},
wantErr: true,
},
{
name: "offset negative",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: -5,
},
wantErr: true,
},
{
name: "orderBy nil is valid",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
},
wantErr: false,
},
{
name: "orderBy with valid key cpu and direction asc",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: NamespacesOrderByCPU,
},
},
Direction: qbtypes.OrderDirectionAsc,
},
},
wantErr: false,
},
{
name: "orderBy with valid key memory and direction desc",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: NamespacesOrderByMemory,
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: false,
},
{
name: "orderBy with pod_phase key is rejected",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "pod_phase",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: true,
},
{
name: "orderBy with invalid key",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "unknown",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
wantErr: true,
},
{
name: "orderBy with valid key but invalid direction",
req: &PostableNamespaces{
Start: 1000,
End: 2000,
Limit: 100,
Offset: 0,
OrderBy: &qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: NamespacesOrderByMemory,
},
},
Direction: qbtypes.OrderDirection{String: valuer.NewString("invalid")},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.req.Validate()
if tt.wantErr {
require.Error(t, err)
require.True(t, errors.Ast(err, errors.TypeInvalidInput), "expected error to be of type InvalidInput")
} else {
require.NoError(t, err)
}
})
}
}

View File

@@ -1,119 +0,0 @@
package querybuildertypesv5
import (
"slices"
"sort"
"strconv"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
// SelectEvolutionsForColumns selects the appropriate evolution entries for each column based on the time range.
// Logic:
// - Finds the latest base evolution (<= tsStartTime) across ALL columns
// - Rejects all evolutions before this latest base evolution
// - For duplicate evolutions, it keeps the oldest one (earliest ReleaseTime)
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
// - Results are sorted by ReleaseTime descending (newest first)
func SelectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
copy(sortedEvolutions, evolutions)
// sort the evolutions by ReleaseTime ascending
sort.Slice(sortedEvolutions, func(i, j int) bool {
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
})
tsStartTime := time.Unix(0, int64(tsStart))
tsEndTime := time.Unix(0, int64(tsEnd))
// Build evolution map: column name -> evolution
evolutionMap := make(map[string]*telemetrytypes.EvolutionEntry)
for _, evolution := range sortedEvolutions {
if _, exists := evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))]; exists {
// if there is a duplicate, we just use the oldest one.
continue
}
evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))] = evolution
}
// Find the latest base evolution (<= tsStartTime) across ALL columns
// Evolutions are sorted, so we can break early
var latestBaseEvolutionAcrossAll *telemetrytypes.EvolutionEntry
for _, evolution := range sortedEvolutions {
if evolution.ReleaseTime.After(tsStartTime) {
break
}
latestBaseEvolutionAcrossAll = evolution
}
// We shouldn't reach this; it means the evolutions data is inconsistent
if latestBaseEvolutionAcrossAll == nil {
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "no base evolution found for columns %v", columns)
}
columnLookUpMap := make(map[string]*schema.Column)
for _, column := range columns {
columnLookUpMap[column.Name] = column
}
// Collect column-evolution pairs
type colEvoPair struct {
column *schema.Column
evolution *telemetrytypes.EvolutionEntry
}
pairs := []colEvoPair{}
for _, evolution := range evolutionMap {
// Reject evolutions before the latest base evolution
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
continue
}
// skip evolutions after tsEndTime
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
continue
}
if _, exists := columnLookUpMap[evolution.ColumnName]; !exists {
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "evolution column %s not found in columns %v", evolution.ColumnName, columns)
}
pairs = append(pairs, colEvoPair{columnLookUpMap[evolution.ColumnName], evolution})
}
// If no pairs found, fall back to latestBaseEvolutionAcrossAll for matching columns
if len(pairs) == 0 {
for _, column := range columns {
// Fall back to latestBaseEvolutionAcrossAll when this column is the one it refers to
if column.Name == latestBaseEvolutionAcrossAll.ColumnName {
pairs = append(pairs, colEvoPair{column, latestBaseEvolutionAcrossAll})
}
}
}
// Sort by ReleaseTime descending (newest first)
slices.SortFunc(pairs, func(a, b colEvoPair) int {
if a.evolution.ReleaseTime.After(b.evolution.ReleaseTime) {
return -1
}
if a.evolution.ReleaseTime.Before(b.evolution.ReleaseTime) {
return 1
}
return 0
})
// Extract results
newColumns := make([]*schema.Column, len(pairs))
evolutionsEntries := make([]*telemetrytypes.EvolutionEntry, len(pairs))
for i, pair := range pairs {
newColumns[i] = pair.column
evolutionsEntries[i] = pair.evolution
}
return newColumns, evolutionsEntries, nil
}
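
The deleted selection rule can be summarised as: anchor on the newest evolution released at or before the window start, keep every evolution released from that anchor up to (but excluding) the window end, and order the survivors newest first. A compact sketch of that windowing over release times alone (real entries also carry column, field, and version metadata):

package main

import (
	"fmt"
	"sort"
	"time"
)

// selectWindow returns the release times that apply to a query window:
// the newest release at or before start (the "base"), plus every release
// inside [base, end), ordered newest first.
func selectWindow(releases []time.Time, start, end time.Time) []time.Time {
	sort.Slice(releases, func(i, j int) bool { return releases[i].Before(releases[j]) })
	var base *time.Time
	for i := range releases {
		if releases[i].After(start) {
			break
		}
		base = &releases[i]
	}
	if base == nil {
		return nil // no base evolution: the metadata would be malformed
	}
	var selected []time.Time
	for _, r := range releases {
		if r.Before(*base) || r.After(end) || r.Equal(end) {
			continue
		}
		selected = append(selected, r)
	}
	// newest first, matching the multiIf branch ordering
	sort.Slice(selected, func(i, j int) bool { return selected[i].After(selected[j]) })
	return selected
}

func main() {
	legacy := time.Unix(0, 0).UTC()
	jsonCol := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	// Window straddles the JSON release: both evolutions survive, newest first.
	fmt.Println(selectWindow(
		[]time.Time{legacy, jsonCol},
		time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC),
		time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC),
	))
}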

View File

@@ -6,7 +6,7 @@ import uuid
from abc import ABC
from collections.abc import Callable, Generator
from enum import Enum
from typing import Any, Literal
from typing import Any
from urllib.parse import urlparse
import numpy as np
@@ -236,7 +236,6 @@ class Traces(ABC):
attributes_number: dict[str, np.float64]
attributes_bool: dict[str, bool]
resources_string: dict[str, str]
resource_json: dict[str, str]
events: list[str]
links: str
response_status_code: str
@@ -274,7 +273,6 @@ class Traces(ABC):
links: list[TracesLink] = [],
trace_state: str = "",
flags: np.uint32 = 0,
resource_write_mode: Literal["legacy_only", "dual_write"] = "dual_write",
) -> None:
if timestamp is None:
timestamp = datetime.datetime.now()
@@ -324,11 +322,8 @@ class Traces(ABC):
self.db_name = ""
self.db_operation = ""
# Process resources and derive service_name. Spans written before the
# JSON-resource evolution time only populate resources_string (legacy_only);
# spans at or after the evolution time dual-write to both columns.
# Process resources and derive service_name
self.resources_string = {k: str(v) for k, v in resources.items()}
self.resource_json = {} if resource_write_mode == "legacy_only" else dict(self.resources_string)
self.service_name = self.resources_string.get("service.name", "default-service")
for k, v in self.resources_string.items():
@@ -580,7 +575,7 @@ class Traces(ABC):
self.db_operation,
self.has_error,
self.is_remote,
self.resource_json,
self.resources_string,
],
dtype=object,
)

View File

@@ -1,240 +0,0 @@
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.querier import (
build_group_by_field,
build_logs_aggregation,
index_series_by_label,
make_query_request,
)
from fixtures.traces import TraceIdGenerator, Traces
# the evolution for resource is already created during schema migration;
# since we build test data around it, we need to fetch the evolution time
def _get_traces_resource_evolution_time_json(signoz: types.SigNoz) -> datetime:
result = signoz.telemetrystore.conn.query(
"""
SELECT release_time
FROM signoz_metadata.distributed_column_evolution_metadata
WHERE signal = 'traces'
AND field_context = 'resource'
AND field_name = '__all__'
AND column_name = 'resource'
LIMIT 1
"""
).result_rows
assert result, "Expected traces resource evolution metadata to exist"
release_time_ns = int(result[0][0])
return datetime.fromtimestamp(release_time_ns / 1e9, tz=UTC)
# Spans with timestamps before the evolution time will have resources written only to resources_string.
# Spans with timestamps at or after the evolution time will have resources written to both resources_string and resource (JSON).
def _build_evolved_span(
timestamp: datetime,
evolution_time: datetime,
service_name: str,
name: str,
) -> Traces:
resource_write_mode = "legacy_only" if timestamp < evolution_time else "dual_write"
return Traces(
timestamp=timestamp,
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
name=name,
resources={
"service.name": service_name,
"deployment.environment": "integration",
},
resource_write_mode=resource_write_mode,
)
def _query_grouped_trace_series(
signoz: types.SigNoz,
token: str,
start: datetime,
end: datetime,
group_by: str = "service.name",
aggregation: str = "count()",
) -> dict[str, list[dict]]:
response = make_query_request(
signoz,
token,
start_ms=int(start.timestamp() * 1000),
end_ms=int(end.timestamp() * 1000),
request_type="time_series",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": False,
"groupBy": [build_group_by_field(group_by)],
"having": {"expression": ""},
"aggregations": [build_logs_aggregation(aggregation)],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
return index_series_by_label(aggregations[0]["series"], group_by)
def _assert_grouped_series(
series_by_group: dict[str, dict],
expected_values_by_group: dict[str, dict[int, int]],
) -> None:
assert set(series_by_group.keys()) == set(expected_values_by_group.keys())
for group_name, expected_by_ts in expected_values_by_group.items():
actual_values = sorted(
series_by_group[group_name]["values"],
key=lambda value: value["timestamp"],
)
expected_values = [{"timestamp": timestamp, "value": value} for timestamp, value in sorted(expected_by_ts.items())]
assert actual_values == expected_values
def _test_traces_resource_evolution(
signoz: types.SigNoz,
token: str,
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
# 1. Get the evolution time.
# 2. Ingest spans before the evolution time.
# 3. Ingest spans after the evolution time.
# 4. Query the spans before the evolution time.
# 5. Query the spans after the evolution time.
# Both aggregation and group by should be checked.
"""
evolution_time = _get_traces_resource_evolution_time_json(signoz)
evolution_time = evolution_time.replace(second=0, microsecond=0)
before_2 = evolution_time - timedelta(minutes=10)
before_1 = evolution_time - timedelta(minutes=5)
after_1 = evolution_time + timedelta(minutes=5)
after_2 = evolution_time + timedelta(minutes=10)
insert_traces(
[
_build_evolved_span(
timestamp=before_2,
evolution_time=evolution_time,
service_name="svc-before-2",
name="span before evolution 2",
),
_build_evolved_span(
timestamp=before_1,
evolution_time=evolution_time,
service_name="svc-before-1",
name="span before evolution 1",
),
_build_evolved_span(
timestamp=after_1,
evolution_time=evolution_time,
service_name="svc-after-1",
name="span after evolution 1",
),
_build_evolved_span(
timestamp=after_2,
evolution_time=evolution_time,
service_name="svc-after-2",
name="span after evolution 2",
),
]
)
before_series = _query_grouped_trace_series(signoz, token, before_2 - timedelta(minutes=1), before_1 + timedelta(minutes=1))
_assert_grouped_series(
before_series,
expected_values_by_group={
"svc-before-2": {
int(before_2.timestamp() * 1000): 1,
},
"svc-before-1": {
int(before_1.timestamp() * 1000): 1,
},
},
)
after_series = _query_grouped_trace_series(signoz, token, after_1 - timedelta(minutes=1), after_2 + timedelta(minutes=1))
_assert_grouped_series(
after_series,
expected_values_by_group={
"svc-after-1": {
int(after_1.timestamp() * 1000): 1,
},
"svc-after-2": {
int(after_2.timestamp() * 1000): 1,
},
},
)
spanning_series = _query_grouped_trace_series(signoz, token, before_2, after_2 + timedelta(minutes=1))
_assert_grouped_series(
spanning_series,
expected_values_by_group={
"svc-before-2": {
int(before_2.timestamp() * 1000): 1,
},
"svc-before-1": {
int(before_1.timestamp() * 1000): 1,
},
"svc-after-1": {
int(after_1.timestamp() * 1000): 1,
},
"svc-after-2": {
int(after_2.timestamp() * 1000): 1,
},
},
)
# query to check aggregation on the resource field like count_distinct(service.name)
aggregation_series = _query_grouped_trace_series(
signoz,
token,
before_2,
after_2 + timedelta(minutes=1),
group_by="deployment.environment",
aggregation="count_distinct(service.name)",
)
_assert_grouped_series(
aggregation_series,
expected_values_by_group={
"integration": {
int(before_2.timestamp() * 1000): 1,
int(before_1.timestamp() * 1000): 1,
int(after_1.timestamp() * 1000): 1,
int(after_2.timestamp() * 1000): 1,
},
},
)
def test_traces_resource_evolution(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
) -> None:
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
_test_traces_resource_evolution(signoz, token, insert_traces)