Compare commits

..

87 Commits

Author SHA1 Message Date
Piyush Singariya
65ce61c0dd Merge branch 'main' into feat/json-qb-tests 2026-04-21 15:33:59 +05:30
Piyush Singariya
1350dd8226 chore: some changes in test to improve quality 2026-04-21 15:33:45 +05:30
Piyush Singariya
14ecd0f8db chore: go mod tidy 2026-04-21 13:35:09 +05:30
Piyush Singariya
7eccbc96b6 chore: error fix and unused code 2026-04-21 13:33:18 +05:30
Piyush Singariya
0a595e1b71 chore: update migrator version to stable release 2026-04-21 13:30:12 +05:30
Piyush Singariya
e8ed7592e6 chore: update collector to stable release 2026-04-21 13:28:54 +05:30
Piyush Singariya
eee7788108 fix: replacing JSONIndex with TelemetryFieldKeyIndex 2026-04-21 13:25:06 +05:30
Piyush Singariya
0a37aa0bd1 Merge branch 'main' into feat/json-qb-tests 2026-04-21 10:59:26 +05:30
Piyush Singariya
7d7307a6eb Merge branch 'chore/json-changes' into feat/json-qb-tests 2026-04-16 11:50:14 +05:30
Piyush Singariya
91beeb4949 fix: int64 mapping 2026-04-16 11:49:40 +05:30
Piyush Singariya
9f8c3466b1 test: select orderby works 2026-04-16 11:49:15 +05:30
Piyush Singariya
701323c822 Merge branch 'chore/json-changes' into feat/json-qb-tests 2026-04-15 18:18:00 +05:30
Piyush Singariya
cc193c9be5 fix: issue with FuzzyMatching and API failing 2026-04-15 18:17:44 +05:30
Piyush Singariya
512dc579b1 ci: test with updated collector 2026-04-15 16:54:18 +05:30
Piyush Singariya
d904953afe Merge branch 'chore/json-changes' into feat/json-qb-tests 2026-04-15 16:08:58 +05:30
Piyush Singariya
9690c06cb2 fix: comment removed 2026-04-15 16:08:44 +05:30
Piyush Singariya
8c44925a67 fix: branches should tell missing array types 2026-04-15 16:08:14 +05:30
Piyush Singariya
1ea35a19d7 fix: update jsontypeexporter 2026-04-15 15:59:46 +05:30
Piyush Singariya
c27db82794 test: select order by tests added 2026-04-15 15:35:19 +05:30
Piyush Singariya
5d9685ad9e test: added indexed tests separately 2026-04-15 15:08:32 +05:30
Piyush Singariya
8d0cc2d99d Merge branch 'chore/json-changes' into feat/json-qb-tests 2026-04-15 14:52:49 +05:30
Piyush Singariya
9643fae27a fix: invalid index usage on terminal condition 2026-04-15 14:51:24 +05:30
Piyush Singariya
750394a73f Merge branch 'chore/json-changes' into feat/json-qb-tests 2026-04-15 14:18:53 +05:30
Piyush Singariya
19a65e80d8 Merge branch 'main' into chore/json-changes 2026-04-15 14:18:32 +05:30
Piyush Singariya
bbef04352e Merge branch 'chore/json-changes' into feat/json-qb-tests 2026-04-15 14:17:04 +05:30
Piyush Singariya
0758ff133b fix: json access plan remval of AvailableTypes 2026-04-15 14:16:23 +05:30
Piyush Singariya
74dceb844b test: integration test fix attempt 1 2026-04-15 13:54:30 +05:30
Piyush Singariya
0d0ebe8fe7 Merge branch 'main' into feat/json-qb-tests 2026-04-15 13:07:34 +05:30
Piyush Singariya
b35a6213a6 fix: remove unused promoted fixture 2026-04-15 13:05:52 +05:30
Piyush Singariya
9cef109158 fix: changes based on review 2026-04-15 13:00:14 +05:30
Piyush Singariya
bb308c263f chore: changes based on review 2026-04-15 11:30:29 +05:30
Piyush Singariya
67e1b82adb fix: name changed to field_map 2026-04-14 17:05:30 +05:30
Piyush Singariya
1e8c0f19f5 fix: added safeguards in plan generation 2026-04-14 16:17:44 +05:30
Piyush Singariya
c027181935 test: selectField tests added 2026-04-14 16:06:12 +05:30
Piyush Singariya
7122fb8b54 revert: remove db binaries 2026-04-14 14:56:46 +05:30
Piyush Singariya
67830c8a16 Merge branch 'main' into chore/json-changes 2026-04-14 14:38:53 +05:30
Piyush Singariya
096eee6435 Merge branch 'upgrade/collector@v0.144.3-rc.4' into chore/json-changes 2026-04-14 14:26:31 +05:30
Piyush Singariya
30f5f2f2f2 ci: test fix 2026-04-14 13:34:49 +05:30
Piyush Singariya
52adb84461 Merge branch 'upgrade/collector@v0.144.3-rc.4' into chore/json-changes 2026-04-14 13:20:29 +05:30
Piyush Singariya
63b7f15d0e fix: tests 2026-04-14 13:19:30 +05:30
Piyush Singariya
6d3c88ed21 chore: upgrade collector version v0.144.3-rc.4 2026-04-14 13:07:43 +05:30
Piyush Singariya
fc6a67aec1 Merge branch 'upgrade/collector@v0.144.3-rc.2' into chore/json-changes 2026-04-14 12:58:26 +05:30
Piyush Singariya
9f41499047 fix: go sum 2026-04-14 12:43:58 +05:30
Piyush Singariya
9cd554d293 fix: qbtoexpr tests 2026-04-14 12:43:07 +05:30
Piyush Singariya
415387edfc fix: upgraded collector version 2026-04-14 12:40:37 +05:30
Piyush Singariya
fa1bc3db9b ci: tests fixed 2026-04-14 11:49:35 +05:30
Piyush Singariya
6c6dad8a66 chore: remove redundent comparison 2026-04-13 17:18:16 +05:30
Piyush Singariya
7403f86773 fix: tons of changes 2026-04-13 13:44:57 +05:30
Piyush Singariya
9176ef0589 Merge branch 'main' into feat/json-qb-tests 2026-04-10 19:59:46 +05:30
Piyush Singariya
5c2a338189 test: updated validation 2026-04-10 19:59:29 +05:30
Piyush Singariya
704bab23cf Merge branch 'main' into feat/json-qb-tests 2026-04-09 18:04:25 +05:30
Piyush Singariya
371da26b3c fix: test 2026-04-09 18:03:57 +05:30
Piyush Singariya
97fbfbdc13 fix: type ambiguity 2026-04-09 13:58:13 +05:30
Piyush Singariya
4b112988ef Merge branch 'main' into feat/json-qb-tests 2026-04-09 12:19:26 +05:30
Piyush Singariya
a47ecf3907 test: with higher migrator version 2026-04-09 12:16:31 +05:30
Piyush Singariya
e4a78cf556 fix: dynamically change insert stmt for body_v2 availability 2026-04-08 22:00:05 +05:30
Piyush Singariya
b6adecc294 fix: better validations 2026-04-08 19:29:28 +05:30
Piyush Singariya
40333a5fee Merge branch 'main' into feat/json-qb-tests 2026-04-08 19:08:34 +05:30
Piyush Singariya
4af6a9abae Merge branch 'main' into feat/json-qb-tests 2026-04-07 21:49:21 +05:30
Piyush Singariya
55e892dad3 fix: body tests ready 2026-04-07 16:22:26 +05:30
Piyush Singariya
181116308f fix: logs.py 2026-04-07 15:56:21 +05:30
Piyush Singariya
eaa678910b fix: better tests 2026-04-07 14:49:52 +05:30
Piyush Singariya
e994caeb02 chore: import tests from older pr 2026-04-07 13:43:16 +05:30
Piyush Singariya
10840f8495 ci: lint changes 2026-04-07 12:31:22 +05:30
Piyush Singariya
1fcd3adfc8 Merge branch 'main' into fix/array-json 2026-04-07 12:29:35 +05:30
Piyush Singariya
3e14b26b00 fix: negative operator check 2026-04-07 12:26:44 +05:30
Piyush Singariya
b30bfa6371 fix: better review for test file 2026-04-02 12:53:44 +05:30
Piyush Singariya
e7f4a04b36 Merge branch 'main' into fix/array-json 2026-04-01 15:44:01 +05:30
Piyush Singariya
0687634da3 fix: stringified integer value input 2026-04-01 15:25:35 +05:30
Piyush Singariya
7e7732243e fix: dynamic array tests 2026-04-01 12:54:26 +05:30
Piyush Singariya
2f952e402f feat: change filtering of dynamic arrays 2026-04-01 12:09:32 +05:30
Piyush Singariya
a12febca4a fix: array json element comparison 2026-04-01 10:34:55 +05:30
Piyush Singariya
cb71c9c3f7 Merge branch 'main' into fix/array-json 2026-03-31 15:42:09 +05:30
Piyush Singariya
1cd4ce6509 Merge branch 'main' into fix/array-json 2026-03-31 14:55:36 +05:30
Piyush Singariya
9299c8ab18 fix: indexed unit tests 2026-03-30 15:47:47 +05:30
Piyush Singariya
24749de269 fix: comment 2026-03-30 15:16:28 +05:30
Piyush Singariya
39098ec3f4 fix: unit tests 2026-03-30 15:12:17 +05:30
Piyush Singariya
fe554f5c94 fix: remove not used paths from testdata 2026-03-30 14:24:48 +05:30
Piyush Singariya
8a60a041a6 fix: unit tests 2026-03-30 14:14:49 +05:30
Piyush Singariya
541f19c34a fix: array type filtering from dynamic arrays 2026-03-30 12:59:31 +05:30
Piyush Singariya
010db03d6e fix: indexed tests passing 2026-03-30 12:24:26 +05:30
Piyush Singariya
5408acbd8c fix: primitive conditions working 2026-03-30 12:01:35 +05:30
Piyush Singariya
0de6c85f81 feat: align negative operators to include other logs 2026-03-28 10:30:11 +05:30
Piyush Singariya
69ec24fa05 test: fix unit tests 2026-03-27 15:12:49 +05:30
Piyush Singariya
539d732b65 fix: contextual path index usage 2026-03-27 14:44:51 +05:30
Piyush Singariya
843d5fb199 Merge branch 'main' into feat/json-index 2026-03-27 14:17:52 +05:30
Piyush Singariya
fabdfb8cc1 feat: enable JSON Path index 2026-03-27 14:07:37 +05:30
27 changed files with 2482 additions and 4071 deletions

View File

@@ -52,6 +52,7 @@ jobs:
- ingestionkeys
- rootuser
- serviceaccount
- querier_json_body
sqlstore-provider:
- postgres
- sqlite
@@ -61,7 +62,7 @@ jobs:
- 25.5.6
- 25.12.5
schema-migrator-version:
- v0.142.0
- v0.144.3
postgres-version:
- 15
if: |

File diff suppressed because it is too large Load Diff

View File

@@ -4774,7 +4774,7 @@ export interface RuletypesPostableRuleDTO {
* @type string
*/
alert: string;
alertType: RuletypesAlertTypeDTO;
alertType?: RuletypesAlertTypeDTO;
/**
* @type object
*/
@@ -4899,7 +4899,7 @@ export interface RuletypesRuleDTO {
* @type string
*/
alert: string;
alertType: RuletypesAlertTypeDTO;
alertType?: RuletypesAlertTypeDTO;
/**
* @type object
*/
@@ -4984,8 +4984,8 @@ export interface RuletypesRuleConditionDTO {
*/
algorithm?: string;
compositeQuery: RuletypesAlertCompositeQueryDTO;
matchType?: RuletypesMatchTypeDTO;
op?: RuletypesCompareOperatorDTO;
matchType: RuletypesMatchTypeDTO;
op: RuletypesCompareOperatorDTO;
/**
* @type boolean
*/

2
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4
github.com/SigNoz/signoz-otel-collector v0.144.3
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/cespare/xxhash/v2 v2.3.0

4
go.sum
View File

@@ -108,8 +108,8 @@ github.com/SigNoz/expr v1.17.7-beta h1:FyZkleM5dTQ0O6muQfwGpoH5A2ohmN/XTasRCO72g
github.com/SigNoz/expr v1.17.7-beta/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4 h1:EskJkEMfuuIyArWhV8SleDV/fuKxiaEGTXrCZIFqDT4=
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
github.com/SigNoz/signoz-otel-collector v0.144.3 h1:/7PPIqIpPsaWtrgnfHam2XVYP41ZlgEKLHzQO8oVxcA=
github.com/SigNoz/signoz-otel-collector v0.144.3/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
github.com/Yiling-J/theine-go v0.6.2/go.mod h1:08QpMa5JZ2pKN+UJCRrCasWYO1IKCdl54Xa836rpmDU=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=

View File

@@ -44,7 +44,6 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
Description: "This endpoint creates a new alert rule",
Request: new(ruletypes.PostableRule),
RequestContentType: "application/json",
RequestExamples: postableRuleExamples(),
Response: new(ruletypes.Rule),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusCreated,
@@ -55,28 +54,27 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
}
if err := router.Handle("/api/v2/rules/{id}", handler.New(provider.authZ.EditAccess(provider.rulerHandler.UpdateRuleByID), handler.OpenAPIDef{
ID: "UpdateRuleByID",
Tags: []string{"rules"},
Summary: "Update alert rule",
Description: "This endpoint updates an alert rule by ID",
Request: new(ruletypes.PostableRule),
RequestContentType: "application/json",
RequestExamples: postableRuleExamples(),
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
ID: "UpdateRuleByID",
Tags: []string{"rules"},
Summary: "Update alert rule",
Description: "This endpoint updates an alert rule by ID",
Request: new(ruletypes.PostableRule),
RequestContentType: "application/json",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
})).Methods(http.MethodPut).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v2/rules/{id}", handler.New(provider.authZ.EditAccess(provider.rulerHandler.DeleteRuleByID), handler.OpenAPIDef{
ID: "DeleteRuleByID",
Tags: []string{"rules"},
Summary: "Delete alert rule",
Description: "This endpoint deletes an alert rule by ID",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
ID: "DeleteRuleByID",
Tags: []string{"rules"},
Summary: "Delete alert rule",
Description: "This endpoint deletes an alert rule by ID",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
})).Methods(http.MethodDelete).GetError(); err != nil {
return err
}
@@ -88,7 +86,6 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
Description: "This endpoint applies a partial update to an alert rule by ID",
Request: new(ruletypes.PostableRule),
RequestContentType: "application/json",
RequestExamples: postableRuleExamples(),
Response: new(ruletypes.Rule),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
@@ -105,7 +102,6 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
Description: "This endpoint fires a test notification for the given rule definition",
Request: new(ruletypes.PostableRule),
RequestContentType: "application/json",
RequestExamples: postableRuleExamples(),
Response: new(ruletypes.GettableTestRule),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
@@ -160,27 +156,27 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
}
if err := router.Handle("/api/v1/downtime_schedules/{id}", handler.New(provider.authZ.EditAccess(provider.rulerHandler.UpdateDowntimeScheduleByID), handler.OpenAPIDef{
ID: "UpdateDowntimeScheduleByID",
Tags: []string{"downtimeschedules"},
Summary: "Update downtime schedule",
Description: "This endpoint updates a downtime schedule by ID",
Request: new(ruletypes.PostablePlannedMaintenance),
RequestContentType: "application/json",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
ID: "UpdateDowntimeScheduleByID",
Tags: []string{"downtimeschedules"},
Summary: "Update downtime schedule",
Description: "This endpoint updates a downtime schedule by ID",
Request: new(ruletypes.PostablePlannedMaintenance),
RequestContentType: "application/json",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
})).Methods(http.MethodPut).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/downtime_schedules/{id}", handler.New(provider.authZ.EditAccess(provider.rulerHandler.DeleteDowntimeScheduleByID), handler.OpenAPIDef{
ID: "DeleteDowntimeScheduleByID",
Tags: []string{"downtimeschedules"},
Summary: "Delete downtime schedule",
Description: "This endpoint deletes a downtime schedule by ID",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
ID: "DeleteDowntimeScheduleByID",
Tags: []string{"downtimeschedules"},
Summary: "Delete downtime schedule",
Description: "This endpoint deletes a downtime schedule by ID",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
})).Methods(http.MethodDelete).GetError(); err != nil {
return err
}

View File

@@ -1,733 +0,0 @@
package signozapiserver
import "github.com/SigNoz/signoz/pkg/http/handler"
// postableRuleExamples returns example payloads attached to every rule-write
// endpoint. They cover each alert type, rule type, and composite-query shape.
func postableRuleExamples() []handler.OpenAPIExample {
rolling := func(evalWindow, frequency string) map[string]any {
return map[string]any{
"kind": "rolling",
"spec": map[string]any{"evalWindow": evalWindow, "frequency": frequency},
}
}
renotify := func(interval string, states ...string) map[string]any {
s := make([]any, 0, len(states))
for _, v := range states {
s = append(s, v)
}
return map[string]any{
"enabled": true,
"interval": interval,
"alertStates": s,
}
}
return []handler.OpenAPIExample{
{
Name: "metric_threshold_single",
Summary: "Metric threshold single builder query",
Description: "Fires when a pod consumes more than 80% of its requested CPU for the whole evaluation window. Uses `k8s.pod.cpu_request_utilization`.",
Value: map[string]any{
"alert": "Pod CPU above 80% of request",
"alertType": "METRIC_BASED_ALERT",
"description": "CPU usage for api-service pods exceeds 80% of the requested CPU",
"ruleType": "threshold_rule",
"version": "v5",
"schemaVersion": "v2alpha1",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "builder",
"panelType": "graph",
"unit": "percentunit",
"queries": []any{
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "A",
"signal": "metrics",
"stepInterval": 60,
"aggregations": []any{map[string]any{"metricName": "k8s.pod.cpu_request_utilization", "timeAggregation": "avg", "spaceAggregation": "max"}},
"filter": map[string]any{"expression": "k8s.deployment.name = 'api-service'"},
"groupBy": []any{
map[string]any{"name": "k8s.pod.name", "fieldContext": "resource", "fieldDataType": "string"},
map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"},
},
"legend": "{{k8s.pod.name}} ({{deployment.environment}})",
},
},
},
},
"selectedQueryName": "A",
"thresholds": map[string]any{
"kind": "basic",
"spec": []any{
map[string]any{
"name": "critical",
"op": "above",
"matchType": "all_the_times",
"target": 0.8,
"channels": []any{"slack-platform", "pagerduty-oncall"},
},
},
},
},
"evaluation": rolling("15m", "1m"),
"notificationSettings": map[string]any{
"groupBy": []any{"k8s.pod.name", "deployment.environment"},
"renotify": renotify("4h", "firing"),
},
"labels": map[string]any{"severity": "critical", "team": "platform"},
"annotations": map[string]any{
"description": "Pod {{$k8s.pod.name}} CPU is at {{$value}} of request in {{$deployment.environment}}.",
"summary": "Pod CPU above {{$threshold}} of request",
},
},
},
{
Name: "metric_threshold_formula",
Summary: "Metric threshold multi-query formula",
Description: "Computes disk utilization as (1 - available/capacity) * 100 by combining two disabled base queries with a builder_formula. The formula emits 0100, so compositeQuery.unit is set to \"percent\" and the target is a bare number.",
Value: map[string]any{
"alert": "PersistentVolume above 80% utilization",
"alertType": "METRIC_BASED_ALERT",
"description": "Disk utilization for a persistent volume is above 80%",
"ruleType": "threshold_rule",
"version": "v5",
"schemaVersion": "v2alpha1",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "builder",
"panelType": "graph",
"unit": "percent",
"queries": []any{
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "A",
"signal": "metrics",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"metricName": "k8s.volume.available", "timeAggregation": "max", "spaceAggregation": "max"}},
"filter": map[string]any{"expression": "k8s.volume.type = 'persistentVolumeClaim'"},
"groupBy": []any{
map[string]any{"name": "k8s.persistentvolumeclaim.name", "fieldContext": "resource", "fieldDataType": "string"},
map[string]any{"name": "k8s.namespace.name", "fieldContext": "resource", "fieldDataType": "string"},
},
},
},
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "B",
"signal": "metrics",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"metricName": "k8s.volume.capacity", "timeAggregation": "max", "spaceAggregation": "max"}},
"filter": map[string]any{"expression": "k8s.volume.type = 'persistentVolumeClaim'"},
"groupBy": []any{
map[string]any{"name": "k8s.persistentvolumeclaim.name", "fieldContext": "resource", "fieldDataType": "string"},
map[string]any{"name": "k8s.namespace.name", "fieldContext": "resource", "fieldDataType": "string"},
},
},
},
map[string]any{
"type": "builder_formula",
"spec": map[string]any{
"name": "F1",
"expression": "(1 - A/B) * 100",
"legend": "{{k8s.persistentvolumeclaim.name}} ({{k8s.namespace.name}})",
},
},
},
},
"selectedQueryName": "F1",
"thresholds": map[string]any{
"kind": "basic",
"spec": []any{
map[string]any{
"name": "critical",
"op": "above",
"matchType": "at_least_once",
"target": 80,
"channels": []any{"slack-storage"},
},
},
},
},
"evaluation": rolling("30m", "5m"),
"notificationSettings": map[string]any{
"groupBy": []any{"k8s.namespace.name", "k8s.persistentvolumeclaim.name"},
"renotify": renotify("2h", "firing"),
},
"labels": map[string]any{"severity": "critical"},
"annotations": map[string]any{
"description": "Volume {{$k8s.persistentvolumeclaim.name}} in {{$k8s.namespace.name}} is {{$value}}% full.",
"summary": "Disk utilization above {{$threshold}}%",
},
},
},
{
Name: "metric_promql",
Summary: "Metric threshold PromQL rule",
Description: "PromQL expression instead of the builder. Dotted OTEL resource attributes are quoted (\"deployment.environment\"). Useful for queries that combine series with group_right or other Prom operators.",
Value: map[string]any{
"alert": "Kafka consumer group lag above 1000",
"alertType": "METRIC_BASED_ALERT",
"description": "Consumer group lag computed via PromQL",
"ruleType": "promql_rule",
"version": "v5",
"schemaVersion": "v2alpha1",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "promql",
"panelType": "graph",
"queries": []any{
map[string]any{
"type": "promql",
"spec": map[string]any{
"name": "A",
"query": "(max by(topic, partition, \"deployment.environment\")(kafka_log_end_offset) - on(topic, partition, \"deployment.environment\") group_right max by(group, topic, partition, \"deployment.environment\")(kafka_consumer_committed_offset)) > 0",
"legend": "{{topic}}/{{partition}} ({{group}})",
},
},
},
},
"selectedQueryName": "A",
"thresholds": map[string]any{
"kind": "basic",
"spec": []any{
map[string]any{
"name": "critical",
"op": "above",
"matchType": "all_the_times",
"target": 1000,
"channels": []any{"slack-data-platform", "pagerduty-data"},
},
},
},
},
"evaluation": rolling("10m", "1m"),
"notificationSettings": map[string]any{
"groupBy": []any{"group", "topic"},
"renotify": renotify("1h", "firing"),
},
"labels": map[string]any{"severity": "critical"},
"annotations": map[string]any{
"description": "Consumer group {{$group}} is {{$value}} messages behind on {{$topic}}/{{$partition}}.",
"summary": "Kafka consumer lag high",
},
},
},
{
Name: "metric_anomaly",
Summary: "Metric anomaly rule (v1 only)",
Description: "Anomaly rules are not yet supported under schemaVersion v2alpha1, so this example uses the v1 shape. Wraps a builder query in the `anomaly` function with daily seasonality SigNoz compares each point against the forecast for that time of day. Fires when the anomaly score stays below the threshold for the entire window; `requireMinPoints` guards against noisy intervals.",
Value: map[string]any{
"alert": "Anomalous drop in ingested spans",
"alertType": "METRIC_BASED_ALERT",
"description": "Detect an abrupt drop in span ingestion using a z-score anomaly function",
"ruleType": "anomaly_rule",
"version": "v5",
"evalWindow": "24h",
"frequency": "3h",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "builder",
"panelType": "graph",
"queries": []any{
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "A",
"signal": "metrics",
"stepInterval": 21600,
"aggregations": []any{map[string]any{"metricName": "otelcol_receiver_accepted_spans", "timeAggregation": "rate", "spaceAggregation": "sum"}},
"filter": map[string]any{"expression": "tenant_tier = 'premium'"},
"groupBy": []any{map[string]any{"name": "tenant_id", "fieldContext": "attribute", "fieldDataType": "string"}},
"functions": []any{
map[string]any{
"name": "anomaly",
"args": []any{map[string]any{"name": "z_score_threshold", "value": 2}},
},
},
"legend": "{{tenant_id}}",
},
},
},
},
"op": "below",
"matchType": "all_the_times",
"target": 2,
"algorithm": "standard",
"seasonality": "daily",
"selectedQueryName": "A",
"requireMinPoints": true,
"requiredNumPoints": 3,
},
"labels": map[string]any{"severity": "warning"},
"preferredChannels": []any{"slack-ingestion"},
"annotations": map[string]any{
"description": "Ingestion rate for tenant {{$tenant_id}} is anomalously low (z-score {{$value}}).",
"summary": "Span ingestion anomaly",
},
},
},
{
Name: "logs_threshold",
Summary: "Logs threshold count() over filter",
Description: "Counts matching log records (ERROR severity + body contains) over a rolling window. Fires at least once per evaluation when the count exceeds zero.",
Value: map[string]any{
"alert": "Payments service panic logs",
"alertType": "LOGS_BASED_ALERT",
"description": "Any panic log line emitted by the payments service",
"ruleType": "threshold_rule",
"version": "v5",
"schemaVersion": "v2alpha1",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "builder",
"panelType": "graph",
"queries": []any{
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "A",
"signal": "logs",
"stepInterval": 60,
"aggregations": []any{map[string]any{"expression": "count()"}},
"filter": map[string]any{"expression": "service.name = 'payments-api' AND severity_text = 'ERROR' AND body CONTAINS 'panic'"},
"groupBy": []any{
map[string]any{"name": "k8s.pod.name", "fieldContext": "resource", "fieldDataType": "string"},
map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"},
},
"legend": "{{k8s.pod.name}} ({{deployment.environment}})",
},
},
},
},
"selectedQueryName": "A",
"thresholds": map[string]any{
"kind": "basic",
"spec": []any{
map[string]any{
"name": "critical",
"op": "above",
"matchType": "at_least_once",
"target": 0,
"channels": []any{"slack-payments", "pagerduty-payments"},
},
},
},
},
"evaluation": rolling("5m", "1m"),
"notificationSettings": map[string]any{
"groupBy": []any{"k8s.pod.name", "deployment.environment"},
"renotify": renotify("15m", "firing"),
},
"labels": map[string]any{"severity": "critical", "team": "payments"},
"annotations": map[string]any{
"description": "{{$k8s.pod.name}} emitted {{$value}} panic log(s) in {{$deployment.environment}}.",
"summary": "Payments service panic",
},
},
},
{
Name: "logs_error_rate_formula",
Summary: "Logs error rate error count / total count × 100",
Description: "Two disabled log count queries (A = errors, B = total) combined via a builder_formula into a percentage. Classic service-level error-rate alert pattern for log-based signals.",
Value: map[string]any{
"alert": "Payments-api error log rate above 1%",
"alertType": "LOGS_BASED_ALERT",
"description": "Error log ratio as a percentage of total logs for payments-api",
"ruleType": "threshold_rule",
"version": "v5",
"schemaVersion": "v2alpha1",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "builder",
"panelType": "graph",
"unit": "percent",
"queries": []any{
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"expression": "count()"}},
"filter": map[string]any{"expression": "service.name = 'payments-api' AND severity_text IN ['ERROR', 'FATAL']"},
"groupBy": []any{map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"}},
},
},
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "B",
"signal": "logs",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"expression": "count()"}},
"filter": map[string]any{"expression": "service.name = 'payments-api'"},
"groupBy": []any{map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"}},
},
},
map[string]any{
"type": "builder_formula",
"spec": map[string]any{
"name": "F1",
"expression": "(A / B) * 100",
"legend": "{{deployment.environment}}",
},
},
},
},
"selectedQueryName": "F1",
"thresholds": map[string]any{
"kind": "basic",
"spec": []any{
map[string]any{
"name": "critical",
"op": "above",
"matchType": "at_least_once",
"target": 1,
"channels": []any{"slack-payments"},
},
},
},
},
"evaluation": rolling("5m", "1m"),
"notificationSettings": map[string]any{
"groupBy": []any{"deployment.environment"},
"renotify": renotify("30m", "firing"),
},
"labels": map[string]any{"severity": "critical", "team": "payments"},
"annotations": map[string]any{
"description": "Error log rate in {{$deployment.environment}} is {{$value}}%",
"summary": "Payments-api error rate above {{$threshold}}%",
},
},
},
{
Name: "traces_threshold_latency",
Summary: "Traces threshold p99 latency (ns → s conversion)",
Description: "Builder query against the traces signal with p99(duration_nano). The series unit is ns (compositeQuery.unit), the target is in seconds (threshold.targetUnit) SigNoz converts before comparing. Canonical shape when series and target live in different units.",
Value: map[string]any{
"alert": "Search API p99 latency above 5s",
"alertType": "TRACES_BASED_ALERT",
"description": "p99 duration of the search endpoint exceeds 5s",
"ruleType": "threshold_rule",
"version": "v5",
"schemaVersion": "v2alpha1",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "builder",
"panelType": "graph",
"unit": "ns",
"queries": []any{
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "A",
"signal": "traces",
"stepInterval": 60,
"aggregations": []any{map[string]any{"expression": "p99(duration_nano)"}},
"filter": map[string]any{"expression": "service.name = 'search-api' AND name = 'GET /api/v1/search'"},
"groupBy": []any{
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
map[string]any{"name": "http.route", "fieldContext": "attribute", "fieldDataType": "string"},
},
"legend": "{{service.name}} {{http.route}}",
},
},
},
},
"selectedQueryName": "A",
"thresholds": map[string]any{
"kind": "basic",
"spec": []any{
map[string]any{
"name": "warning",
"op": "above",
"matchType": "at_least_once",
"target": 5,
"targetUnit": "s",
"channels": []any{"slack-search"},
},
},
},
},
"evaluation": rolling("5m", "1m"),
"notificationSettings": map[string]any{
"groupBy": []any{"service.name", "http.route"},
"renotify": renotify("30m", "firing"),
},
"labels": map[string]any{"severity": "warning", "team": "search"},
"annotations": map[string]any{
"description": "p99 latency for {{$service.name}} on {{$http.route}} crossed {{$threshold}}s.",
"summary": "Search-api latency degraded",
},
},
},
{
Name: "traces_error_rate_formula",
Summary: "Traces error rate error spans / total spans × 100",
Description: "Two disabled trace count queries (A = error spans where hasError=true, B = total spans) combined via a builder_formula into a percentage. Mirrors the common request-error-rate dashboard shape.",
Value: map[string]any{
"alert": "Search-api error rate above 5%",
"alertType": "TRACES_BASED_ALERT",
"description": "Request error rate for search-api, grouped by route",
"ruleType": "threshold_rule",
"version": "v5",
"schemaVersion": "v2alpha1",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "builder",
"panelType": "graph",
"unit": "percent",
"queries": []any{
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"expression": "count()"}},
"filter": map[string]any{"expression": "service.name = 'search-api' AND hasError = true"},
"groupBy": []any{
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
map[string]any{"name": "http.route", "fieldContext": "attribute", "fieldDataType": "string"},
},
},
},
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "B",
"signal": "traces",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"expression": "count()"}},
"filter": map[string]any{"expression": "service.name = 'search-api'"},
"groupBy": []any{
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
map[string]any{"name": "http.route", "fieldContext": "attribute", "fieldDataType": "string"},
},
},
},
map[string]any{
"type": "builder_formula",
"spec": map[string]any{
"name": "F1",
"expression": "(A / B) * 100",
"legend": "{{service.name}} {{http.route}}",
},
},
},
},
"selectedQueryName": "F1",
"thresholds": map[string]any{
"kind": "basic",
"spec": []any{
map[string]any{
"name": "critical",
"op": "above",
"matchType": "at_least_once",
"target": 5,
"channels": []any{"slack-search", "pagerduty-search"},
},
},
},
},
"evaluation": rolling("5m", "1m"),
"notificationSettings": map[string]any{
"groupBy": []any{"service.name", "http.route"},
"renotify": renotify("15m", "firing"),
},
"labels": map[string]any{"severity": "critical", "team": "search"},
"annotations": map[string]any{
"description": "Error rate on {{$service.name}} {{$http.route}} is {{$value}}%",
"summary": "Search-api error rate above {{$threshold}}%",
},
},
},
{
Name: "tiered_thresholds",
Summary: "Tiered thresholds with per-tier channels",
Description: "Two tiers (warning and critical) in a single rule, each with its own target, op, matchType, and channels so warnings and pages route to different receivers. `alertOnAbsent` + `absentFor` fires a no-data alert when the query returns no series for 15 consecutive evaluations.",
Value: map[string]any{
"alert": "Kafka consumer lag warn / critical",
"alertType": "METRIC_BASED_ALERT",
"description": "Warn at lag ≥ 50 and page at ≥ 200, tiered via thresholds.spec.",
"ruleType": "threshold_rule",
"version": "v5",
"schemaVersion": "v2alpha1",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "builder",
"panelType": "graph",
"queries": []any{
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "A",
"signal": "metrics",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"metricName": "kafka_log_end_offset", "timeAggregation": "max", "spaceAggregation": "max"}},
"filter": map[string]any{"expression": "topic != '__consumer_offsets'"},
"groupBy": []any{
map[string]any{"name": "topic", "fieldContext": "attribute", "fieldDataType": "string"},
map[string]any{"name": "partition", "fieldContext": "attribute", "fieldDataType": "string"},
},
},
},
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "B",
"signal": "metrics",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"metricName": "kafka_consumer_committed_offset", "timeAggregation": "max", "spaceAggregation": "max"}},
"filter": map[string]any{"expression": "topic != '__consumer_offsets'"},
"groupBy": []any{
map[string]any{"name": "topic", "fieldContext": "attribute", "fieldDataType": "string"},
map[string]any{"name": "partition", "fieldContext": "attribute", "fieldDataType": "string"},
},
},
},
map[string]any{
"type": "builder_formula",
"spec": map[string]any{
"name": "F1",
"expression": "A - B",
"legend": "{{topic}}/{{partition}}",
},
},
},
},
"alertOnAbsent": true,
"absentFor": 15,
"selectedQueryName": "F1",
"thresholds": map[string]any{
"kind": "basic",
"spec": []any{
map[string]any{
"name": "warning",
"op": "above",
"matchType": "all_the_times",
"target": 50,
"channels": []any{"slack-kafka-info"},
},
map[string]any{
"name": "critical",
"op": "above",
"matchType": "all_the_times",
"target": 200,
"channels": []any{"slack-kafka-alerts", "pagerduty-kafka"},
},
},
},
},
"evaluation": rolling("5m", "1m"),
"notificationSettings": map[string]any{
"groupBy": []any{"topic"},
"renotify": renotify("15m", "firing"),
},
"labels": map[string]any{"team": "data-platform"},
"annotations": map[string]any{
"description": "Consumer lag for {{$topic}} partition {{$partition}} is {{$value}}.",
"summary": "Kafka consumer lag",
},
},
},
{
Name: "notification_settings",
Summary: "Full notification settings (grouping, nodata renotify, grace period)",
Description: "Demonstrates the full notificationSettings surface: `groupBy` merges alerts across labels to cut noise, `newGroupEvalDelay` gives newly-appearing series a grace period before firing, `renotify` re-alerts every 30m while firing OR while the alert is in nodata (missing data is treated as actionable), and `usePolicy: false` means channels come from the threshold entries rather than global routing policies. Set `usePolicy: true` to skip per-threshold channels and route via the org-level notification policy instead.",
Value: map[string]any{
"alert": "API 5xx error rate above 1%",
"alertType": "TRACES_BASED_ALERT",
"description": "Noise-controlled 5xx error rate alert with renotify on gaps",
"ruleType": "threshold_rule",
"version": "v5",
"schemaVersion": "v2alpha1",
"condition": map[string]any{
"compositeQuery": map[string]any{
"queryType": "builder",
"panelType": "graph",
"unit": "percent",
"queries": []any{
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"expression": "count()"}},
"filter": map[string]any{"expression": "service.name CONTAINS 'api' AND http.status_code >= 500"},
"groupBy": []any{
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"},
},
},
},
map[string]any{
"type": "builder_query",
"spec": map[string]any{
"name": "B",
"signal": "traces",
"stepInterval": 60,
"disabled": true,
"aggregations": []any{map[string]any{"expression": "count()"}},
"filter": map[string]any{"expression": "service.name CONTAINS 'api'"},
"groupBy": []any{
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"},
},
},
},
map[string]any{
"type": "builder_formula",
"spec": map[string]any{
"name": "F1",
"expression": "(A / B) * 100",
"legend": "{{service.name}} ({{deployment.environment}})",
},
},
},
},
"selectedQueryName": "F1",
"thresholds": map[string]any{
"kind": "basic",
"spec": []any{
map[string]any{
"name": "critical",
"op": "above",
"matchType": "at_least_once",
"target": 1,
"channels": []any{"slack-api-alerts", "pagerduty-oncall"},
},
},
},
},
"evaluation": rolling("5m", "1m"),
"notificationSettings": map[string]any{
"groupBy": []any{"service.name", "deployment.environment"},
"newGroupEvalDelay": "2m",
"usePolicy": false,
"renotify": renotify("30m", "firing", "nodata"),
},
"labels": map[string]any{"team": "platform"},
"annotations": map[string]any{
"description": "{{$service.name}} 5xx rate in {{$deployment.environment}} is {{$value}}%.",
"summary": "API service error rate elevated",
},
},
},
}
}

View File

@@ -1,32 +0,0 @@
package signozapiserver
import (
"encoding/json"
"testing"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
)
// TestPostableRuleExamplesValidate verifies every example payload returned by
// postableRuleExamples() round-trips through PostableRule.UnmarshalJSON and
// passes Validate(). If an example drifts from the runtime contract this
// breaks loudly so the spec doesn't ship invalid payloads to users.
func TestPostableRuleExamplesValidate(t *testing.T) {
for _, example := range postableRuleExamples() {
t.Run(example.Name, func(t *testing.T) {
raw, err := json.Marshal(example.Value)
if err != nil {
t.Fatalf("marshal example: %v", err)
}
var rule ruletypes.PostableRule
if err := json.Unmarshal(raw, &rule); err != nil {
t.Fatalf("unmarshal: %v\npayload: %s", err, raw)
}
if err := rule.Validate(); err != nil {
t.Fatalf("Validate: %v\npayload: %s", err, raw)
}
})
}
}

View File

@@ -32,26 +32,16 @@ func NewModule(metadataStore telemetrytypes.MetadataStore, telemetrystore teleme
}
func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetypes.PromotePath, error) {
logsIndexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
indexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
if err != nil {
return nil, err
}
// Flatten the map values (which are slices) into a single slice
indexes := slices.Concat(slices.Collect(maps.Values(logsIndexes))...)
aggr := map[string][]promotetypes.WrappedIndex{}
for _, index := range indexes {
path, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
if err != nil {
return nil, err
}
// clean backticks from the path
path = strings.ReplaceAll(path, "`", "")
aggr[path] = append(aggr[path], promotetypes.WrappedIndex{
ColumnType: columnType,
Type: index.Type,
aggr[index.Name] = append(aggr[index.Name], promotetypes.WrappedIndex{
ColumnType: telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType].StringValue(),
Type: index.IndexType,
Granularity: index.Granularity,
})
}

View File

@@ -204,7 +204,7 @@ func AdjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemet
// Downstream query builder should handle multiple matching keys with their own metadata
// and not rely on this function to do so.
materialized := true
indexes := []telemetrytypes.JSONDataTypeIndex{}
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
fieldContextsSeen := map[telemetrytypes.FieldContext]bool{}
dataTypesSeen := map[telemetrytypes.FieldDataType]bool{}
for _, matchingKey := range matchingKeys {

View File

@@ -195,8 +195,8 @@ func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetryty
// the field genuinely holds the empty/zero value.
//
// Note: indexing is also skipped for Array Nested fields because they cannot be indexed.
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
return index.Type == node.TerminalConfig.ElemType
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.TelemetryFieldKeySkipIndex) bool {
return telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType] == node.TerminalConfig.ElemType
})
isExistsCheck := operator == qbtypes.FilterOperatorExists || operator == qbtypes.FilterOperatorNotExists
if node.TerminalConfig.ElemType.IndexSupported && indexed && !isExistsCheck {

View File

@@ -1127,9 +1127,12 @@ func buildTestTelemetryMetadataStore(t *testing.T, addIndexes bool) *telemetryty
return entry.Path == path && entry.Type == jsonType
})
if idx >= 0 {
key.Indexes = append(key.Indexes, telemetrytypes.JSONDataTypeIndex{
Type: jsonType,
ColumnExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
key.Indexes = append(key.Indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
Name: path,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: fdt,
BaseColumn: LogsV2BodyV2Column,
IndexExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
})
}
}

View File

@@ -106,7 +106,7 @@ func (t *telemetryMetaStore) buildJSONPlans(keys []*telemetrytypes.TelemetryFiel
return nil
}
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.JSONDataTypeIndex, error) {
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
filteredPaths := []string{}
for _, path := range paths {
// skip array paths; since they don't have any indexes
@@ -116,47 +116,22 @@ func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...st
filteredPaths = append(filteredPaths, path)
}
if len(filteredPaths) == 0 {
return make(map[string][]telemetrytypes.JSONDataTypeIndex), nil
return make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex), nil
}
// list indexes for the paths
indexesMap, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
indexes, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to list JSON path indexes")
}
// build a set of indexes
cleanIndexes := make(map[string][]telemetrytypes.JSONDataTypeIndex)
for path, indexes := range indexesMap {
for _, index := range indexes {
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", index.Expression)
}
jsonDataType, found := telemetrytypes.MappingStringToJSONDataType[columnType]
if !found {
t.logger.ErrorContext(ctx, "failed to map column type to JSON data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
continue
}
if jsonDataType == telemetrytypes.String {
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
Type: telemetrytypes.String,
ColumnExpression: columnExpr,
IndexExpression: index.Expression,
})
} else if strings.HasPrefix(index.Type, "minmax") {
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
Type: jsonDataType,
ColumnExpression: columnExpr,
IndexExpression: index.Expression,
})
}
}
fieldPathToIndexes := make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex)
for _, index := range indexes {
fieldPathToIndexes[index.Name] = append(fieldPathToIndexes[index.Name], index)
}
return cleanIndexes, nil
return fieldPathToIndexes, nil
}
func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, []any) {
@@ -180,7 +155,7 @@ func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, [
return sb.BuildWithFlavor(sqlbuilder.ClickHouse)
}
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
ctx = withTelemetryContext(ctx, "ListLogsJSONIndexes")
query, args := buildListLogsJSONIndexesQuery(t.telemetrystore.Cluster(), filters...)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
@@ -189,7 +164,7 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
}
defer rows.Close()
indexes := make(map[string][]schemamigrator.Index)
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
for rows.Next() {
var name string
var typeFull string
@@ -198,11 +173,42 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
if err := rows.Scan(&name, &typeFull, &expr, &granularity); err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to scan string indexed column")
}
indexes[name] = append(indexes[name], schemamigrator.Index{
Name: name,
Type: typeFull,
Expression: expr,
Granularity: int(granularity),
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(expr)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", expr)
}
fdt, found := telemetrytypes.MappingJSONDataTypeToFieldDataType[columnType]
if !found {
t.logger.ErrorContext(ctx, "failed to map JSON data type to field data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
continue
}
// Key by the field path (e.g. "user.name"), not by the ClickHouse index name.
// columnExpr is the full column reference like "body_v2.user.name"; strip the
// column prefix and any backticks added for special-character path segments.
baseColumn := ""
fieldName := ""
switch {
case strings.HasPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix):
baseColumn = telemetrylogs.BodyV2ColumnPrefix
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix)
case strings.HasPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix):
baseColumn = telemetrylogs.BodyPromotedColumnPrefix
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix)
}
fieldName = strings.ReplaceAll(fieldName, "`", "")
indexes = append(indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
Name: fieldName,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: fdt,
BaseColumn: baseColumn,
IndexName: name,
IndexType: typeFull,
IndexExpression: expr,
Granularity: int(granularity),
})
}

View File

@@ -60,7 +60,7 @@ func (i *PromotePath) ValidateAndSetDefaults() error {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index granularity must be greater than 0")
}
jsonDataType, ok := telemetrytypes.MappingStringToJSONDataType[index.ColumnType]
jsonDataType, ok := telemetrytypes.MappingFieldDataTypeToJSONDataType[telemetrytypes.MappingJSONDataTypeToFieldDataType[index.ColumnType]]
if !ok {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid column type: %s", index.ColumnType)
}

View File

@@ -114,11 +114,11 @@ type AlertCompositeQuery struct {
type RuleCondition struct {
CompositeQuery *AlertCompositeQuery `json:"compositeQuery" required:"true"`
CompareOperator CompareOperator `json:"op,omitzero"`
CompareOperator CompareOperator `json:"op" required:"true"`
Target *float64 `json:"target,omitempty"`
AlertOnAbsent bool `json:"alertOnAbsent,omitempty"`
AbsentFor uint64 `json:"absentFor,omitempty"`
MatchType MatchType `json:"matchType,omitzero"`
MatchType MatchType `json:"matchType" required:"true"`
TargetUnit string `json:"targetUnit,omitempty"`
Algorithm string `json:"algorithm,omitempty"`
Seasonality Seasonality `json:"seasonality,omitzero"`

View File

@@ -50,13 +50,13 @@ const (
// PostableRule is used to create alerting rule from HTTP api.
type PostableRule struct {
AlertName string `json:"alert" required:"true"`
AlertType AlertType `json:"alertType" required:"true"`
AlertType AlertType `json:"alertType,omitempty"`
Description string `json:"description,omitempty"`
RuleType RuleType `json:"ruleType" required:"true"`
RuleType RuleType `json:"ruleType,omitzero" required:"true"`
EvalWindow valuer.TextDuration `json:"evalWindow,omitzero"`
Frequency valuer.TextDuration `json:"frequency,omitzero"`
RuleCondition *RuleCondition `json:"condition" required:"true"`
RuleCondition *RuleCondition `json:"condition,omitempty" required:"true"`
Labels map[string]string `json:"labels,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
@@ -67,9 +67,9 @@ type PostableRule struct {
PreferredChannels []string `json:"preferredChannels,omitempty"`
Version string `json:"version"`
Version string `json:"version,omitempty"`
Evaluation *EvaluationEnvelope `json:"evaluation,omitempty"`
Evaluation *EvaluationEnvelope `yaml:"evaluation,omitempty" json:"evaluation,omitempty"`
SchemaVersion string `json:"schemaVersion,omitempty"`
NotificationSettings *NotificationSettings `json:"notificationSettings,omitempty"`

View File

@@ -37,9 +37,9 @@ type TelemetryFieldKey struct {
FieldContext FieldContext `json:"fieldContext,omitzero"`
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
JSONPlan JSONAccessPlan `json:"-"`
Indexes []JSONDataTypeIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
JSONPlan JSONAccessPlan `json:"-"`
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
Evolutions []*EvolutionEntry `json:"-"`
}
@@ -102,7 +102,7 @@ func (f TelemetryFieldKey) String() string {
if i > 0 {
sb.WriteString("; ")
}
fmt.Fprintf(&sb, "{type=%s, columnExpr=%s, indexExpr=%s}", index.Type.StringValue(), index.ColumnExpression, index.IndexExpression)
fmt.Fprintf(&sb, "{type=%s, indexExpr=%s}", MappingFieldDataTypeToJSONDataType[index.FieldDataType].StringValue(), index.IndexExpression)
}
sb.WriteString("]")
}
@@ -400,3 +400,14 @@ func NewFieldValueSelectorFromPostableFieldValueParams(params PostableFieldValue
return fieldValueSelector
}
type TelemetryFieldKeySkipIndex struct {
Name string `json:"name"` // Name is TelemetryFieldKey.Name not IndexName from ClickHouse
FieldContext FieldContext `json:"fieldContext,omitzero"`
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
BaseColumn string `json:"baseColumn"`
IndexName string `json:"indexName"`
IndexType string `json:"indexType"`
IndexExpression string `json:"indexExpression"`
Granularity int `json:"granularity"`
}

View File

@@ -1,11 +1,5 @@
package telemetrytypes
type JSONDataTypeIndex struct {
Type JSONDataType
ColumnExpression string
IndexExpression string
}
type JSONDataType struct {
str string // Store the correct case for ClickHouse
IsArray bool
@@ -32,18 +26,17 @@ var (
ArrayJSON = JSONDataType{"Array(JSON)", true, "JSON", false}
)
var MappingStringToJSONDataType = map[string]JSONDataType{
"String": String,
"Int64": Int64,
"Float64": Float64,
"Bool": Bool,
"Dynamic": Dynamic,
"Array(Nullable(String))": ArrayString,
"Array(Nullable(Int64))": ArrayInt64,
"Array(Nullable(Float64))": ArrayFloat64,
"Array(Nullable(Bool))": ArrayBool,
"Array(Dynamic)": ArrayDynamic,
"Array(JSON)": ArrayJSON,
var MappingJSONDataTypeToFieldDataType = map[string]FieldDataType{
"String": FieldDataTypeString,
"Int64": FieldDataTypeInt64,
"Float64": FieldDataTypeFloat64,
"Bool": FieldDataTypeBool,
"Array(Nullable(String))": FieldDataTypeArrayString,
"Array(Nullable(Int64))": FieldDataTypeArrayInt64,
"Array(Nullable(Float64))": FieldDataTypeArrayFloat64,
"Array(Nullable(Bool))": FieldDataTypeArrayBool,
"Array(Dynamic)": FieldDataTypeArrayDynamic,
"Array(JSON)": FieldDataTypeArrayJSON,
}
var MappingFieldDataTypeToJSONDataType = map[FieldDataType]JSONDataType{

View File

@@ -3,7 +3,6 @@ package telemetrytypes
import (
"context"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
@@ -35,7 +34,7 @@ type MetadataStore interface {
FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error)
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error)
ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]TelemetryFieldKeySkipIndex, error)
// ListPromotedPaths lists the promoted paths.
GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error)

View File

@@ -4,7 +4,6 @@ import (
"context"
"strings"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -18,7 +17,7 @@ type MockMetadataStore struct {
TemporalityMap map[string]metrictypes.Temporality
TypeMap map[string]metrictypes.Type
PromotedPathsMap map[string]bool
LogsJSONIndexesMap map[string][]schemamigrator.Index
LogsJSONIndexes []telemetrytypes.TelemetryFieldKeySkipIndex
ColumnEvolutionMetadataMap map[string][]*telemetrytypes.EvolutionEntry
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
// StaticFields holds signal-specific intrinsic field definitions (e.g. telemetrylogs.IntrinsicFields).
@@ -34,7 +33,7 @@ func NewMockMetadataStore() *MockMetadataStore {
TemporalityMap: make(map[string]metrictypes.Temporality),
TypeMap: make(map[string]metrictypes.Type),
PromotedPathsMap: make(map[string]bool),
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
LogsJSONIndexes: []telemetrytypes.TelemetryFieldKeySkipIndex{},
ColumnEvolutionMetadataMap: make(map[string][]*telemetrytypes.EvolutionEntry),
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
StaticFields: make(map[string]telemetrytypes.TelemetryFieldKey),
@@ -369,8 +368,8 @@ func (m *MockMetadataStore) GetPromotedPaths(ctx context.Context, paths ...strin
}
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
return m.LogsJSONIndexesMap, nil
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
return m.LogsJSONIndexes, nil
}
func (m *MockMetadataStore) updateColumnEvolutionMetadataForKeys(_ context.Context, keysToUpdate []*telemetrytypes.TelemetryFieldKey) map[string][]*telemetrytypes.EvolutionEntry {

View File

@@ -23,6 +23,7 @@ pytest_plugins = [
"fixtures.notification_channel",
"fixtures.alerts",
"fixtures.cloudintegrations",
"fixtures.jsontypeexporter",
]
@@ -78,6 +79,6 @@ def pytest_addoption(parser: pytest.Parser):
parser.addoption(
"--schema-migrator-version",
action="store",
default="v0.144.2",
default="v0.144.3",
help="schema migrator version",
)

View File

@@ -0,0 +1,473 @@
"""
Simpler version of jsontypeexporter for test fixtures.
This exports JSON type metadata to the path_types table by parsing JSON bodies
and extracting all paths with their types, similar to how the real jsontypeexporter works.
"""
import datetime
import json
from abc import ABC
from http import HTTPStatus
from typing import (
Any,
Callable,
Dict,
Generator,
List,
Optional,
Set,
Union,
)
import numpy as np
import pytest
import requests
from fixtures import types
class JSONPathType(ABC):
"""Represents a JSON path with its type information"""
field_name: str
field_data_type: str
last_seen: np.uint64
signal: str = "logs"
field_context: str = "body"
def __init__(
self,
field_name: str,
field_data_type: str,
last_seen: Optional[datetime.datetime] = None,
) -> None:
self.field_name = field_name
self.field_data_type = field_data_type
self.signal = "logs"
self.field_context = "body"
if last_seen is None:
last_seen = datetime.datetime.now()
self.last_seen = np.uint64(int(last_seen.timestamp() * 1e9))
def np_arr(self) -> np.array:
"""Return path type data as numpy array for database insertion"""
return np.array([self.signal, self.field_context, self.field_name, self.field_data_type, self.last_seen])
# Constants matching jsontypeexporter
ARRAY_SEPARATOR = "[]." # Used in paths like "education[].name"
ARRAY_SUFFIX = "[]" # Used when traversing into array element objects
def _infer_array_type_from_type_strings(types: List[str]) -> Optional[str]:
"""
Infer array type from a list of pre-classified type strings.
Matches metadataexporter's inferArrayMask logic.
Internal type strings are: "JSON", "String", "Bool", "Float64", "Int64"
SuperTyping rules (matching Go inferArrayMask):
- JSON alone → []json
- JSON + any primitive → []dynamic
- String alone → []string; String + other → []dynamic
- Float64 wins over Int64 and Bool
- Int64 wins over Bool
- Bool alone → []bool
"""
if len(types) == 0:
return None
unique = set(types)
has_json = "JSON" in unique
# hasPrimitive mirrors Go: (hasJSON && len(unique) > 1) || (!hasJSON && len(unique) > 0)
has_primitive = (has_json and len(unique) > 1) or (not has_json and len(unique) > 0)
if has_json:
if not has_primitive:
return "[]json"
return "[]dynamic"
# ---- Primitive Type Resolution (Float > Int > Bool) ----
if "String" in unique:
if len(unique) > 1:
return "[]dynamic"
return "[]string"
if "Float64" in unique:
return "[]float64"
if "Int64" in unique:
return "[]int64"
if "Bool" in unique:
return "[]bool"
return "[]dynamic"
def _infer_array_type(elements: List[Any]) -> Optional[str]:
"""
Infer array type from raw Python list elements.
Classifies each element then delegates to _infer_array_type_from_type_strings.
"""
if len(elements) == 0:
return None
types = []
for elem in elements:
if elem is None:
continue
if isinstance(elem, dict):
types.append("JSON")
elif isinstance(elem, str):
types.append("String")
elif isinstance(elem, bool): # must be before int (bool is subclass of int)
types.append("Bool")
elif isinstance(elem, float):
types.append("Float64")
elif isinstance(elem, int):
types.append("Int64")
return _infer_array_type_from_type_strings(types)
def _python_type_to_clickhouse_type(value: Any) -> str:
"""
Convert Python type to ClickHouse JSON type string.
Maps Python types to ClickHouse JSON data types.
Matches metadataexporter's mapPCommonValueTypeToDataType.
"""
if isinstance(value, bool):
return "bool"
elif isinstance(value, int):
return "int64"
elif isinstance(value, float):
return "float64"
elif isinstance(value, str):
return "string"
elif isinstance(value, list):
# Use the sophisticated array type inference
array_type = _infer_array_type(value)
return array_type if array_type else "[]dynamic"
elif isinstance(value, dict):
return "json"
else:
return "string" # Default fallback
def _extract_json_paths(
obj: Any,
current_path: str = "",
path_types: Optional[Dict[str, Set[str]]] = None,
level: int = 0,
) -> Dict[str, Set[str]]:
"""
Recursively extract all paths and their types from a JSON object.
Matches jsontypeexporter's analyzePValue logic.
Args:
obj: The JSON object to traverse
current_path: Current path being built (e.g., "user.name")
path_types: Dictionary mapping paths to sets of types found
level: Current nesting level (for depth limiting)
Returns:
Dictionary mapping paths to sets of type strings
"""
if path_types is None:
path_types = {}
if obj is None:
# Skip null values — matches Go walkNode which errors on ValueTypeEmpty
return path_types
if isinstance(obj, dict):
# For objects, recurse into keys without recording the object itself as a type.
# Matches Go walkMap which recurses without calling ta.record on the map node.
for key, value in obj.items():
# Build the path for this key
if current_path:
new_path = f"{current_path}.{key}"
else:
new_path = key
# Recurse into the value
_extract_json_paths(value, new_path, path_types, level + 1)
elif isinstance(obj, list):
# Skip empty arrays
if len(obj) == 0:
return path_types
# Collect types from array elements (matching Go: types := make([]pcommon.ValueType, 0, s.Len()))
types = []
for item in obj:
if isinstance(item, dict):
# When traversing into array element objects, use ArraySuffix ([])
# This matches: prefix+ArraySuffix in the Go code
# Example: if current_path is "education", we use "education[]" to traverse into objects
array_prefix = current_path + ARRAY_SUFFIX if current_path else ""
for key, value in item.items():
if array_prefix:
# Use array separator: education[].name
array_path = f"{array_prefix}.{key}"
else:
array_path = key
# Recurse without increasing level (matching Go behavior)
_extract_json_paths(value, array_path, path_types, level)
types.append("JSON")
elif isinstance(item, list):
# Arrays inside arrays are not supported - skip the whole path
# Matching Go: e.logger.Error("arrays inside arrays are not supported!", ...); return nil
return path_types
elif isinstance(item, str):
types.append("String")
elif isinstance(item, bool):
types.append("Bool")
elif isinstance(item, float):
types.append("Float64")
elif isinstance(item, int):
types.append("Int64")
# Infer array type from collected types (matching Go: if mask := inferArrayMask(types); mask != 0)
if len(types) > 0:
array_type = _infer_array_type_from_type_strings(types)
if array_type and current_path:
if current_path not in path_types:
path_types[current_path] = set()
path_types[current_path].add(array_type)
else:
# Primitive value (string, number, bool)
if current_path:
if current_path not in path_types:
path_types[current_path] = set()
obj_type = _python_type_to_clickhouse_type(obj)
path_types[current_path].add(obj_type)
return path_types
def _parse_json_bodies_and_extract_paths(
json_bodies: List[str],
timestamp: Optional[datetime.datetime] = None,
) -> List[JSONPathType]:
"""
Parse JSON bodies and extract all paths with their types.
This mimics the behavior of jsontypeexporter.
Args:
json_bodies: List of JSON body strings to parse
timestamp: Timestamp to use for last_seen (defaults to now)
Returns:
List of JSONPathType objects with all discovered paths and types
"""
if timestamp is None:
timestamp = datetime.datetime.now()
# Aggregate all paths and their types across all JSON bodies
all_path_types: Dict[str, Set[str]] = {}
for json_body in json_bodies:
try:
parsed = json.loads(json_body)
_extract_json_paths(parsed, "", all_path_types, level=0)
except (json.JSONDecodeError, TypeError):
# Skip invalid JSON
continue
# Convert to list of JSONPathType objects
# Each path can have multiple types, so we create one JSONPathType per type
path_type_objects: List[JSONPathType] = []
for path, types_set in all_path_types.items():
for type_str in types_set:
path_type_objects.append(
JSONPathType(field_name=path, field_data_type=type_str, last_seen=timestamp)
)
return path_type_objects
@pytest.fixture(name="export_json_types", scope="function")
def export_json_types(
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest, # To access migrator fixture
) -> Generator[
Callable[[Union[List[JSONPathType], List[str], List[Any]]], None], Any, None
]:
"""
Fixture for exporting JSON type metadata to the path_types table.
This is a simpler version of jsontypeexporter for test fixtures.
The function can accept:
1. List of JSONPathType objects (manual specification)
2. List of JSON body strings (auto-extract paths)
3. List of Logs objects (extract from body_json field)
Usage examples:
# Manual specification
export_json_types([
JSONPathType(field_name="user.name", field_data_type="string"),
JSONPathType(field_name="user.age", field_data_type="int64"),
])
# Auto-extract from JSON strings
export_json_types([
'{"user": {"name": "alice", "age": 25}}',
'{"user": {"name": "bob", "age": 30}}',
])
# Auto-extract from Logs objects
export_json_types(logs_list)
"""
# Ensure migrator has run to create the table
try:
request.getfixturevalue("migrator")
except Exception:
# If migrator fixture is not available, that's okay - table might already exist
pass
def _export_json_types(
data: Union[
List[JSONPathType], List[str], List[Any]
], # List[Logs] but avoiding circular import
) -> None:
"""
Export JSON type metadata to signoz_metadata.distributed_field_keys table.
This table stores signal, context, path, and type information for body JSON fields.
"""
path_types: List[JSONPathType] = []
if len(data) == 0:
return
# Determine input type and convert to JSONPathType list
first_item = data[0]
if isinstance(first_item, JSONPathType):
# Already JSONPathType objects
path_types = data # type: ignore
elif isinstance(first_item, str):
# List of JSON strings - parse and extract paths
path_types = _parse_json_bodies_and_extract_paths(data) # type: ignore
else:
# Assume it's a list of Logs objects - extract body_v2
json_bodies: List[str] = []
for log in data: # type: ignore
# Try to get body_v2 attribute
if hasattr(log, "body_v2") and log.body_v2:
json_bodies.append(log.body_v2)
elif hasattr(log, "body") and log.body:
# Fallback to body if body_v2 not available
try:
# Try to parse as JSON
json.loads(log.body)
json_bodies.append(log.body)
except (json.JSONDecodeError, TypeError):
pass
if json_bodies:
path_types = _parse_json_bodies_and_extract_paths(json_bodies)
if len(path_types) == 0:
return
clickhouse.conn.insert(
database="signoz_metadata",
table="distributed_field_keys",
data=[path_type.np_arr() for path_type in path_types],
column_names=[
"signal",
"field_context",
"field_name",
"field_data_type",
"last_seen",
],
)
yield _export_json_types
# Cleanup - truncate the local table after tests (following pattern from logs fixture)
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_metadata.field_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)
@pytest.fixture(name="create_json_index", scope="function")
def create_json_index(
signoz: types.SigNoz,
) -> Generator[Callable[[str, List[Dict[str, Any]]], None], None, None]:
"""
Create ClickHouse data-skipping indexes on body_v2 JSON sub-columns via
POST /api/v1/logs/promote_paths.
**Must be called BEFORE insert_logs** so that newly inserted data parts are
covered by the index and the QB uses the indexed condition path.
Each entry in `paths` follows the PromotePath API shape:
{
"path": "body.user.name", # must start with "body."
"indexes": [
{
"column_type": "String", # String | Int64 | Float64
"type": "ngrambf_v1(3, 256, 2, 0)", # or "minmax", "tokenbf_v1(...)"
"granularity": 1,
}
],
}
Teardown drops every index created during the test by querying
system.data_skipping_indices for matching expressions.
Example::
def test_foo(signoz, get_token, insert_logs, export_json_types, create_json_body_index):
token = get_token(...)
export_json_types(logs_list)
create_json_body_index(token, [
{"path": "body.user.name",
"indexes": [{"column_type": "String", "type": "ngrambf_v1(3, 256, 2, 0)", "granularity": 1}]},
{"path": "body.user.age",
"indexes": [{"column_type": "Int64", "type": "minmax", "granularity": 1}]},
])
insert_logs(logs_list) # data inserted after index exists — index is built automatically
"""
created_paths: List[str] = []
def _create_json_body_index(token: str, paths: List[Dict[str, Any]]) -> None:
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/logs/promote_paths"),
headers={"authorization": f"Bearer {token}"},
json=paths,
timeout=30,
)
assert response.status_code == HTTPStatus.CREATED, (
f"Failed to create JSON body indexes: "
f"{response.status_code} {response.text}"
)
for path in paths:
# The API strips the "body." prefix before storing — mirror that here
# so our cleanup query uses the bare path (e.g. "user.name").
raw = path["path"].removeprefix("body.")
if raw not in created_paths:
created_paths.append(raw)
yield _create_json_body_index
if not created_paths:
return
cluster = signoz.telemetrystore.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER"]
for path in created_paths:
result = signoz.telemetrystore.conn.query(
"SELECT name FROM system.data_skipping_indices "
"WHERE database = 'signoz_logs' AND table = 'logs_v2' "
f"AND expr LIKE '%{path}%'"
)
for (index_name,) in result.result_rows:
signoz.telemetrystore.conn.query(
f"ALTER TABLE signoz_logs.logs_v2 "
f"ON CLUSTER '{cluster}' "
f"DROP INDEX IF EXISTS `{index_name}`"
)

View File

@@ -122,6 +122,8 @@ class Logs(ABC):
resources: dict[str, Any] = {},
attributes: dict[str, Any] = {},
body: str = "default body",
body_v2: Optional[str] = None,
body_promoted: Optional[str] = None,
severity_text: str = "INFO",
trace_id: str = "",
span_id: str = "",
@@ -167,6 +169,33 @@ class Logs(ABC):
# Set body
self.body = body
# Set body_v2 - if body is JSON, parse and stringify it, otherwise use empty string
# ClickHouse accepts String input for JSON column
if body_v2 is not None:
self.body_v2 = body_v2
else:
# Try to parse body as JSON; if successful use it directly,
# otherwise wrap as {"message": body} matching the normalize operator behavior.
try:
json.loads(body)
self.body_v2 = body
except (json.JSONDecodeError, TypeError):
self.body_v2 = json.dumps({"message": body})
# Set body_promoted - must be valid JSON
# Tests will explicitly pass promoted column's content, but we validate it
if body_promoted is not None:
# Validate that it's valid JSON
try:
json.loads(body_promoted)
self.body_promoted = body_promoted
except (json.JSONDecodeError, TypeError):
# If invalid, default to empty JSON object
self.body_promoted = "{}"
else:
# Default to empty JSON object (valid JSON)
self.body_promoted = "{}"
# Process resources and attributes
self.resources_string = {k: str(v) for k, v in resources.items()}
self.resource_json = (
@@ -326,6 +355,8 @@ class Logs(ABC):
self.severity_text,
self.severity_number,
self.body,
self.body_v2,
self.body_promoted,
self.attributes_string,
self.attributes_number,
self.attributes_bool,
@@ -454,31 +485,53 @@ def insert_logs(
data=[resource_key.np_arr() for resource_key in resource_keys],
)
# All columns in insertion order (must match Logs.np_arr() order)
all_column_names = [
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"body_v2",
"body_promoted",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
]
# Check if body_v2 column exists (only present when ENABLE_LOGS_MIGRATIONS_V2 migration has run)
result = clickhouse.conn.query(
"SELECT count() FROM system.columns WHERE database = 'signoz_logs' AND table = 'logs_v2' AND name = 'body_v2'"
)
has_json_body = result.result_rows[0][0] > 0
if has_json_body:
column_names = all_column_names
data = [log.np_arr() for log in logs]
else:
json_body_cols = {"body_v2", "body_promoted"}
keep_indices = [
i for i, c in enumerate(all_column_names) if c not in json_body_cols
]
column_names = [all_column_names[i] for i in keep_indices]
data = [log.np_arr()[keep_indices] for log in logs]
clickhouse.conn.insert(
database="signoz_logs",
table="distributed_logs_v2",
data=[log.np_arr() for log in logs],
column_names=[
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
],
data=data,
column_names=column_names,
)
yield _insert_logs

View File

@@ -1,3 +1,5 @@
from typing import Optional
import docker
import pytest
from testcontainers.core.container import Network
@@ -8,27 +10,32 @@ from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@pytest.fixture(name="migrator", scope="package")
def migrator(
def create_migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
cache_key: str = "migrator",
env_overrides: Optional[dict] = None,
) -> types.Operation:
"""
Package-scoped fixture for running schema migrations.
Factory function for running schema migrations.
Accepts optional env_overrides to customize the migrator environment.
"""
def create() -> None:
version = request.config.getoption("--schema-migrator-version")
client = docker.from_env()
environment = dict(env_overrides) if env_overrides else {}
container = client.containers.run(
image=f"signoz/signoz-schema-migrator:{version}",
command=f"sync --replication=true --cluster-name=cluster --up= --dsn={clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN"]}",
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -47,6 +54,7 @@ def migrator(
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -59,7 +67,7 @@ def migrator(
container.remove()
return types.Operation(name="migrator")
return types.Operation(name=cache_key)
def delete(_: types.Operation) -> None:
pass
@@ -70,9 +78,27 @@ def migrator(
return dev.wrap(
request,
pytestconfig,
"migrator",
cache_key,
lambda: types.Operation(name=""),
create,
delete,
restore,
)
@pytest.fixture(name="migrator", scope="package")
def migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.Operation:
"""
Package-scoped fixture for running schema migrations.
"""
return create_migrator(
network=network,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,70 @@
import pytest
from testcontainers.core.container import Network
from fixtures import types
from fixtures.migrator import create_migrator
from fixtures.signoz import create_signoz
UNSUPPORTED_CLICKHOUSE_VERSIONS = {"25.5.6"}
def pytest_collection_modifyitems(
config: pytest.Config, items: list[pytest.Item]
) -> None:
version = config.getoption("--clickhouse-version")
if version in UNSUPPORTED_CLICKHOUSE_VERSIONS:
skip = pytest.mark.skip(
reason=f"JSON body QB tests require ClickHouse > {version}"
)
for item in items:
item.add_marker(skip)
@pytest.fixture(name="migrator", scope="package")
def migrator_json(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.Operation:
"""
Package-scoped migrator with ENABLE_LOGS_MIGRATIONS_V2=1.
"""
return create_migrator(
network=network,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
cache_key="migrator-json-body",
env_overrides={
"ENABLE_LOGS_MIGRATIONS_V2": "1",
},
)
@pytest.fixture(name="signoz", scope="package")
def signoz_json_body(
network: Network,
zeus: types.TestContainerDocker,
gateway: types.TestContainerDocker,
sqlstore: types.TestContainerSQL,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.SigNoz:
"""
Package-scoped fixture for SigNoz with BODY_JSON_QUERY_ENABLED=true.
"""
return create_signoz(
network=network,
zeus=zeus,
gateway=gateway,
sqlstore=sqlstore,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
cache_key="signoz-json-body",
env_overrides={
"BODY_JSON_QUERY_ENABLED": "true",
},
)