mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-21 11:20:28 +01:00
Compare commits
87 Commits
add-exampl
...
feat/json-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65ce61c0dd | ||
|
|
1350dd8226 | ||
|
|
14ecd0f8db | ||
|
|
7eccbc96b6 | ||
|
|
0a595e1b71 | ||
|
|
e8ed7592e6 | ||
|
|
eee7788108 | ||
|
|
0a37aa0bd1 | ||
|
|
7d7307a6eb | ||
|
|
91beeb4949 | ||
|
|
9f8c3466b1 | ||
|
|
701323c822 | ||
|
|
cc193c9be5 | ||
|
|
512dc579b1 | ||
|
|
d904953afe | ||
|
|
9690c06cb2 | ||
|
|
8c44925a67 | ||
|
|
1ea35a19d7 | ||
|
|
c27db82794 | ||
|
|
5d9685ad9e | ||
|
|
8d0cc2d99d | ||
|
|
9643fae27a | ||
|
|
750394a73f | ||
|
|
19a65e80d8 | ||
|
|
bbef04352e | ||
|
|
0758ff133b | ||
|
|
74dceb844b | ||
|
|
0d0ebe8fe7 | ||
|
|
b35a6213a6 | ||
|
|
9cef109158 | ||
|
|
bb308c263f | ||
|
|
67e1b82adb | ||
|
|
1e8c0f19f5 | ||
|
|
c027181935 | ||
|
|
7122fb8b54 | ||
|
|
67830c8a16 | ||
|
|
096eee6435 | ||
|
|
30f5f2f2f2 | ||
|
|
52adb84461 | ||
|
|
63b7f15d0e | ||
|
|
6d3c88ed21 | ||
|
|
fc6a67aec1 | ||
|
|
9f41499047 | ||
|
|
9cd554d293 | ||
|
|
415387edfc | ||
|
|
fa1bc3db9b | ||
|
|
6c6dad8a66 | ||
|
|
7403f86773 | ||
|
|
9176ef0589 | ||
|
|
5c2a338189 | ||
|
|
704bab23cf | ||
|
|
371da26b3c | ||
|
|
97fbfbdc13 | ||
|
|
4b112988ef | ||
|
|
a47ecf3907 | ||
|
|
e4a78cf556 | ||
|
|
b6adecc294 | ||
|
|
40333a5fee | ||
|
|
4af6a9abae | ||
|
|
55e892dad3 | ||
|
|
181116308f | ||
|
|
eaa678910b | ||
|
|
e994caeb02 | ||
|
|
10840f8495 | ||
|
|
1fcd3adfc8 | ||
|
|
3e14b26b00 | ||
|
|
b30bfa6371 | ||
|
|
e7f4a04b36 | ||
|
|
0687634da3 | ||
|
|
7e7732243e | ||
|
|
2f952e402f | ||
|
|
a12febca4a | ||
|
|
cb71c9c3f7 | ||
|
|
1cd4ce6509 | ||
|
|
9299c8ab18 | ||
|
|
24749de269 | ||
|
|
39098ec3f4 | ||
|
|
fe554f5c94 | ||
|
|
8a60a041a6 | ||
|
|
541f19c34a | ||
|
|
010db03d6e | ||
|
|
5408acbd8c | ||
|
|
0de6c85f81 | ||
|
|
69ec24fa05 | ||
|
|
539d732b65 | ||
|
|
843d5fb199 | ||
|
|
fabdfb8cc1 |
3
.github/workflows/integrationci.yaml
vendored
3
.github/workflows/integrationci.yaml
vendored
@@ -52,6 +52,7 @@ jobs:
|
||||
- ingestionkeys
|
||||
- rootuser
|
||||
- serviceaccount
|
||||
- querier_json_body
|
||||
sqlstore-provider:
|
||||
- postgres
|
||||
- sqlite
|
||||
@@ -61,7 +62,7 @@ jobs:
|
||||
- 25.5.6
|
||||
- 25.12.5
|
||||
schema-migrator-version:
|
||||
- v0.142.0
|
||||
- v0.144.3
|
||||
postgres-version:
|
||||
- 15
|
||||
if: |
|
||||
|
||||
3140
docs/api/openapi.yml
3140
docs/api/openapi.yml
File diff suppressed because it is too large
Load Diff
@@ -4774,7 +4774,7 @@ export interface RuletypesPostableRuleDTO {
|
||||
* @type string
|
||||
*/
|
||||
alert: string;
|
||||
alertType: RuletypesAlertTypeDTO;
|
||||
alertType?: RuletypesAlertTypeDTO;
|
||||
/**
|
||||
* @type object
|
||||
*/
|
||||
@@ -4899,7 +4899,7 @@ export interface RuletypesRuleDTO {
|
||||
* @type string
|
||||
*/
|
||||
alert: string;
|
||||
alertType: RuletypesAlertTypeDTO;
|
||||
alertType?: RuletypesAlertTypeDTO;
|
||||
/**
|
||||
* @type object
|
||||
*/
|
||||
@@ -4984,8 +4984,8 @@ export interface RuletypesRuleConditionDTO {
|
||||
*/
|
||||
algorithm?: string;
|
||||
compositeQuery: RuletypesAlertCompositeQueryDTO;
|
||||
matchType?: RuletypesMatchTypeDTO;
|
||||
op?: RuletypesCompareOperatorDTO;
|
||||
matchType: RuletypesMatchTypeDTO;
|
||||
op: RuletypesCompareOperatorDTO;
|
||||
/**
|
||||
* @type boolean
|
||||
*/
|
||||
|
||||
2
go.mod
2
go.mod
@@ -8,7 +8,7 @@ require (
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1
|
||||
github.com/antonmedv/expr v1.15.3
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
|
||||
4
go.sum
4
go.sum
@@ -108,8 +108,8 @@ github.com/SigNoz/expr v1.17.7-beta h1:FyZkleM5dTQ0O6muQfwGpoH5A2ohmN/XTasRCO72g
|
||||
github.com/SigNoz/expr v1.17.7-beta/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8=
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4 h1:EskJkEMfuuIyArWhV8SleDV/fuKxiaEGTXrCZIFqDT4=
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3 h1:/7PPIqIpPsaWtrgnfHam2XVYP41ZlgEKLHzQO8oVxcA=
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
|
||||
github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
|
||||
github.com/Yiling-J/theine-go v0.6.2/go.mod h1:08QpMa5JZ2pKN+UJCRrCasWYO1IKCdl54Xa836rpmDU=
|
||||
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
|
||||
|
||||
@@ -44,7 +44,6 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
|
||||
Description: "This endpoint creates a new alert rule",
|
||||
Request: new(ruletypes.PostableRule),
|
||||
RequestContentType: "application/json",
|
||||
RequestExamples: postableRuleExamples(),
|
||||
Response: new(ruletypes.Rule),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusCreated,
|
||||
@@ -55,28 +54,27 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v2/rules/{id}", handler.New(provider.authZ.EditAccess(provider.rulerHandler.UpdateRuleByID), handler.OpenAPIDef{
|
||||
ID: "UpdateRuleByID",
|
||||
Tags: []string{"rules"},
|
||||
Summary: "Update alert rule",
|
||||
Description: "This endpoint updates an alert rule by ID",
|
||||
Request: new(ruletypes.PostableRule),
|
||||
RequestContentType: "application/json",
|
||||
RequestExamples: postableRuleExamples(),
|
||||
SuccessStatusCode: http.StatusNoContent,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
|
||||
ID: "UpdateRuleByID",
|
||||
Tags: []string{"rules"},
|
||||
Summary: "Update alert rule",
|
||||
Description: "This endpoint updates an alert rule by ID",
|
||||
Request: new(ruletypes.PostableRule),
|
||||
RequestContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusNoContent,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
|
||||
})).Methods(http.MethodPut).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v2/rules/{id}", handler.New(provider.authZ.EditAccess(provider.rulerHandler.DeleteRuleByID), handler.OpenAPIDef{
|
||||
ID: "DeleteRuleByID",
|
||||
Tags: []string{"rules"},
|
||||
Summary: "Delete alert rule",
|
||||
Description: "This endpoint deletes an alert rule by ID",
|
||||
SuccessStatusCode: http.StatusNoContent,
|
||||
ErrorStatusCodes: []int{http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
|
||||
ID: "DeleteRuleByID",
|
||||
Tags: []string{"rules"},
|
||||
Summary: "Delete alert rule",
|
||||
Description: "This endpoint deletes an alert rule by ID",
|
||||
SuccessStatusCode: http.StatusNoContent,
|
||||
ErrorStatusCodes: []int{http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
|
||||
})).Methods(http.MethodDelete).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -88,7 +86,6 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
|
||||
Description: "This endpoint applies a partial update to an alert rule by ID",
|
||||
Request: new(ruletypes.PostableRule),
|
||||
RequestContentType: "application/json",
|
||||
RequestExamples: postableRuleExamples(),
|
||||
Response: new(ruletypes.Rule),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
@@ -105,7 +102,6 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
|
||||
Description: "This endpoint fires a test notification for the given rule definition",
|
||||
Request: new(ruletypes.PostableRule),
|
||||
RequestContentType: "application/json",
|
||||
RequestExamples: postableRuleExamples(),
|
||||
Response: new(ruletypes.GettableTestRule),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
@@ -160,27 +156,27 @@ func (provider *provider) addRulerRoutes(router *mux.Router) error {
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v1/downtime_schedules/{id}", handler.New(provider.authZ.EditAccess(provider.rulerHandler.UpdateDowntimeScheduleByID), handler.OpenAPIDef{
|
||||
ID: "UpdateDowntimeScheduleByID",
|
||||
Tags: []string{"downtimeschedules"},
|
||||
Summary: "Update downtime schedule",
|
||||
Description: "This endpoint updates a downtime schedule by ID",
|
||||
Request: new(ruletypes.PostablePlannedMaintenance),
|
||||
RequestContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusNoContent,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
|
||||
ID: "UpdateDowntimeScheduleByID",
|
||||
Tags: []string{"downtimeschedules"},
|
||||
Summary: "Update downtime schedule",
|
||||
Description: "This endpoint updates a downtime schedule by ID",
|
||||
Request: new(ruletypes.PostablePlannedMaintenance),
|
||||
RequestContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusNoContent,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
|
||||
})).Methods(http.MethodPut).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v1/downtime_schedules/{id}", handler.New(provider.authZ.EditAccess(provider.rulerHandler.DeleteDowntimeScheduleByID), handler.OpenAPIDef{
|
||||
ID: "DeleteDowntimeScheduleByID",
|
||||
Tags: []string{"downtimeschedules"},
|
||||
Summary: "Delete downtime schedule",
|
||||
Description: "This endpoint deletes a downtime schedule by ID",
|
||||
SuccessStatusCode: http.StatusNoContent,
|
||||
ErrorStatusCodes: []int{http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
|
||||
ID: "DeleteDowntimeScheduleByID",
|
||||
Tags: []string{"downtimeschedules"},
|
||||
Summary: "Delete downtime schedule",
|
||||
Description: "This endpoint deletes a downtime schedule by ID",
|
||||
SuccessStatusCode: http.StatusNoContent,
|
||||
ErrorStatusCodes: []int{http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
|
||||
})).Methods(http.MethodDelete).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,733 +0,0 @@
|
||||
package signozapiserver
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/http/handler"
|
||||
|
||||
// postableRuleExamples returns example payloads attached to every rule-write
|
||||
// endpoint. They cover each alert type, rule type, and composite-query shape.
|
||||
func postableRuleExamples() []handler.OpenAPIExample {
|
||||
rolling := func(evalWindow, frequency string) map[string]any {
|
||||
return map[string]any{
|
||||
"kind": "rolling",
|
||||
"spec": map[string]any{"evalWindow": evalWindow, "frequency": frequency},
|
||||
}
|
||||
}
|
||||
renotify := func(interval string, states ...string) map[string]any {
|
||||
s := make([]any, 0, len(states))
|
||||
for _, v := range states {
|
||||
s = append(s, v)
|
||||
}
|
||||
return map[string]any{
|
||||
"enabled": true,
|
||||
"interval": interval,
|
||||
"alertStates": s,
|
||||
}
|
||||
}
|
||||
|
||||
return []handler.OpenAPIExample{
|
||||
{
|
||||
Name: "metric_threshold_single",
|
||||
Summary: "Metric threshold single builder query",
|
||||
Description: "Fires when a pod consumes more than 80% of its requested CPU for the whole evaluation window. Uses `k8s.pod.cpu_request_utilization`.",
|
||||
Value: map[string]any{
|
||||
"alert": "Pod CPU above 80% of request",
|
||||
"alertType": "METRIC_BASED_ALERT",
|
||||
"description": "CPU usage for api-service pods exceeds 80% of the requested CPU",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"schemaVersion": "v2alpha1",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "builder",
|
||||
"panelType": "graph",
|
||||
"unit": "percentunit",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"signal": "metrics",
|
||||
"stepInterval": 60,
|
||||
"aggregations": []any{map[string]any{"metricName": "k8s.pod.cpu_request_utilization", "timeAggregation": "avg", "spaceAggregation": "max"}},
|
||||
"filter": map[string]any{"expression": "k8s.deployment.name = 'api-service'"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "k8s.pod.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
},
|
||||
"legend": "{{k8s.pod.name}} ({{deployment.environment}})",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"selectedQueryName": "A",
|
||||
"thresholds": map[string]any{
|
||||
"kind": "basic",
|
||||
"spec": []any{
|
||||
map[string]any{
|
||||
"name": "critical",
|
||||
"op": "above",
|
||||
"matchType": "all_the_times",
|
||||
"target": 0.8,
|
||||
"channels": []any{"slack-platform", "pagerduty-oncall"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"evaluation": rolling("15m", "1m"),
|
||||
"notificationSettings": map[string]any{
|
||||
"groupBy": []any{"k8s.pod.name", "deployment.environment"},
|
||||
"renotify": renotify("4h", "firing"),
|
||||
},
|
||||
"labels": map[string]any{"severity": "critical", "team": "platform"},
|
||||
"annotations": map[string]any{
|
||||
"description": "Pod {{$k8s.pod.name}} CPU is at {{$value}} of request in {{$deployment.environment}}.",
|
||||
"summary": "Pod CPU above {{$threshold}} of request",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "metric_threshold_formula",
|
||||
Summary: "Metric threshold multi-query formula",
|
||||
Description: "Computes disk utilization as (1 - available/capacity) * 100 by combining two disabled base queries with a builder_formula. The formula emits 0–100, so compositeQuery.unit is set to \"percent\" and the target is a bare number.",
|
||||
Value: map[string]any{
|
||||
"alert": "PersistentVolume above 80% utilization",
|
||||
"alertType": "METRIC_BASED_ALERT",
|
||||
"description": "Disk utilization for a persistent volume is above 80%",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"schemaVersion": "v2alpha1",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "builder",
|
||||
"panelType": "graph",
|
||||
"unit": "percent",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"signal": "metrics",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"metricName": "k8s.volume.available", "timeAggregation": "max", "spaceAggregation": "max"}},
|
||||
"filter": map[string]any{"expression": "k8s.volume.type = 'persistentVolumeClaim'"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "k8s.persistentvolumeclaim.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
map[string]any{"name": "k8s.namespace.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "B",
|
||||
"signal": "metrics",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"metricName": "k8s.volume.capacity", "timeAggregation": "max", "spaceAggregation": "max"}},
|
||||
"filter": map[string]any{"expression": "k8s.volume.type = 'persistentVolumeClaim'"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "k8s.persistentvolumeclaim.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
map[string]any{"name": "k8s.namespace.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_formula",
|
||||
"spec": map[string]any{
|
||||
"name": "F1",
|
||||
"expression": "(1 - A/B) * 100",
|
||||
"legend": "{{k8s.persistentvolumeclaim.name}} ({{k8s.namespace.name}})",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"selectedQueryName": "F1",
|
||||
"thresholds": map[string]any{
|
||||
"kind": "basic",
|
||||
"spec": []any{
|
||||
map[string]any{
|
||||
"name": "critical",
|
||||
"op": "above",
|
||||
"matchType": "at_least_once",
|
||||
"target": 80,
|
||||
"channels": []any{"slack-storage"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"evaluation": rolling("30m", "5m"),
|
||||
"notificationSettings": map[string]any{
|
||||
"groupBy": []any{"k8s.namespace.name", "k8s.persistentvolumeclaim.name"},
|
||||
"renotify": renotify("2h", "firing"),
|
||||
},
|
||||
"labels": map[string]any{"severity": "critical"},
|
||||
"annotations": map[string]any{
|
||||
"description": "Volume {{$k8s.persistentvolumeclaim.name}} in {{$k8s.namespace.name}} is {{$value}}% full.",
|
||||
"summary": "Disk utilization above {{$threshold}}%",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "metric_promql",
|
||||
Summary: "Metric threshold PromQL rule",
|
||||
Description: "PromQL expression instead of the builder. Dotted OTEL resource attributes are quoted (\"deployment.environment\"). Useful for queries that combine series with group_right or other Prom operators.",
|
||||
Value: map[string]any{
|
||||
"alert": "Kafka consumer group lag above 1000",
|
||||
"alertType": "METRIC_BASED_ALERT",
|
||||
"description": "Consumer group lag computed via PromQL",
|
||||
"ruleType": "promql_rule",
|
||||
"version": "v5",
|
||||
"schemaVersion": "v2alpha1",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "promql",
|
||||
"panelType": "graph",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "promql",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"query": "(max by(topic, partition, \"deployment.environment\")(kafka_log_end_offset) - on(topic, partition, \"deployment.environment\") group_right max by(group, topic, partition, \"deployment.environment\")(kafka_consumer_committed_offset)) > 0",
|
||||
"legend": "{{topic}}/{{partition}} ({{group}})",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"selectedQueryName": "A",
|
||||
"thresholds": map[string]any{
|
||||
"kind": "basic",
|
||||
"spec": []any{
|
||||
map[string]any{
|
||||
"name": "critical",
|
||||
"op": "above",
|
||||
"matchType": "all_the_times",
|
||||
"target": 1000,
|
||||
"channels": []any{"slack-data-platform", "pagerduty-data"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"evaluation": rolling("10m", "1m"),
|
||||
"notificationSettings": map[string]any{
|
||||
"groupBy": []any{"group", "topic"},
|
||||
"renotify": renotify("1h", "firing"),
|
||||
},
|
||||
"labels": map[string]any{"severity": "critical"},
|
||||
"annotations": map[string]any{
|
||||
"description": "Consumer group {{$group}} is {{$value}} messages behind on {{$topic}}/{{$partition}}.",
|
||||
"summary": "Kafka consumer lag high",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "metric_anomaly",
|
||||
Summary: "Metric anomaly rule (v1 only)",
|
||||
Description: "Anomaly rules are not yet supported under schemaVersion v2alpha1, so this example uses the v1 shape. Wraps a builder query in the `anomaly` function with daily seasonality SigNoz compares each point against the forecast for that time of day. Fires when the anomaly score stays below the threshold for the entire window; `requireMinPoints` guards against noisy intervals.",
|
||||
Value: map[string]any{
|
||||
"alert": "Anomalous drop in ingested spans",
|
||||
"alertType": "METRIC_BASED_ALERT",
|
||||
"description": "Detect an abrupt drop in span ingestion using a z-score anomaly function",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"evalWindow": "24h",
|
||||
"frequency": "3h",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "builder",
|
||||
"panelType": "graph",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"signal": "metrics",
|
||||
"stepInterval": 21600,
|
||||
"aggregations": []any{map[string]any{"metricName": "otelcol_receiver_accepted_spans", "timeAggregation": "rate", "spaceAggregation": "sum"}},
|
||||
"filter": map[string]any{"expression": "tenant_tier = 'premium'"},
|
||||
"groupBy": []any{map[string]any{"name": "tenant_id", "fieldContext": "attribute", "fieldDataType": "string"}},
|
||||
"functions": []any{
|
||||
map[string]any{
|
||||
"name": "anomaly",
|
||||
"args": []any{map[string]any{"name": "z_score_threshold", "value": 2}},
|
||||
},
|
||||
},
|
||||
"legend": "{{tenant_id}}",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"op": "below",
|
||||
"matchType": "all_the_times",
|
||||
"target": 2,
|
||||
"algorithm": "standard",
|
||||
"seasonality": "daily",
|
||||
"selectedQueryName": "A",
|
||||
"requireMinPoints": true,
|
||||
"requiredNumPoints": 3,
|
||||
},
|
||||
"labels": map[string]any{"severity": "warning"},
|
||||
"preferredChannels": []any{"slack-ingestion"},
|
||||
"annotations": map[string]any{
|
||||
"description": "Ingestion rate for tenant {{$tenant_id}} is anomalously low (z-score {{$value}}).",
|
||||
"summary": "Span ingestion anomaly",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "logs_threshold",
|
||||
Summary: "Logs threshold count() over filter",
|
||||
Description: "Counts matching log records (ERROR severity + body contains) over a rolling window. Fires at least once per evaluation when the count exceeds zero.",
|
||||
Value: map[string]any{
|
||||
"alert": "Payments service panic logs",
|
||||
"alertType": "LOGS_BASED_ALERT",
|
||||
"description": "Any panic log line emitted by the payments service",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"schemaVersion": "v2alpha1",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "builder",
|
||||
"panelType": "graph",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"signal": "logs",
|
||||
"stepInterval": 60,
|
||||
"aggregations": []any{map[string]any{"expression": "count()"}},
|
||||
"filter": map[string]any{"expression": "service.name = 'payments-api' AND severity_text = 'ERROR' AND body CONTAINS 'panic'"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "k8s.pod.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
},
|
||||
"legend": "{{k8s.pod.name}} ({{deployment.environment}})",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"selectedQueryName": "A",
|
||||
"thresholds": map[string]any{
|
||||
"kind": "basic",
|
||||
"spec": []any{
|
||||
map[string]any{
|
||||
"name": "critical",
|
||||
"op": "above",
|
||||
"matchType": "at_least_once",
|
||||
"target": 0,
|
||||
"channels": []any{"slack-payments", "pagerduty-payments"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"evaluation": rolling("5m", "1m"),
|
||||
"notificationSettings": map[string]any{
|
||||
"groupBy": []any{"k8s.pod.name", "deployment.environment"},
|
||||
"renotify": renotify("15m", "firing"),
|
||||
},
|
||||
"labels": map[string]any{"severity": "critical", "team": "payments"},
|
||||
"annotations": map[string]any{
|
||||
"description": "{{$k8s.pod.name}} emitted {{$value}} panic log(s) in {{$deployment.environment}}.",
|
||||
"summary": "Payments service panic",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "logs_error_rate_formula",
|
||||
Summary: "Logs error rate error count / total count × 100",
|
||||
Description: "Two disabled log count queries (A = errors, B = total) combined via a builder_formula into a percentage. Classic service-level error-rate alert pattern for log-based signals.",
|
||||
Value: map[string]any{
|
||||
"alert": "Payments-api error log rate above 1%",
|
||||
"alertType": "LOGS_BASED_ALERT",
|
||||
"description": "Error log ratio as a percentage of total logs for payments-api",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"schemaVersion": "v2alpha1",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "builder",
|
||||
"panelType": "graph",
|
||||
"unit": "percent",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"signal": "logs",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"expression": "count()"}},
|
||||
"filter": map[string]any{"expression": "service.name = 'payments-api' AND severity_text IN ['ERROR', 'FATAL']"},
|
||||
"groupBy": []any{map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"}},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "B",
|
||||
"signal": "logs",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"expression": "count()"}},
|
||||
"filter": map[string]any{"expression": "service.name = 'payments-api'"},
|
||||
"groupBy": []any{map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"}},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_formula",
|
||||
"spec": map[string]any{
|
||||
"name": "F1",
|
||||
"expression": "(A / B) * 100",
|
||||
"legend": "{{deployment.environment}}",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"selectedQueryName": "F1",
|
||||
"thresholds": map[string]any{
|
||||
"kind": "basic",
|
||||
"spec": []any{
|
||||
map[string]any{
|
||||
"name": "critical",
|
||||
"op": "above",
|
||||
"matchType": "at_least_once",
|
||||
"target": 1,
|
||||
"channels": []any{"slack-payments"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"evaluation": rolling("5m", "1m"),
|
||||
"notificationSettings": map[string]any{
|
||||
"groupBy": []any{"deployment.environment"},
|
||||
"renotify": renotify("30m", "firing"),
|
||||
},
|
||||
"labels": map[string]any{"severity": "critical", "team": "payments"},
|
||||
"annotations": map[string]any{
|
||||
"description": "Error log rate in {{$deployment.environment}} is {{$value}}%",
|
||||
"summary": "Payments-api error rate above {{$threshold}}%",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "traces_threshold_latency",
|
||||
Summary: "Traces threshold p99 latency (ns → s conversion)",
|
||||
Description: "Builder query against the traces signal with p99(duration_nano). The series unit is ns (compositeQuery.unit), the target is in seconds (threshold.targetUnit) SigNoz converts before comparing. Canonical shape when series and target live in different units.",
|
||||
Value: map[string]any{
|
||||
"alert": "Search API p99 latency above 5s",
|
||||
"alertType": "TRACES_BASED_ALERT",
|
||||
"description": "p99 duration of the search endpoint exceeds 5s",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"schemaVersion": "v2alpha1",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "builder",
|
||||
"panelType": "graph",
|
||||
"unit": "ns",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"stepInterval": 60,
|
||||
"aggregations": []any{map[string]any{"expression": "p99(duration_nano)"}},
|
||||
"filter": map[string]any{"expression": "service.name = 'search-api' AND name = 'GET /api/v1/search'"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
map[string]any{"name": "http.route", "fieldContext": "attribute", "fieldDataType": "string"},
|
||||
},
|
||||
"legend": "{{service.name}} {{http.route}}",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"selectedQueryName": "A",
|
||||
"thresholds": map[string]any{
|
||||
"kind": "basic",
|
||||
"spec": []any{
|
||||
map[string]any{
|
||||
"name": "warning",
|
||||
"op": "above",
|
||||
"matchType": "at_least_once",
|
||||
"target": 5,
|
||||
"targetUnit": "s",
|
||||
"channels": []any{"slack-search"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"evaluation": rolling("5m", "1m"),
|
||||
"notificationSettings": map[string]any{
|
||||
"groupBy": []any{"service.name", "http.route"},
|
||||
"renotify": renotify("30m", "firing"),
|
||||
},
|
||||
"labels": map[string]any{"severity": "warning", "team": "search"},
|
||||
"annotations": map[string]any{
|
||||
"description": "p99 latency for {{$service.name}} on {{$http.route}} crossed {{$threshold}}s.",
|
||||
"summary": "Search-api latency degraded",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "traces_error_rate_formula",
|
||||
Summary: "Traces error rate error spans / total spans × 100",
|
||||
Description: "Two disabled trace count queries (A = error spans where hasError=true, B = total spans) combined via a builder_formula into a percentage. Mirrors the common request-error-rate dashboard shape.",
|
||||
Value: map[string]any{
|
||||
"alert": "Search-api error rate above 5%",
|
||||
"alertType": "TRACES_BASED_ALERT",
|
||||
"description": "Request error rate for search-api, grouped by route",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"schemaVersion": "v2alpha1",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "builder",
|
||||
"panelType": "graph",
|
||||
"unit": "percent",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"expression": "count()"}},
|
||||
"filter": map[string]any{"expression": "service.name = 'search-api' AND hasError = true"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
map[string]any{"name": "http.route", "fieldContext": "attribute", "fieldDataType": "string"},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "B",
|
||||
"signal": "traces",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"expression": "count()"}},
|
||||
"filter": map[string]any{"expression": "service.name = 'search-api'"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
map[string]any{"name": "http.route", "fieldContext": "attribute", "fieldDataType": "string"},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_formula",
|
||||
"spec": map[string]any{
|
||||
"name": "F1",
|
||||
"expression": "(A / B) * 100",
|
||||
"legend": "{{service.name}} {{http.route}}",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"selectedQueryName": "F1",
|
||||
"thresholds": map[string]any{
|
||||
"kind": "basic",
|
||||
"spec": []any{
|
||||
map[string]any{
|
||||
"name": "critical",
|
||||
"op": "above",
|
||||
"matchType": "at_least_once",
|
||||
"target": 5,
|
||||
"channels": []any{"slack-search", "pagerduty-search"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"evaluation": rolling("5m", "1m"),
|
||||
"notificationSettings": map[string]any{
|
||||
"groupBy": []any{"service.name", "http.route"},
|
||||
"renotify": renotify("15m", "firing"),
|
||||
},
|
||||
"labels": map[string]any{"severity": "critical", "team": "search"},
|
||||
"annotations": map[string]any{
|
||||
"description": "Error rate on {{$service.name}} {{$http.route}} is {{$value}}%",
|
||||
"summary": "Search-api error rate above {{$threshold}}%",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "tiered_thresholds",
|
||||
Summary: "Tiered thresholds with per-tier channels",
|
||||
Description: "Two tiers (warning and critical) in a single rule, each with its own target, op, matchType, and channels so warnings and pages route to different receivers. `alertOnAbsent` + `absentFor` fires a no-data alert when the query returns no series for 15 consecutive evaluations.",
|
||||
Value: map[string]any{
|
||||
"alert": "Kafka consumer lag warn / critical",
|
||||
"alertType": "METRIC_BASED_ALERT",
|
||||
"description": "Warn at lag ≥ 50 and page at ≥ 200, tiered via thresholds.spec.",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"schemaVersion": "v2alpha1",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "builder",
|
||||
"panelType": "graph",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"signal": "metrics",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"metricName": "kafka_log_end_offset", "timeAggregation": "max", "spaceAggregation": "max"}},
|
||||
"filter": map[string]any{"expression": "topic != '__consumer_offsets'"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "topic", "fieldContext": "attribute", "fieldDataType": "string"},
|
||||
map[string]any{"name": "partition", "fieldContext": "attribute", "fieldDataType": "string"},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "B",
|
||||
"signal": "metrics",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"metricName": "kafka_consumer_committed_offset", "timeAggregation": "max", "spaceAggregation": "max"}},
|
||||
"filter": map[string]any{"expression": "topic != '__consumer_offsets'"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "topic", "fieldContext": "attribute", "fieldDataType": "string"},
|
||||
map[string]any{"name": "partition", "fieldContext": "attribute", "fieldDataType": "string"},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_formula",
|
||||
"spec": map[string]any{
|
||||
"name": "F1",
|
||||
"expression": "A - B",
|
||||
"legend": "{{topic}}/{{partition}}",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"alertOnAbsent": true,
|
||||
"absentFor": 15,
|
||||
"selectedQueryName": "F1",
|
||||
"thresholds": map[string]any{
|
||||
"kind": "basic",
|
||||
"spec": []any{
|
||||
map[string]any{
|
||||
"name": "warning",
|
||||
"op": "above",
|
||||
"matchType": "all_the_times",
|
||||
"target": 50,
|
||||
"channels": []any{"slack-kafka-info"},
|
||||
},
|
||||
map[string]any{
|
||||
"name": "critical",
|
||||
"op": "above",
|
||||
"matchType": "all_the_times",
|
||||
"target": 200,
|
||||
"channels": []any{"slack-kafka-alerts", "pagerduty-kafka"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"evaluation": rolling("5m", "1m"),
|
||||
"notificationSettings": map[string]any{
|
||||
"groupBy": []any{"topic"},
|
||||
"renotify": renotify("15m", "firing"),
|
||||
},
|
||||
"labels": map[string]any{"team": "data-platform"},
|
||||
"annotations": map[string]any{
|
||||
"description": "Consumer lag for {{$topic}} partition {{$partition}} is {{$value}}.",
|
||||
"summary": "Kafka consumer lag",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "notification_settings",
|
||||
Summary: "Full notification settings (grouping, nodata renotify, grace period)",
|
||||
Description: "Demonstrates the full notificationSettings surface: `groupBy` merges alerts across labels to cut noise, `newGroupEvalDelay` gives newly-appearing series a grace period before firing, `renotify` re-alerts every 30m while firing OR while the alert is in nodata (missing data is treated as actionable), and `usePolicy: false` means channels come from the threshold entries rather than global routing policies. Set `usePolicy: true` to skip per-threshold channels and route via the org-level notification policy instead.",
|
||||
Value: map[string]any{
|
||||
"alert": "API 5xx error rate above 1%",
|
||||
"alertType": "TRACES_BASED_ALERT",
|
||||
"description": "Noise-controlled 5xx error rate alert with renotify on gaps",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"schemaVersion": "v2alpha1",
|
||||
"condition": map[string]any{
|
||||
"compositeQuery": map[string]any{
|
||||
"queryType": "builder",
|
||||
"panelType": "graph",
|
||||
"unit": "percent",
|
||||
"queries": []any{
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"expression": "count()"}},
|
||||
"filter": map[string]any{"expression": "service.name CONTAINS 'api' AND http.status_code >= 500"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_query",
|
||||
"spec": map[string]any{
|
||||
"name": "B",
|
||||
"signal": "traces",
|
||||
"stepInterval": 60,
|
||||
"disabled": true,
|
||||
"aggregations": []any{map[string]any{"expression": "count()"}},
|
||||
"filter": map[string]any{"expression": "service.name CONTAINS 'api'"},
|
||||
"groupBy": []any{
|
||||
map[string]any{"name": "service.name", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
map[string]any{"name": "deployment.environment", "fieldContext": "resource", "fieldDataType": "string"},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]any{
|
||||
"type": "builder_formula",
|
||||
"spec": map[string]any{
|
||||
"name": "F1",
|
||||
"expression": "(A / B) * 100",
|
||||
"legend": "{{service.name}} ({{deployment.environment}})",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"selectedQueryName": "F1",
|
||||
"thresholds": map[string]any{
|
||||
"kind": "basic",
|
||||
"spec": []any{
|
||||
map[string]any{
|
||||
"name": "critical",
|
||||
"op": "above",
|
||||
"matchType": "at_least_once",
|
||||
"target": 1,
|
||||
"channels": []any{"slack-api-alerts", "pagerduty-oncall"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"evaluation": rolling("5m", "1m"),
|
||||
"notificationSettings": map[string]any{
|
||||
"groupBy": []any{"service.name", "deployment.environment"},
|
||||
"newGroupEvalDelay": "2m",
|
||||
"usePolicy": false,
|
||||
"renotify": renotify("30m", "firing", "nodata"),
|
||||
},
|
||||
"labels": map[string]any{"team": "platform"},
|
||||
"annotations": map[string]any{
|
||||
"description": "{{$service.name}} 5xx rate in {{$deployment.environment}} is {{$value}}%.",
|
||||
"summary": "API service error rate elevated",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
package signozapiserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
)
|
||||
|
||||
// TestPostableRuleExamplesValidate verifies every example payload returned by
|
||||
// postableRuleExamples() round-trips through PostableRule.UnmarshalJSON and
|
||||
// passes Validate(). If an example drifts from the runtime contract this
|
||||
// breaks loudly so the spec doesn't ship invalid payloads to users.
|
||||
func TestPostableRuleExamplesValidate(t *testing.T) {
|
||||
for _, example := range postableRuleExamples() {
|
||||
t.Run(example.Name, func(t *testing.T) {
|
||||
raw, err := json.Marshal(example.Value)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal example: %v", err)
|
||||
}
|
||||
|
||||
var rule ruletypes.PostableRule
|
||||
if err := json.Unmarshal(raw, &rule); err != nil {
|
||||
t.Fatalf("unmarshal: %v\npayload: %s", err, raw)
|
||||
}
|
||||
|
||||
if err := rule.Validate(); err != nil {
|
||||
t.Fatalf("Validate: %v\npayload: %s", err, raw)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -32,26 +32,16 @@ func NewModule(metadataStore telemetrytypes.MetadataStore, telemetrystore teleme
|
||||
}
|
||||
|
||||
func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetypes.PromotePath, error) {
|
||||
logsIndexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
|
||||
indexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Flatten the map values (which are slices) into a single slice
|
||||
indexes := slices.Concat(slices.Collect(maps.Values(logsIndexes))...)
|
||||
|
||||
aggr := map[string][]promotetypes.WrappedIndex{}
|
||||
for _, index := range indexes {
|
||||
path, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// clean backticks from the path
|
||||
path = strings.ReplaceAll(path, "`", "")
|
||||
|
||||
aggr[path] = append(aggr[path], promotetypes.WrappedIndex{
|
||||
ColumnType: columnType,
|
||||
Type: index.Type,
|
||||
aggr[index.Name] = append(aggr[index.Name], promotetypes.WrappedIndex{
|
||||
ColumnType: telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType].StringValue(),
|
||||
Type: index.IndexType,
|
||||
Granularity: index.Granularity,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -204,7 +204,7 @@ func AdjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemet
|
||||
// Downstream query builder should handle multiple matching keys with their own metadata
|
||||
// and not rely on this function to do so.
|
||||
materialized := true
|
||||
indexes := []telemetrytypes.JSONDataTypeIndex{}
|
||||
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
|
||||
fieldContextsSeen := map[telemetrytypes.FieldContext]bool{}
|
||||
dataTypesSeen := map[telemetrytypes.FieldDataType]bool{}
|
||||
for _, matchingKey := range matchingKeys {
|
||||
|
||||
@@ -195,8 +195,8 @@ func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetryty
|
||||
// the field genuinely holds the empty/zero value.
|
||||
//
|
||||
// Note: indexing is also skipped for Array Nested fields because they cannot be indexed.
|
||||
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
|
||||
return index.Type == node.TerminalConfig.ElemType
|
||||
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.TelemetryFieldKeySkipIndex) bool {
|
||||
return telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType] == node.TerminalConfig.ElemType
|
||||
})
|
||||
isExistsCheck := operator == qbtypes.FilterOperatorExists || operator == qbtypes.FilterOperatorNotExists
|
||||
if node.TerminalConfig.ElemType.IndexSupported && indexed && !isExistsCheck {
|
||||
|
||||
@@ -1127,9 +1127,12 @@ func buildTestTelemetryMetadataStore(t *testing.T, addIndexes bool) *telemetryty
|
||||
return entry.Path == path && entry.Type == jsonType
|
||||
})
|
||||
if idx >= 0 {
|
||||
key.Indexes = append(key.Indexes, telemetrytypes.JSONDataTypeIndex{
|
||||
Type: jsonType,
|
||||
ColumnExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
|
||||
key.Indexes = append(key.Indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
|
||||
Name: path,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldDataType: fdt,
|
||||
BaseColumn: LogsV2BodyV2Column,
|
||||
IndexExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -106,7 +106,7 @@ func (t *telemetryMetaStore) buildJSONPlans(keys []*telemetrytypes.TelemetryFiel
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.JSONDataTypeIndex, error) {
|
||||
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
|
||||
filteredPaths := []string{}
|
||||
for _, path := range paths {
|
||||
// skip array paths; since they don't have any indexes
|
||||
@@ -116,47 +116,22 @@ func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...st
|
||||
filteredPaths = append(filteredPaths, path)
|
||||
}
|
||||
if len(filteredPaths) == 0 {
|
||||
return make(map[string][]telemetrytypes.JSONDataTypeIndex), nil
|
||||
return make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex), nil
|
||||
}
|
||||
|
||||
// list indexes for the paths
|
||||
indexesMap, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
|
||||
indexes, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to list JSON path indexes")
|
||||
}
|
||||
|
||||
// build a set of indexes
|
||||
cleanIndexes := make(map[string][]telemetrytypes.JSONDataTypeIndex)
|
||||
for path, indexes := range indexesMap {
|
||||
for _, index := range indexes {
|
||||
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", index.Expression)
|
||||
}
|
||||
|
||||
jsonDataType, found := telemetrytypes.MappingStringToJSONDataType[columnType]
|
||||
if !found {
|
||||
t.logger.ErrorContext(ctx, "failed to map column type to JSON data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
|
||||
continue
|
||||
}
|
||||
|
||||
if jsonDataType == telemetrytypes.String {
|
||||
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
|
||||
Type: telemetrytypes.String,
|
||||
ColumnExpression: columnExpr,
|
||||
IndexExpression: index.Expression,
|
||||
})
|
||||
} else if strings.HasPrefix(index.Type, "minmax") {
|
||||
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
|
||||
Type: jsonDataType,
|
||||
ColumnExpression: columnExpr,
|
||||
IndexExpression: index.Expression,
|
||||
})
|
||||
}
|
||||
}
|
||||
fieldPathToIndexes := make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex)
|
||||
for _, index := range indexes {
|
||||
fieldPathToIndexes[index.Name] = append(fieldPathToIndexes[index.Name], index)
|
||||
}
|
||||
|
||||
return cleanIndexes, nil
|
||||
return fieldPathToIndexes, nil
|
||||
}
|
||||
|
||||
func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, []any) {
|
||||
@@ -180,7 +155,7 @@ func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, [
|
||||
return sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
|
||||
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
|
||||
ctx = withTelemetryContext(ctx, "ListLogsJSONIndexes")
|
||||
query, args := buildListLogsJSONIndexesQuery(t.telemetrystore.Cluster(), filters...)
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
@@ -189,7 +164,7 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
indexes := make(map[string][]schemamigrator.Index)
|
||||
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
|
||||
for rows.Next() {
|
||||
var name string
|
||||
var typeFull string
|
||||
@@ -198,11 +173,42 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
|
||||
if err := rows.Scan(&name, &typeFull, &expr, &granularity); err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to scan string indexed column")
|
||||
}
|
||||
indexes[name] = append(indexes[name], schemamigrator.Index{
|
||||
Name: name,
|
||||
Type: typeFull,
|
||||
Expression: expr,
|
||||
Granularity: int(granularity),
|
||||
|
||||
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(expr)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", expr)
|
||||
}
|
||||
|
||||
fdt, found := telemetrytypes.MappingJSONDataTypeToFieldDataType[columnType]
|
||||
if !found {
|
||||
t.logger.ErrorContext(ctx, "failed to map JSON data type to field data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
|
||||
continue
|
||||
}
|
||||
|
||||
// Key by the field path (e.g. "user.name"), not by the ClickHouse index name.
|
||||
// columnExpr is the full column reference like "body_v2.user.name"; strip the
|
||||
// column prefix and any backticks added for special-character path segments.
|
||||
baseColumn := ""
|
||||
fieldName := ""
|
||||
switch {
|
||||
case strings.HasPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix):
|
||||
baseColumn = telemetrylogs.BodyV2ColumnPrefix
|
||||
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix)
|
||||
case strings.HasPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix):
|
||||
baseColumn = telemetrylogs.BodyPromotedColumnPrefix
|
||||
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix)
|
||||
}
|
||||
fieldName = strings.ReplaceAll(fieldName, "`", "")
|
||||
|
||||
indexes = append(indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
|
||||
Name: fieldName,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldDataType: fdt,
|
||||
BaseColumn: baseColumn,
|
||||
IndexName: name,
|
||||
IndexType: typeFull,
|
||||
IndexExpression: expr,
|
||||
Granularity: int(granularity),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ func (i *PromotePath) ValidateAndSetDefaults() error {
|
||||
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index granularity must be greater than 0")
|
||||
}
|
||||
|
||||
jsonDataType, ok := telemetrytypes.MappingStringToJSONDataType[index.ColumnType]
|
||||
jsonDataType, ok := telemetrytypes.MappingFieldDataTypeToJSONDataType[telemetrytypes.MappingJSONDataTypeToFieldDataType[index.ColumnType]]
|
||||
if !ok {
|
||||
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid column type: %s", index.ColumnType)
|
||||
}
|
||||
|
||||
@@ -114,11 +114,11 @@ type AlertCompositeQuery struct {
|
||||
|
||||
type RuleCondition struct {
|
||||
CompositeQuery *AlertCompositeQuery `json:"compositeQuery" required:"true"`
|
||||
CompareOperator CompareOperator `json:"op,omitzero"`
|
||||
CompareOperator CompareOperator `json:"op" required:"true"`
|
||||
Target *float64 `json:"target,omitempty"`
|
||||
AlertOnAbsent bool `json:"alertOnAbsent,omitempty"`
|
||||
AbsentFor uint64 `json:"absentFor,omitempty"`
|
||||
MatchType MatchType `json:"matchType,omitzero"`
|
||||
MatchType MatchType `json:"matchType" required:"true"`
|
||||
TargetUnit string `json:"targetUnit,omitempty"`
|
||||
Algorithm string `json:"algorithm,omitempty"`
|
||||
Seasonality Seasonality `json:"seasonality,omitzero"`
|
||||
|
||||
@@ -50,13 +50,13 @@ const (
|
||||
// PostableRule is used to create alerting rule from HTTP api.
|
||||
type PostableRule struct {
|
||||
AlertName string `json:"alert" required:"true"`
|
||||
AlertType AlertType `json:"alertType" required:"true"`
|
||||
AlertType AlertType `json:"alertType,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
RuleType RuleType `json:"ruleType" required:"true"`
|
||||
RuleType RuleType `json:"ruleType,omitzero" required:"true"`
|
||||
EvalWindow valuer.TextDuration `json:"evalWindow,omitzero"`
|
||||
Frequency valuer.TextDuration `json:"frequency,omitzero"`
|
||||
|
||||
RuleCondition *RuleCondition `json:"condition" required:"true"`
|
||||
RuleCondition *RuleCondition `json:"condition,omitempty" required:"true"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
|
||||
@@ -67,9 +67,9 @@ type PostableRule struct {
|
||||
|
||||
PreferredChannels []string `json:"preferredChannels,omitempty"`
|
||||
|
||||
Version string `json:"version"`
|
||||
Version string `json:"version,omitempty"`
|
||||
|
||||
Evaluation *EvaluationEnvelope `json:"evaluation,omitempty"`
|
||||
Evaluation *EvaluationEnvelope `yaml:"evaluation,omitempty" json:"evaluation,omitempty"`
|
||||
SchemaVersion string `json:"schemaVersion,omitempty"`
|
||||
|
||||
NotificationSettings *NotificationSettings `json:"notificationSettings,omitempty"`
|
||||
|
||||
@@ -37,9 +37,9 @@ type TelemetryFieldKey struct {
|
||||
FieldContext FieldContext `json:"fieldContext,omitzero"`
|
||||
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
|
||||
|
||||
JSONPlan JSONAccessPlan `json:"-"`
|
||||
Indexes []JSONDataTypeIndex `json:"-"`
|
||||
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
|
||||
JSONPlan JSONAccessPlan `json:"-"`
|
||||
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
|
||||
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
|
||||
|
||||
Evolutions []*EvolutionEntry `json:"-"`
|
||||
}
|
||||
@@ -102,7 +102,7 @@ func (f TelemetryFieldKey) String() string {
|
||||
if i > 0 {
|
||||
sb.WriteString("; ")
|
||||
}
|
||||
fmt.Fprintf(&sb, "{type=%s, columnExpr=%s, indexExpr=%s}", index.Type.StringValue(), index.ColumnExpression, index.IndexExpression)
|
||||
fmt.Fprintf(&sb, "{type=%s, indexExpr=%s}", MappingFieldDataTypeToJSONDataType[index.FieldDataType].StringValue(), index.IndexExpression)
|
||||
}
|
||||
sb.WriteString("]")
|
||||
}
|
||||
@@ -400,3 +400,14 @@ func NewFieldValueSelectorFromPostableFieldValueParams(params PostableFieldValue
|
||||
|
||||
return fieldValueSelector
|
||||
}
|
||||
|
||||
type TelemetryFieldKeySkipIndex struct {
|
||||
Name string `json:"name"` // Name is TelemetryFieldKey.Name not IndexName from ClickHouse
|
||||
FieldContext FieldContext `json:"fieldContext,omitzero"`
|
||||
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
|
||||
BaseColumn string `json:"baseColumn"`
|
||||
IndexName string `json:"indexName"`
|
||||
IndexType string `json:"indexType"`
|
||||
IndexExpression string `json:"indexExpression"`
|
||||
Granularity int `json:"granularity"`
|
||||
}
|
||||
|
||||
@@ -1,11 +1,5 @@
|
||||
package telemetrytypes
|
||||
|
||||
type JSONDataTypeIndex struct {
|
||||
Type JSONDataType
|
||||
ColumnExpression string
|
||||
IndexExpression string
|
||||
}
|
||||
|
||||
type JSONDataType struct {
|
||||
str string // Store the correct case for ClickHouse
|
||||
IsArray bool
|
||||
@@ -32,18 +26,17 @@ var (
|
||||
ArrayJSON = JSONDataType{"Array(JSON)", true, "JSON", false}
|
||||
)
|
||||
|
||||
var MappingStringToJSONDataType = map[string]JSONDataType{
|
||||
"String": String,
|
||||
"Int64": Int64,
|
||||
"Float64": Float64,
|
||||
"Bool": Bool,
|
||||
"Dynamic": Dynamic,
|
||||
"Array(Nullable(String))": ArrayString,
|
||||
"Array(Nullable(Int64))": ArrayInt64,
|
||||
"Array(Nullable(Float64))": ArrayFloat64,
|
||||
"Array(Nullable(Bool))": ArrayBool,
|
||||
"Array(Dynamic)": ArrayDynamic,
|
||||
"Array(JSON)": ArrayJSON,
|
||||
var MappingJSONDataTypeToFieldDataType = map[string]FieldDataType{
|
||||
"String": FieldDataTypeString,
|
||||
"Int64": FieldDataTypeInt64,
|
||||
"Float64": FieldDataTypeFloat64,
|
||||
"Bool": FieldDataTypeBool,
|
||||
"Array(Nullable(String))": FieldDataTypeArrayString,
|
||||
"Array(Nullable(Int64))": FieldDataTypeArrayInt64,
|
||||
"Array(Nullable(Float64))": FieldDataTypeArrayFloat64,
|
||||
"Array(Nullable(Bool))": FieldDataTypeArrayBool,
|
||||
"Array(Dynamic)": FieldDataTypeArrayDynamic,
|
||||
"Array(JSON)": FieldDataTypeArrayJSON,
|
||||
}
|
||||
|
||||
var MappingFieldDataTypeToJSONDataType = map[FieldDataType]JSONDataType{
|
||||
|
||||
@@ -3,7 +3,6 @@ package telemetrytypes
|
||||
import (
|
||||
"context"
|
||||
|
||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
)
|
||||
|
||||
@@ -35,7 +34,7 @@ type MetadataStore interface {
|
||||
FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error)
|
||||
|
||||
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
|
||||
ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error)
|
||||
ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]TelemetryFieldKeySkipIndex, error)
|
||||
|
||||
// ListPromotedPaths lists the promoted paths.
|
||||
GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error)
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
@@ -18,7 +17,7 @@ type MockMetadataStore struct {
|
||||
TemporalityMap map[string]metrictypes.Temporality
|
||||
TypeMap map[string]metrictypes.Type
|
||||
PromotedPathsMap map[string]bool
|
||||
LogsJSONIndexesMap map[string][]schemamigrator.Index
|
||||
LogsJSONIndexes []telemetrytypes.TelemetryFieldKeySkipIndex
|
||||
ColumnEvolutionMetadataMap map[string][]*telemetrytypes.EvolutionEntry
|
||||
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
|
||||
// StaticFields holds signal-specific intrinsic field definitions (e.g. telemetrylogs.IntrinsicFields).
|
||||
@@ -34,7 +33,7 @@ func NewMockMetadataStore() *MockMetadataStore {
|
||||
TemporalityMap: make(map[string]metrictypes.Temporality),
|
||||
TypeMap: make(map[string]metrictypes.Type),
|
||||
PromotedPathsMap: make(map[string]bool),
|
||||
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
|
||||
LogsJSONIndexes: []telemetrytypes.TelemetryFieldKeySkipIndex{},
|
||||
ColumnEvolutionMetadataMap: make(map[string][]*telemetrytypes.EvolutionEntry),
|
||||
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
StaticFields: make(map[string]telemetrytypes.TelemetryFieldKey),
|
||||
@@ -369,8 +368,8 @@ func (m *MockMetadataStore) GetPromotedPaths(ctx context.Context, paths ...strin
|
||||
}
|
||||
|
||||
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
|
||||
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
|
||||
return m.LogsJSONIndexesMap, nil
|
||||
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
|
||||
return m.LogsJSONIndexes, nil
|
||||
}
|
||||
|
||||
func (m *MockMetadataStore) updateColumnEvolutionMetadataForKeys(_ context.Context, keysToUpdate []*telemetrytypes.TelemetryFieldKey) map[string][]*telemetrytypes.EvolutionEntry {
|
||||
|
||||
@@ -23,6 +23,7 @@ pytest_plugins = [
|
||||
"fixtures.notification_channel",
|
||||
"fixtures.alerts",
|
||||
"fixtures.cloudintegrations",
|
||||
"fixtures.jsontypeexporter",
|
||||
]
|
||||
|
||||
|
||||
@@ -78,6 +79,6 @@ def pytest_addoption(parser: pytest.Parser):
|
||||
parser.addoption(
|
||||
"--schema-migrator-version",
|
||||
action="store",
|
||||
default="v0.144.2",
|
||||
default="v0.144.3",
|
||||
help="schema migrator version",
|
||||
)
|
||||
|
||||
473
tests/integration/fixtures/jsontypeexporter.py
Normal file
473
tests/integration/fixtures/jsontypeexporter.py
Normal file
@@ -0,0 +1,473 @@
|
||||
"""
|
||||
Simpler version of jsontypeexporter for test fixtures.
|
||||
This exports JSON type metadata to the path_types table by parsing JSON bodies
|
||||
and extracting all paths with their types, similar to how the real jsontypeexporter works.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import json
|
||||
from abc import ABC
|
||||
from http import HTTPStatus
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Generator,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
Union,
|
||||
)
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from fixtures import types
|
||||
|
||||
|
||||
class JSONPathType(ABC):
|
||||
"""Represents a JSON path with its type information"""
|
||||
|
||||
field_name: str
|
||||
field_data_type: str
|
||||
last_seen: np.uint64
|
||||
signal: str = "logs"
|
||||
field_context: str = "body"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
field_name: str,
|
||||
field_data_type: str,
|
||||
last_seen: Optional[datetime.datetime] = None,
|
||||
) -> None:
|
||||
self.field_name = field_name
|
||||
self.field_data_type = field_data_type
|
||||
self.signal = "logs"
|
||||
self.field_context = "body"
|
||||
if last_seen is None:
|
||||
last_seen = datetime.datetime.now()
|
||||
self.last_seen = np.uint64(int(last_seen.timestamp() * 1e9))
|
||||
|
||||
def np_arr(self) -> np.array:
|
||||
"""Return path type data as numpy array for database insertion"""
|
||||
return np.array([self.signal, self.field_context, self.field_name, self.field_data_type, self.last_seen])
|
||||
|
||||
|
||||
# Constants matching jsontypeexporter
|
||||
ARRAY_SEPARATOR = "[]." # Used in paths like "education[].name"
|
||||
ARRAY_SUFFIX = "[]" # Used when traversing into array element objects
|
||||
|
||||
|
||||
def _infer_array_type_from_type_strings(types: List[str]) -> Optional[str]:
|
||||
"""
|
||||
Infer array type from a list of pre-classified type strings.
|
||||
Matches metadataexporter's inferArrayMask logic.
|
||||
|
||||
Internal type strings are: "JSON", "String", "Bool", "Float64", "Int64"
|
||||
|
||||
SuperTyping rules (matching Go inferArrayMask):
|
||||
- JSON alone → []json
|
||||
- JSON + any primitive → []dynamic
|
||||
- String alone → []string; String + other → []dynamic
|
||||
- Float64 wins over Int64 and Bool
|
||||
- Int64 wins over Bool
|
||||
- Bool alone → []bool
|
||||
"""
|
||||
if len(types) == 0:
|
||||
return None
|
||||
|
||||
unique = set(types)
|
||||
|
||||
has_json = "JSON" in unique
|
||||
# hasPrimitive mirrors Go: (hasJSON && len(unique) > 1) || (!hasJSON && len(unique) > 0)
|
||||
has_primitive = (has_json and len(unique) > 1) or (not has_json and len(unique) > 0)
|
||||
|
||||
if has_json:
|
||||
if not has_primitive:
|
||||
return "[]json"
|
||||
return "[]dynamic"
|
||||
|
||||
# ---- Primitive Type Resolution (Float > Int > Bool) ----
|
||||
if "String" in unique:
|
||||
if len(unique) > 1:
|
||||
return "[]dynamic"
|
||||
return "[]string"
|
||||
|
||||
if "Float64" in unique:
|
||||
return "[]float64"
|
||||
if "Int64" in unique:
|
||||
return "[]int64"
|
||||
if "Bool" in unique:
|
||||
return "[]bool"
|
||||
|
||||
return "[]dynamic"
|
||||
|
||||
|
||||
def _infer_array_type(elements: List[Any]) -> Optional[str]:
|
||||
"""
|
||||
Infer array type from raw Python list elements.
|
||||
Classifies each element then delegates to _infer_array_type_from_type_strings.
|
||||
"""
|
||||
if len(elements) == 0:
|
||||
return None
|
||||
|
||||
types = []
|
||||
for elem in elements:
|
||||
if elem is None:
|
||||
continue
|
||||
if isinstance(elem, dict):
|
||||
types.append("JSON")
|
||||
elif isinstance(elem, str):
|
||||
types.append("String")
|
||||
elif isinstance(elem, bool): # must be before int (bool is subclass of int)
|
||||
types.append("Bool")
|
||||
elif isinstance(elem, float):
|
||||
types.append("Float64")
|
||||
elif isinstance(elem, int):
|
||||
types.append("Int64")
|
||||
|
||||
return _infer_array_type_from_type_strings(types)
|
||||
|
||||
|
||||
def _python_type_to_clickhouse_type(value: Any) -> str:
|
||||
"""
|
||||
Convert Python type to ClickHouse JSON type string.
|
||||
Maps Python types to ClickHouse JSON data types.
|
||||
Matches metadataexporter's mapPCommonValueTypeToDataType.
|
||||
"""
|
||||
if isinstance(value, bool):
|
||||
return "bool"
|
||||
elif isinstance(value, int):
|
||||
return "int64"
|
||||
elif isinstance(value, float):
|
||||
return "float64"
|
||||
elif isinstance(value, str):
|
||||
return "string"
|
||||
elif isinstance(value, list):
|
||||
# Use the sophisticated array type inference
|
||||
array_type = _infer_array_type(value)
|
||||
return array_type if array_type else "[]dynamic"
|
||||
elif isinstance(value, dict):
|
||||
return "json"
|
||||
else:
|
||||
return "string" # Default fallback
|
||||
|
||||
|
||||
def _extract_json_paths(
|
||||
obj: Any,
|
||||
current_path: str = "",
|
||||
path_types: Optional[Dict[str, Set[str]]] = None,
|
||||
level: int = 0,
|
||||
) -> Dict[str, Set[str]]:
|
||||
"""
|
||||
Recursively extract all paths and their types from a JSON object.
|
||||
Matches jsontypeexporter's analyzePValue logic.
|
||||
|
||||
Args:
|
||||
obj: The JSON object to traverse
|
||||
current_path: Current path being built (e.g., "user.name")
|
||||
path_types: Dictionary mapping paths to sets of types found
|
||||
level: Current nesting level (for depth limiting)
|
||||
|
||||
Returns:
|
||||
Dictionary mapping paths to sets of type strings
|
||||
"""
|
||||
if path_types is None:
|
||||
path_types = {}
|
||||
|
||||
if obj is None:
|
||||
# Skip null values — matches Go walkNode which errors on ValueTypeEmpty
|
||||
return path_types
|
||||
|
||||
if isinstance(obj, dict):
|
||||
# For objects, recurse into keys without recording the object itself as a type.
|
||||
# Matches Go walkMap which recurses without calling ta.record on the map node.
|
||||
|
||||
for key, value in obj.items():
|
||||
# Build the path for this key
|
||||
if current_path:
|
||||
new_path = f"{current_path}.{key}"
|
||||
else:
|
||||
new_path = key
|
||||
|
||||
# Recurse into the value
|
||||
_extract_json_paths(value, new_path, path_types, level + 1)
|
||||
|
||||
elif isinstance(obj, list):
|
||||
# Skip empty arrays
|
||||
if len(obj) == 0:
|
||||
return path_types
|
||||
|
||||
# Collect types from array elements (matching Go: types := make([]pcommon.ValueType, 0, s.Len()))
|
||||
types = []
|
||||
|
||||
for item in obj:
|
||||
if isinstance(item, dict):
|
||||
# When traversing into array element objects, use ArraySuffix ([])
|
||||
# This matches: prefix+ArraySuffix in the Go code
|
||||
# Example: if current_path is "education", we use "education[]" to traverse into objects
|
||||
array_prefix = current_path + ARRAY_SUFFIX if current_path else ""
|
||||
for key, value in item.items():
|
||||
if array_prefix:
|
||||
# Use array separator: education[].name
|
||||
array_path = f"{array_prefix}.{key}"
|
||||
else:
|
||||
array_path = key
|
||||
# Recurse without increasing level (matching Go behavior)
|
||||
_extract_json_paths(value, array_path, path_types, level)
|
||||
types.append("JSON")
|
||||
elif isinstance(item, list):
|
||||
# Arrays inside arrays are not supported - skip the whole path
|
||||
# Matching Go: e.logger.Error("arrays inside arrays are not supported!", ...); return nil
|
||||
return path_types
|
||||
elif isinstance(item, str):
|
||||
types.append("String")
|
||||
elif isinstance(item, bool):
|
||||
types.append("Bool")
|
||||
elif isinstance(item, float):
|
||||
types.append("Float64")
|
||||
elif isinstance(item, int):
|
||||
types.append("Int64")
|
||||
|
||||
# Infer array type from collected types (matching Go: if mask := inferArrayMask(types); mask != 0)
|
||||
if len(types) > 0:
|
||||
array_type = _infer_array_type_from_type_strings(types)
|
||||
if array_type and current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
path_types[current_path].add(array_type)
|
||||
|
||||
else:
|
||||
# Primitive value (string, number, bool)
|
||||
if current_path:
|
||||
if current_path not in path_types:
|
||||
path_types[current_path] = set()
|
||||
obj_type = _python_type_to_clickhouse_type(obj)
|
||||
path_types[current_path].add(obj_type)
|
||||
|
||||
return path_types
|
||||
|
||||
|
||||
def _parse_json_bodies_and_extract_paths(
|
||||
json_bodies: List[str],
|
||||
timestamp: Optional[datetime.datetime] = None,
|
||||
) -> List[JSONPathType]:
|
||||
"""
|
||||
Parse JSON bodies and extract all paths with their types.
|
||||
This mimics the behavior of jsontypeexporter.
|
||||
|
||||
Args:
|
||||
json_bodies: List of JSON body strings to parse
|
||||
timestamp: Timestamp to use for last_seen (defaults to now)
|
||||
|
||||
Returns:
|
||||
List of JSONPathType objects with all discovered paths and types
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = datetime.datetime.now()
|
||||
|
||||
# Aggregate all paths and their types across all JSON bodies
|
||||
all_path_types: Dict[str, Set[str]] = {}
|
||||
|
||||
for json_body in json_bodies:
|
||||
try:
|
||||
parsed = json.loads(json_body)
|
||||
_extract_json_paths(parsed, "", all_path_types, level=0)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
# Skip invalid JSON
|
||||
continue
|
||||
|
||||
# Convert to list of JSONPathType objects
|
||||
# Each path can have multiple types, so we create one JSONPathType per type
|
||||
path_type_objects: List[JSONPathType] = []
|
||||
for path, types_set in all_path_types.items():
|
||||
for type_str in types_set:
|
||||
path_type_objects.append(
|
||||
JSONPathType(field_name=path, field_data_type=type_str, last_seen=timestamp)
|
||||
)
|
||||
|
||||
return path_type_objects
|
||||
|
||||
|
||||
@pytest.fixture(name="export_json_types", scope="function")
|
||||
def export_json_types(
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest, # To access migrator fixture
|
||||
) -> Generator[
|
||||
Callable[[Union[List[JSONPathType], List[str], List[Any]]], None], Any, None
|
||||
]:
|
||||
"""
|
||||
Fixture for exporting JSON type metadata to the path_types table.
|
||||
This is a simpler version of jsontypeexporter for test fixtures.
|
||||
|
||||
The function can accept:
|
||||
1. List of JSONPathType objects (manual specification)
|
||||
2. List of JSON body strings (auto-extract paths)
|
||||
3. List of Logs objects (extract from body_json field)
|
||||
|
||||
Usage examples:
|
||||
# Manual specification
|
||||
export_json_types([
|
||||
JSONPathType(field_name="user.name", field_data_type="string"),
|
||||
JSONPathType(field_name="user.age", field_data_type="int64"),
|
||||
])
|
||||
|
||||
# Auto-extract from JSON strings
|
||||
export_json_types([
|
||||
'{"user": {"name": "alice", "age": 25}}',
|
||||
'{"user": {"name": "bob", "age": 30}}',
|
||||
])
|
||||
|
||||
# Auto-extract from Logs objects
|
||||
export_json_types(logs_list)
|
||||
"""
|
||||
# Ensure migrator has run to create the table
|
||||
try:
|
||||
request.getfixturevalue("migrator")
|
||||
except Exception:
|
||||
# If migrator fixture is not available, that's okay - table might already exist
|
||||
pass
|
||||
|
||||
def _export_json_types(
|
||||
data: Union[
|
||||
List[JSONPathType], List[str], List[Any]
|
||||
], # List[Logs] but avoiding circular import
|
||||
) -> None:
|
||||
"""
|
||||
Export JSON type metadata to signoz_metadata.distributed_field_keys table.
|
||||
This table stores signal, context, path, and type information for body JSON fields.
|
||||
"""
|
||||
path_types: List[JSONPathType] = []
|
||||
|
||||
if len(data) == 0:
|
||||
return
|
||||
|
||||
# Determine input type and convert to JSONPathType list
|
||||
first_item = data[0]
|
||||
|
||||
if isinstance(first_item, JSONPathType):
|
||||
# Already JSONPathType objects
|
||||
path_types = data # type: ignore
|
||||
elif isinstance(first_item, str):
|
||||
# List of JSON strings - parse and extract paths
|
||||
path_types = _parse_json_bodies_and_extract_paths(data) # type: ignore
|
||||
else:
|
||||
# Assume it's a list of Logs objects - extract body_v2
|
||||
json_bodies: List[str] = []
|
||||
for log in data: # type: ignore
|
||||
# Try to get body_v2 attribute
|
||||
if hasattr(log, "body_v2") and log.body_v2:
|
||||
json_bodies.append(log.body_v2)
|
||||
elif hasattr(log, "body") and log.body:
|
||||
# Fallback to body if body_v2 not available
|
||||
try:
|
||||
# Try to parse as JSON
|
||||
json.loads(log.body)
|
||||
json_bodies.append(log.body)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
|
||||
if json_bodies:
|
||||
path_types = _parse_json_bodies_and_extract_paths(json_bodies)
|
||||
|
||||
if len(path_types) == 0:
|
||||
return
|
||||
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_metadata",
|
||||
table="distributed_field_keys",
|
||||
data=[path_type.np_arr() for path_type in path_types],
|
||||
column_names=[
|
||||
"signal",
|
||||
"field_context",
|
||||
"field_name",
|
||||
"field_data_type",
|
||||
"last_seen",
|
||||
],
|
||||
)
|
||||
|
||||
yield _export_json_types
|
||||
|
||||
# Cleanup - truncate the local table after tests (following pattern from logs fixture)
|
||||
clickhouse.conn.query(
|
||||
f"TRUNCATE TABLE signoz_metadata.field_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
|
||||
)
|
||||
|
||||
@pytest.fixture(name="create_json_index", scope="function")
|
||||
def create_json_index(
|
||||
signoz: types.SigNoz,
|
||||
) -> Generator[Callable[[str, List[Dict[str, Any]]], None], None, None]:
|
||||
"""
|
||||
Create ClickHouse data-skipping indexes on body_v2 JSON sub-columns via
|
||||
POST /api/v1/logs/promote_paths.
|
||||
|
||||
**Must be called BEFORE insert_logs** so that newly inserted data parts are
|
||||
covered by the index and the QB uses the indexed condition path.
|
||||
|
||||
Each entry in `paths` follows the PromotePath API shape:
|
||||
{
|
||||
"path": "body.user.name", # must start with "body."
|
||||
"indexes": [
|
||||
{
|
||||
"column_type": "String", # String | Int64 | Float64
|
||||
"type": "ngrambf_v1(3, 256, 2, 0)", # or "minmax", "tokenbf_v1(...)"
|
||||
"granularity": 1,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
Teardown drops every index created during the test by querying
|
||||
system.data_skipping_indices for matching expressions.
|
||||
|
||||
Example::
|
||||
|
||||
def test_foo(signoz, get_token, insert_logs, export_json_types, create_json_body_index):
|
||||
token = get_token(...)
|
||||
export_json_types(logs_list)
|
||||
create_json_body_index(token, [
|
||||
{"path": "body.user.name",
|
||||
"indexes": [{"column_type": "String", "type": "ngrambf_v1(3, 256, 2, 0)", "granularity": 1}]},
|
||||
{"path": "body.user.age",
|
||||
"indexes": [{"column_type": "Int64", "type": "minmax", "granularity": 1}]},
|
||||
])
|
||||
insert_logs(logs_list) # data inserted after index exists — index is built automatically
|
||||
"""
|
||||
created_paths: List[str] = []
|
||||
|
||||
def _create_json_body_index(token: str, paths: List[Dict[str, Any]]) -> None:
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/logs/promote_paths"),
|
||||
headers={"authorization": f"Bearer {token}"},
|
||||
json=paths,
|
||||
timeout=30,
|
||||
)
|
||||
assert response.status_code == HTTPStatus.CREATED, (
|
||||
f"Failed to create JSON body indexes: "
|
||||
f"{response.status_code} {response.text}"
|
||||
)
|
||||
for path in paths:
|
||||
# The API strips the "body." prefix before storing — mirror that here
|
||||
# so our cleanup query uses the bare path (e.g. "user.name").
|
||||
raw = path["path"].removeprefix("body.")
|
||||
if raw not in created_paths:
|
||||
created_paths.append(raw)
|
||||
|
||||
yield _create_json_body_index
|
||||
|
||||
if not created_paths:
|
||||
return
|
||||
|
||||
cluster = signoz.telemetrystore.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER"]
|
||||
for path in created_paths:
|
||||
result = signoz.telemetrystore.conn.query(
|
||||
"SELECT name FROM system.data_skipping_indices "
|
||||
"WHERE database = 'signoz_logs' AND table = 'logs_v2' "
|
||||
f"AND expr LIKE '%{path}%'"
|
||||
)
|
||||
for (index_name,) in result.result_rows:
|
||||
signoz.telemetrystore.conn.query(
|
||||
f"ALTER TABLE signoz_logs.logs_v2 "
|
||||
f"ON CLUSTER '{cluster}' "
|
||||
f"DROP INDEX IF EXISTS `{index_name}`"
|
||||
)
|
||||
@@ -122,6 +122,8 @@ class Logs(ABC):
|
||||
resources: dict[str, Any] = {},
|
||||
attributes: dict[str, Any] = {},
|
||||
body: str = "default body",
|
||||
body_v2: Optional[str] = None,
|
||||
body_promoted: Optional[str] = None,
|
||||
severity_text: str = "INFO",
|
||||
trace_id: str = "",
|
||||
span_id: str = "",
|
||||
@@ -167,6 +169,33 @@ class Logs(ABC):
|
||||
# Set body
|
||||
self.body = body
|
||||
|
||||
# Set body_v2 - if body is JSON, parse and stringify it, otherwise use empty string
|
||||
# ClickHouse accepts String input for JSON column
|
||||
if body_v2 is not None:
|
||||
self.body_v2 = body_v2
|
||||
else:
|
||||
# Try to parse body as JSON; if successful use it directly,
|
||||
# otherwise wrap as {"message": body} matching the normalize operator behavior.
|
||||
try:
|
||||
json.loads(body)
|
||||
self.body_v2 = body
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
self.body_v2 = json.dumps({"message": body})
|
||||
|
||||
# Set body_promoted - must be valid JSON
|
||||
# Tests will explicitly pass promoted column's content, but we validate it
|
||||
if body_promoted is not None:
|
||||
# Validate that it's valid JSON
|
||||
try:
|
||||
json.loads(body_promoted)
|
||||
self.body_promoted = body_promoted
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
# If invalid, default to empty JSON object
|
||||
self.body_promoted = "{}"
|
||||
else:
|
||||
# Default to empty JSON object (valid JSON)
|
||||
self.body_promoted = "{}"
|
||||
|
||||
# Process resources and attributes
|
||||
self.resources_string = {k: str(v) for k, v in resources.items()}
|
||||
self.resource_json = (
|
||||
@@ -326,6 +355,8 @@ class Logs(ABC):
|
||||
self.severity_text,
|
||||
self.severity_number,
|
||||
self.body,
|
||||
self.body_v2,
|
||||
self.body_promoted,
|
||||
self.attributes_string,
|
||||
self.attributes_number,
|
||||
self.attributes_bool,
|
||||
@@ -454,31 +485,53 @@ def insert_logs(
|
||||
data=[resource_key.np_arr() for resource_key in resource_keys],
|
||||
)
|
||||
|
||||
# All columns in insertion order (must match Logs.np_arr() order)
|
||||
all_column_names = [
|
||||
"ts_bucket_start",
|
||||
"resource_fingerprint",
|
||||
"timestamp",
|
||||
"observed_timestamp",
|
||||
"id",
|
||||
"trace_id",
|
||||
"span_id",
|
||||
"trace_flags",
|
||||
"severity_text",
|
||||
"severity_number",
|
||||
"body",
|
||||
"body_v2",
|
||||
"body_promoted",
|
||||
"attributes_string",
|
||||
"attributes_number",
|
||||
"attributes_bool",
|
||||
"resources_string",
|
||||
"scope_name",
|
||||
"scope_version",
|
||||
"scope_string",
|
||||
"resource",
|
||||
]
|
||||
|
||||
# Check if body_v2 column exists (only present when ENABLE_LOGS_MIGRATIONS_V2 migration has run)
|
||||
result = clickhouse.conn.query(
|
||||
"SELECT count() FROM system.columns WHERE database = 'signoz_logs' AND table = 'logs_v2' AND name = 'body_v2'"
|
||||
)
|
||||
has_json_body = result.result_rows[0][0] > 0
|
||||
|
||||
if has_json_body:
|
||||
column_names = all_column_names
|
||||
data = [log.np_arr() for log in logs]
|
||||
else:
|
||||
json_body_cols = {"body_v2", "body_promoted"}
|
||||
keep_indices = [
|
||||
i for i, c in enumerate(all_column_names) if c not in json_body_cols
|
||||
]
|
||||
column_names = [all_column_names[i] for i in keep_indices]
|
||||
data = [log.np_arr()[keep_indices] for log in logs]
|
||||
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_logs",
|
||||
table="distributed_logs_v2",
|
||||
data=[log.np_arr() for log in logs],
|
||||
column_names=[
|
||||
"ts_bucket_start",
|
||||
"resource_fingerprint",
|
||||
"timestamp",
|
||||
"observed_timestamp",
|
||||
"id",
|
||||
"trace_id",
|
||||
"span_id",
|
||||
"trace_flags",
|
||||
"severity_text",
|
||||
"severity_number",
|
||||
"body",
|
||||
"attributes_string",
|
||||
"attributes_number",
|
||||
"attributes_bool",
|
||||
"resources_string",
|
||||
"scope_name",
|
||||
"scope_version",
|
||||
"scope_string",
|
||||
"resource",
|
||||
],
|
||||
data=data,
|
||||
column_names=column_names,
|
||||
)
|
||||
|
||||
yield _insert_logs
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from typing import Optional
|
||||
|
||||
import docker
|
||||
import pytest
|
||||
from testcontainers.core.container import Network
|
||||
@@ -8,27 +10,32 @@ from fixtures.logger import setup_logger
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
|
||||
@pytest.fixture(name="migrator", scope="package")
|
||||
def migrator(
|
||||
def create_migrator(
|
||||
network: Network,
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest,
|
||||
pytestconfig: pytest.Config,
|
||||
cache_key: str = "migrator",
|
||||
env_overrides: Optional[dict] = None,
|
||||
) -> types.Operation:
|
||||
"""
|
||||
Package-scoped fixture for running schema migrations.
|
||||
Factory function for running schema migrations.
|
||||
Accepts optional env_overrides to customize the migrator environment.
|
||||
"""
|
||||
|
||||
def create() -> None:
|
||||
version = request.config.getoption("--schema-migrator-version")
|
||||
client = docker.from_env()
|
||||
|
||||
environment = dict(env_overrides) if env_overrides else {}
|
||||
|
||||
container = client.containers.run(
|
||||
image=f"signoz/signoz-schema-migrator:{version}",
|
||||
command=f"sync --replication=true --cluster-name=cluster --up= --dsn={clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN"]}",
|
||||
detach=True,
|
||||
auto_remove=False,
|
||||
network=network.id,
|
||||
environment=environment,
|
||||
)
|
||||
|
||||
result = container.wait()
|
||||
@@ -47,6 +54,7 @@ def migrator(
|
||||
detach=True,
|
||||
auto_remove=False,
|
||||
network=network.id,
|
||||
environment=environment,
|
||||
)
|
||||
|
||||
result = container.wait()
|
||||
@@ -59,7 +67,7 @@ def migrator(
|
||||
|
||||
container.remove()
|
||||
|
||||
return types.Operation(name="migrator")
|
||||
return types.Operation(name=cache_key)
|
||||
|
||||
def delete(_: types.Operation) -> None:
|
||||
pass
|
||||
@@ -70,9 +78,27 @@ def migrator(
|
||||
return dev.wrap(
|
||||
request,
|
||||
pytestconfig,
|
||||
"migrator",
|
||||
cache_key,
|
||||
lambda: types.Operation(name=""),
|
||||
create,
|
||||
delete,
|
||||
restore,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="migrator", scope="package")
|
||||
def migrator(
|
||||
network: Network,
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest,
|
||||
pytestconfig: pytest.Config,
|
||||
) -> types.Operation:
|
||||
"""
|
||||
Package-scoped fixture for running schema migrations.
|
||||
"""
|
||||
return create_migrator(
|
||||
network=network,
|
||||
clickhouse=clickhouse,
|
||||
request=request,
|
||||
pytestconfig=pytestconfig,
|
||||
)
|
||||
|
||||
1691
tests/integration/src/querier_json_body/01_logs_json_body_new_qb.py
Normal file
1691
tests/integration/src/querier_json_body/01_logs_json_body_new_qb.py
Normal file
File diff suppressed because it is too large
Load Diff
0
tests/integration/src/querier_json_body/__init__.py
Normal file
0
tests/integration/src/querier_json_body/__init__.py
Normal file
70
tests/integration/src/querier_json_body/conftest.py
Normal file
70
tests/integration/src/querier_json_body/conftest.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import pytest
|
||||
from testcontainers.core.container import Network
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.migrator import create_migrator
|
||||
from fixtures.signoz import create_signoz
|
||||
|
||||
UNSUPPORTED_CLICKHOUSE_VERSIONS = {"25.5.6"}
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(
|
||||
config: pytest.Config, items: list[pytest.Item]
|
||||
) -> None:
|
||||
version = config.getoption("--clickhouse-version")
|
||||
if version in UNSUPPORTED_CLICKHOUSE_VERSIONS:
|
||||
skip = pytest.mark.skip(
|
||||
reason=f"JSON body QB tests require ClickHouse > {version}"
|
||||
)
|
||||
for item in items:
|
||||
item.add_marker(skip)
|
||||
|
||||
|
||||
@pytest.fixture(name="migrator", scope="package")
|
||||
def migrator_json(
|
||||
network: Network,
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest,
|
||||
pytestconfig: pytest.Config,
|
||||
) -> types.Operation:
|
||||
"""
|
||||
Package-scoped migrator with ENABLE_LOGS_MIGRATIONS_V2=1.
|
||||
"""
|
||||
return create_migrator(
|
||||
network=network,
|
||||
clickhouse=clickhouse,
|
||||
request=request,
|
||||
pytestconfig=pytestconfig,
|
||||
cache_key="migrator-json-body",
|
||||
env_overrides={
|
||||
"ENABLE_LOGS_MIGRATIONS_V2": "1",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="signoz", scope="package")
|
||||
def signoz_json_body(
|
||||
network: Network,
|
||||
zeus: types.TestContainerDocker,
|
||||
gateway: types.TestContainerDocker,
|
||||
sqlstore: types.TestContainerSQL,
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
request: pytest.FixtureRequest,
|
||||
pytestconfig: pytest.Config,
|
||||
) -> types.SigNoz:
|
||||
"""
|
||||
Package-scoped fixture for SigNoz with BODY_JSON_QUERY_ENABLED=true.
|
||||
"""
|
||||
return create_signoz(
|
||||
network=network,
|
||||
zeus=zeus,
|
||||
gateway=gateway,
|
||||
sqlstore=sqlstore,
|
||||
clickhouse=clickhouse,
|
||||
request=request,
|
||||
pytestconfig=pytestconfig,
|
||||
cache_key="signoz-json-body",
|
||||
env_overrides={
|
||||
"BODY_JSON_QUERY_ENABLED": "true",
|
||||
},
|
||||
)
|
||||
Reference in New Issue
Block a user