mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-31 17:40:25 +01:00
Compare commits
7 Commits
fix/array-
...
tvats-vali
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
34a79aa9b9 | ||
|
|
b9b90b9a2a | ||
|
|
699cb64949 | ||
|
|
3d5d4e4398 | ||
|
|
8cbeac9881 | ||
|
|
75f770655c | ||
|
|
b7f610e607 |
2
go.mod
2
go.mod
@@ -160,7 +160,7 @@ require (
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 // indirect
|
||||
github.com/ClickHouse/ch-go v0.67.0 // indirect
|
||||
github.com/ClickHouse/ch-go v0.67.0
|
||||
github.com/Masterminds/squirrel v1.5.4 // indirect
|
||||
github.com/Yiling-J/theine-go v0.6.2 // indirect
|
||||
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b // indirect
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
chproto "github.com/ClickHouse/ch-go/proto"
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
@@ -54,6 +55,36 @@ func newchSQLQuery(
|
||||
}
|
||||
}
|
||||
|
||||
// userFacingCHCodes are ClickHouse error codes that indicate a problem with the
|
||||
// query itself (bad SQL, unknown table/column, etc.) rather than a server-side
|
||||
// failure. This list is incomplete and should be expanded as we discover more error codes that should map to HTTP 400 instead of 500.
|
||||
// It is a subset of the error codes that are known to be user-facing.
|
||||
var userFacingCHCodes = map[chproto.Error]bool{
|
||||
chproto.ErrSyntaxError: true,
|
||||
chproto.ErrUnknownTable: true,
|
||||
chproto.ErrUnknownDatabase: true,
|
||||
chproto.ErrUnknownIdentifier: true,
|
||||
chproto.ErrUnknownFunction: true,
|
||||
chproto.ErrUnknownAggregateFunction: true,
|
||||
chproto.ErrUnknownType: true,
|
||||
chproto.ErrUnknownStorage: true,
|
||||
chproto.ErrUnknownElementInAst: true,
|
||||
chproto.ErrUnknownTypeOfQuery: true,
|
||||
chproto.ErrIllegalTypeOfArgument: true,
|
||||
chproto.ErrIllegalColumn: true,
|
||||
chproto.ErrNumberOfArgumentsDoesntMatch: true,
|
||||
chproto.ErrTooManyArgumentsForFunction: true,
|
||||
chproto.ErrTooLessArgumentsForFunction: true,
|
||||
}
|
||||
|
||||
func mapClickHouseError(err error) error {
|
||||
var ex *clickhouse.Exception
|
||||
if errors.As(err, &ex) && userFacingCHCodes[chproto.Error(ex.Code)] {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "%s", ex.Message)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (q *chSQLQuery) Fingerprint() string {
|
||||
// No caching for CH queries for now
|
||||
return ""
|
||||
@@ -121,11 +152,10 @@ func (q *chSQLQuery) Execute(ctx context.Context) (*qbtypes.Result, error) {
|
||||
|
||||
rows, err := q.telemetryStore.ClickhouseDB().Query(ctx, query, q.args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, mapClickHouseError(err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// TODO: map the errors from ClickHouse to our error types
|
||||
payload, err := consume(rows, q.kind, nil, qbtypes.Step{}, q.query.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -258,6 +258,11 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
event.LogsUsed = strings.Contains(spec.Query, "signoz_logs")
|
||||
event.TracesUsed = strings.Contains(spec.Query, "signoz_traces")
|
||||
}
|
||||
if w := spec.LocalTableUsageWarning(); w != "" {
|
||||
// TODO: remove this if we have too much log volume from this
|
||||
q.logger.WarnContext(ctx, "clickhouse query references local tables", slog.String("warning", w))
|
||||
intervalWarnings = append(intervalWarnings, w)
|
||||
}
|
||||
}
|
||||
case qbtypes.QueryTypeTraceOperator:
|
||||
if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
|
||||
@@ -430,15 +435,15 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event)
|
||||
if qbResp != nil {
|
||||
qbResp.QBEvent = event
|
||||
if len(intervalWarnings) != 0 && req.RequestType == qbtypes.RequestTypeTimeSeries {
|
||||
if len(intervalWarnings) != 0 {
|
||||
if qbResp.Warning == nil {
|
||||
qbResp.Warning = &qbtypes.QueryWarnData{
|
||||
Warnings: make([]qbtypes.QueryWarnDataAdditional, len(intervalWarnings)),
|
||||
}
|
||||
for idx := range intervalWarnings {
|
||||
qbResp.Warning.Warnings[idx] = qbtypes.QueryWarnDataAdditional{Message: intervalWarnings[idx]}
|
||||
Warnings: make([]qbtypes.QueryWarnDataAdditional, 0, len(intervalWarnings)),
|
||||
}
|
||||
}
|
||||
for _, w := range intervalWarnings {
|
||||
qbResp.Warning.Warnings = append(qbResp.Warning.Warnings, qbtypes.QueryWarnDataAdditional{Message: w})
|
||||
}
|
||||
}
|
||||
}
|
||||
return qbResp, qbErr
|
||||
|
||||
@@ -1,5 +1,97 @@
|
||||
package querybuildertypesv5
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
type chTableCheck struct {
|
||||
pattern *regexp.Regexp
|
||||
errMsg string
|
||||
}
|
||||
|
||||
func buildDeprecatedChecks(entries []struct{ name, replacement string }) []chTableCheck {
|
||||
result := make([]chTableCheck, len(entries))
|
||||
for i, e := range entries {
|
||||
var msg string
|
||||
if e.replacement != "" {
|
||||
msg = fmt.Sprintf("table %q is deprecated, use %q instead", e.name, e.replacement)
|
||||
} else {
|
||||
msg = fmt.Sprintf("table %q is deprecated", e.name)
|
||||
}
|
||||
result[i] = chTableCheck{
|
||||
pattern: regexp.MustCompile(`(?i)\b` + regexp.QuoteMeta(e.name) + `\b`),
|
||||
errMsg: msg,
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func buildLocalChecks(entries []struct{ name, replacement string }) []chTableCheck {
|
||||
result := make([]chTableCheck, len(entries))
|
||||
for i, e := range entries {
|
||||
result[i] = chTableCheck{
|
||||
pattern: regexp.MustCompile(`(?i)\b` + regexp.QuoteMeta(e.name) + `\b`),
|
||||
errMsg: fmt.Sprintf("ClickHouse query references local table %q, use distributed table %q instead", e.name, e.replacement),
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// chDeprecatedChecks contains checks for deprecated tables; matching queries
|
||||
// are rejected with an error. Word-boundary patterns prevent false positives
|
||||
// (e.g. "distributed_logs" must not match "distributed_logs_v2").
|
||||
var chDeprecatedChecks = buildDeprecatedChecks([]struct{ name, replacement string }{
|
||||
// Traces V2 → V3
|
||||
{"distributed_signoz_index_v2", "distributed_signoz_index_v3"},
|
||||
{"signoz_index_v2", "distributed_signoz_index_v3"},
|
||||
{"distributed_signoz_error_index_v2", "distributed_signoz_index_v3"},
|
||||
{"signoz_error_index_v2", "distributed_signoz_index_v3"},
|
||||
{"distributed_dependency_graph_minutes_v2", ""},
|
||||
{"dependency_graph_minutes_v2", ""},
|
||||
{"distributed_signoz_operations", "distributed_top_level_operations"},
|
||||
{"signoz_operations", "distributed_top_level_operations"},
|
||||
{"distributed_durationSort", "distributed_signoz_index_v3"},
|
||||
{"durationSort", "distributed_signoz_index_v3"},
|
||||
{"distributed_usage_explorer", ""},
|
||||
{"usage_explorer", ""},
|
||||
{"distributed_signoz_spans", ""},
|
||||
{"signoz_spans", ""},
|
||||
// Logs V1 → V2
|
||||
{"distributed_logs", "distributed_logs_v2"},
|
||||
{"logs", "distributed_logs_v2"},
|
||||
})
|
||||
|
||||
// chLocalChecks contains checks for local (non-distributed) tables; matching
|
||||
// queries produce a warning rather than an error.
|
||||
var chLocalChecks = buildLocalChecks([]struct{ name, replacement string }{
|
||||
// Traces
|
||||
{"signoz_index_v3", "distributed_signoz_index_v3"},
|
||||
{"tag_attributes_v2", "distributed_tag_attributes_v2"},
|
||||
// Logs
|
||||
{"logs_v2", "distributed_logs_v2"},
|
||||
{"logs_v2_resource", "distributed_logs_v2_resource"},
|
||||
// Metrics
|
||||
{"samples_v4", "distributed_samples_v4"},
|
||||
{"samples_v4_agg_5m", "distributed_samples_v4_agg_5m"},
|
||||
{"samples_v4_agg_30m", "distributed_samples_v4_agg_30m"},
|
||||
{"exp_hist", "distributed_exp_hist"},
|
||||
{"time_series_v4", "distributed_time_series_v4"},
|
||||
{"time_series_v4_6hrs", "distributed_time_series_v4_6hrs"},
|
||||
{"time_series_v4_1day", "distributed_time_series_v4_1day"},
|
||||
{"time_series_v4_1week", "distributed_time_series_v4_1week"},
|
||||
{"updated_metadata", "distributed_updated_metadata"},
|
||||
{"metadata", "distributed_metadata"},
|
||||
// Meter
|
||||
{"samples", "distributed_samples"},
|
||||
{"samples_agg_1d", "distributed_samples_agg_1d"},
|
||||
// Metadata
|
||||
{"attributes_metadata", "distributed_attributes_metadata"},
|
||||
})
|
||||
|
||||
type ClickHouseQuery struct {
|
||||
// name of the query
|
||||
Name string `json:"name"`
|
||||
@@ -15,3 +107,50 @@ type ClickHouseQuery struct {
|
||||
func (q ClickHouseQuery) Copy() ClickHouseQuery {
|
||||
return q
|
||||
}
|
||||
|
||||
// Validate performs basic validation on ClickHouseQuery.
|
||||
// It returns an error for deprecated tables
|
||||
func (q ClickHouseQuery) Validate() error {
|
||||
if q.Name == "" {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"name is required for ClickHouse query",
|
||||
)
|
||||
}
|
||||
|
||||
if q.Query == "" {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"ClickHouse SQL query is required",
|
||||
)
|
||||
}
|
||||
|
||||
trimmed := strings.TrimSpace(q.Query)
|
||||
|
||||
var msgs []string
|
||||
for _, check := range chDeprecatedChecks {
|
||||
if check.pattern.MatchString(trimmed) {
|
||||
msgs = append(msgs, check.errMsg)
|
||||
}
|
||||
}
|
||||
if len(msgs) > 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "ClickHouse query references deprecated tables").WithAdditional(msgs...)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// LocalTableWarnings returns warning messages for any local (non-distributed)
|
||||
// tables referenced in the query. Unlike deprecated tables, local tables are
|
||||
// not rejected outright — the query is still executed but the caller should
|
||||
// surface the warnings to the user.
|
||||
func (q ClickHouseQuery) LocalTableUsageWarning() string {
|
||||
trimmed := strings.TrimSpace(q.Query)
|
||||
var warnings []string
|
||||
for _, check := range chLocalChecks {
|
||||
if check.pattern.MatchString(trimmed) {
|
||||
warnings = append(warnings, check.errMsg)
|
||||
}
|
||||
}
|
||||
return strings.Join(warnings, "\n")
|
||||
}
|
||||
|
||||
@@ -0,0 +1,168 @@
|
||||
package querybuildertypesv5
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
func TestClickHouseQuery_Copy(t *testing.T) {
|
||||
q := ClickHouseQuery{Name: "A", Query: "SELECT 1", Disabled: true, Legend: "my legend"}
|
||||
got := q.Copy()
|
||||
if got != q {
|
||||
t.Errorf("Copy() = %+v, want %+v", got, q)
|
||||
}
|
||||
}
|
||||
|
||||
// TestClickHouseQuery_Validate_DeprecatedTables covers every deprecated table entry,
|
||||
// verifying both rejection and the correct error message (with or without a replacement hint).
|
||||
func TestClickHouseQuery_Validate_DeprecatedTables(t *testing.T) {
|
||||
tests := []struct {
|
||||
table string
|
||||
query string
|
||||
wantErrMsg string // substring expected in error
|
||||
}{
|
||||
// Traces V2 → V3 (distributed)
|
||||
{
|
||||
"distributed_signoz_index_v2",
|
||||
"SELECT * FROM distributed_signoz_index_v2 LIMIT 10",
|
||||
`use "distributed_signoz_index_v3"`,
|
||||
},
|
||||
{
|
||||
"distributed_signoz_spans",
|
||||
"SELECT * FROM distributed_signoz_spans",
|
||||
`table "distributed_signoz_spans" is deprecated`,
|
||||
},
|
||||
// Traces V2 → V3 (local)
|
||||
{
|
||||
"signoz_index_v2",
|
||||
"SELECT * FROM signoz_index_v2",
|
||||
`use "distributed_signoz_index_v3"`,
|
||||
},
|
||||
{
|
||||
"usage_explorer",
|
||||
"SELECT * FROM usage_explorer",
|
||||
`table "usage_explorer" is deprecated`,
|
||||
},
|
||||
{
|
||||
"signoz_spans",
|
||||
"SELECT * FROM signoz_spans LIMIT 10",
|
||||
`table "signoz_spans" is deprecated`,
|
||||
},
|
||||
// Logs V1 → V2
|
||||
{
|
||||
"distributed_logs",
|
||||
"SELECT * FROM signoz_logs.distributed_logs WHERE timestamp > now() - INTERVAL 1 HOUR",
|
||||
`use "distributed_logs_v2"`,
|
||||
},
|
||||
{
|
||||
"logs",
|
||||
"SELECT body FROM logs LIMIT 100",
|
||||
`use "distributed_logs_v2"`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.table, func(t *testing.T) {
|
||||
q := ClickHouseQuery{Name: "A", Query: tt.query}
|
||||
err := q.Validate()
|
||||
if err == nil {
|
||||
t.Fatalf("Validate() expected error for deprecated table %q but got none", tt.table)
|
||||
}
|
||||
_, _, _, _, _, additional := errors.Unwrapb(err)
|
||||
combined := strings.Join(additional, "\n")
|
||||
if !contains(combined, tt.wantErrMsg) {
|
||||
t.Errorf("Validate() additional = %q, want to contain %q", combined, tt.wantErrMsg)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestClickHouseQuery_LocalTableUsageWarning covers every local-table entry,
|
||||
// verifying that Validate() passes but LocalTableUsageWarning() returns the
|
||||
// correct "use distributed table X instead" message.
|
||||
func TestClickHouseQuery_LocalTableUsageWarning(t *testing.T) {
|
||||
tests := []struct {
|
||||
table string
|
||||
query string
|
||||
dist string // expected distributed replacement in warning
|
||||
}{
|
||||
// Traces
|
||||
{"signoz_index_v3", "SELECT * FROM signoz_index_v3", "distributed_signoz_index_v3"},
|
||||
{"tag_attributes_v2", "SELECT * FROM tag_attributes_v2", "distributed_tag_attributes_v2"},
|
||||
// Logs
|
||||
{"logs_v2", "SELECT body FROM logs_v2 LIMIT 50", "distributed_logs_v2"},
|
||||
{"logs_v2_resource", "SELECT * FROM logs_v2_resource", "distributed_logs_v2_resource"},
|
||||
// Metrics
|
||||
{"samples_v4", "SELECT * FROM samples_v4 WHERE unix_milli >= 1000", "distributed_samples_v4"},
|
||||
{"samples_v4_agg_5m", "SELECT * FROM samples_v4_agg_5m", "distributed_samples_v4_agg_5m"},
|
||||
{"time_series_v4", "SELECT * FROM time_series_v4", "distributed_time_series_v4"},
|
||||
{"time_series_v4_6hrs", "SELECT * FROM time_series_v4_6hrs", "distributed_time_series_v4_6hrs"},
|
||||
// Meter
|
||||
{"samples", "SELECT * FROM samples WHERE unix_milli >= 1000", "distributed_samples"},
|
||||
{"samples_agg_1d", "SELECT * FROM samples_agg_1d", "distributed_samples_agg_1d"},
|
||||
// Metadata
|
||||
{"attributes_metadata", "SELECT * FROM attributes_metadata", "distributed_attributes_metadata"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.table, func(t *testing.T) {
|
||||
q := ClickHouseQuery{Name: "A", Query: tt.query}
|
||||
if err := q.Validate(); err != nil {
|
||||
t.Fatalf("Validate() unexpected error for local table %q: %v", tt.table, err)
|
||||
}
|
||||
warning := q.LocalTableUsageWarning()
|
||||
if warning == "" {
|
||||
t.Fatalf("LocalTableUsageWarning() expected warning for local table %q but got none", tt.table)
|
||||
}
|
||||
wantFragment := `use distributed table "` + tt.dist + `"`
|
||||
if !contains(warning, wantFragment) {
|
||||
t.Errorf("LocalTableUsageWarning() = %q, want to contain %q", warning, wantFragment)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestClickHouseQuery_Validate_CaseInsensitive verifies that deprecated table
|
||||
// pattern matching is case-insensitive and returns an error.
|
||||
func TestClickHouseQuery_Validate_CaseInsensitive(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
query string
|
||||
}{
|
||||
{"deprecated table uppercase", "SELECT * FROM DISTRIBUTED_SIGNOZ_INDEX_V2"},
|
||||
{"deprecated table mixed case", "SELECT * FROM Distributed_SignoZ_Index_V2"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := (ClickHouseQuery{Name: "A", Query: tt.query}).Validate()
|
||||
if err == nil {
|
||||
t.Errorf("Validate() expected error for %q but got none", tt.query)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestClickHouseQuery_LocalTableUsageWarning_CaseInsensitive verifies that local
|
||||
// table pattern matching is case-insensitive and returns a warning.
|
||||
func TestClickHouseQuery_LocalTableUsageWarning_CaseInsensitive(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
query string
|
||||
}{
|
||||
{"local table uppercase", "SELECT * FROM SAMPLES_V4"},
|
||||
{"local table mixed case", "SELECT * FROM Time_Series_V4"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
q := ClickHouseQuery{Name: "A", Query: tt.query}
|
||||
if err := q.Validate(); err != nil {
|
||||
t.Errorf("Validate() unexpected error for %q: %v", tt.query, err)
|
||||
}
|
||||
if q.LocalTableUsageWarning() == "" {
|
||||
t.Errorf("LocalTableUsageWarning() expected warning for %q but got none", tt.query)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -626,13 +626,7 @@ func validateQueryEnvelope(envelope QueryEnvelope, opts ...ValidationOption) err
|
||||
"invalid ClickHouse SQL spec",
|
||||
)
|
||||
}
|
||||
if spec.Query == "" {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"ClickHouse SQL query is required",
|
||||
)
|
||||
}
|
||||
return nil
|
||||
return spec.Validate()
|
||||
default:
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
|
||||
@@ -311,7 +311,7 @@ func TestQueryRangeRequest_ValidateAllQueriesNotDisabled(t *testing.T) {
|
||||
Type: QueryTypeClickHouseSQL,
|
||||
Spec: ClickHouseQuery{
|
||||
Name: "CH1",
|
||||
Query: "SELECT count() FROM logs",
|
||||
Query: "SELECT count() FROM distributed_logs_v2",
|
||||
Disabled: true,
|
||||
},
|
||||
},
|
||||
@@ -615,7 +615,7 @@ func TestQueryRangeRequest_ValidateCompositeQuery(t *testing.T) {
|
||||
Type: QueryTypeClickHouseSQL,
|
||||
Spec: ClickHouseQuery{
|
||||
Name: "CH1",
|
||||
Query: "SELECT count() FROM logs",
|
||||
Query: "SELECT count() FROM distributed_logs_v2",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1208,6 +1208,22 @@ func TestRequestType_IsAggregation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClickHouseQuery_Validate(t *testing.T) {
|
||||
t.Run("empty name", func(t *testing.T) {
|
||||
err := (ClickHouseQuery{Name: "", Query: "SELECT 1"}).Validate()
|
||||
if err == nil || !contains(err.Error(), "name is required") {
|
||||
t.Errorf("Validate() expected 'name is required' error, got %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("empty query", func(t *testing.T) {
|
||||
err := (ClickHouseQuery{Name: "A", Query: ""}).Validate()
|
||||
if err == nil || !contains(err.Error(), "ClickHouse SQL query is required") {
|
||||
t.Errorf("Validate() expected 'ClickHouse SQL query is required' error, got %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNonAggregationFieldsSkipped(t *testing.T) {
|
||||
// Fields that only apply to aggregation queries (groupBy, having, aggregations)
|
||||
// should be silently skipped for non-aggregation request types.
|
||||
|
||||
@@ -241,10 +241,14 @@ def get_all_series(response_json: Dict, query_name: str) -> List[Dict]:
|
||||
|
||||
|
||||
def get_scalar_value(response_json: Dict, query_name: str) -> Optional[float]:
|
||||
values = get_series_values(response_json, query_name)
|
||||
if values:
|
||||
return values[0].get("value")
|
||||
return None
|
||||
results = response_json.get("data", {}).get("data", {}).get("results", [])
|
||||
result = find_named_result(results, query_name)
|
||||
if not result:
|
||||
return None
|
||||
data = result.get("data", [])
|
||||
if not data or not data[0]:
|
||||
return None
|
||||
return data[0][0]
|
||||
|
||||
|
||||
def compare_values(
|
||||
|
||||
210
tests/integration/src/querier/13_clickhouse_sql.py
Normal file
210
tests/integration/src/querier/13_clickhouse_sql.py
Normal file
@@ -0,0 +1,210 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable, List
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.logs import Logs
|
||||
from fixtures.querier import get_scalar_value, make_query_request
|
||||
|
||||
|
||||
def test_clickhouse_sql_valid_scalar_query(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_logs: Callable[[List[Logs]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Insert a few log lines, then run a ClickHouse SQL query against the
|
||||
distributed table and verify:
|
||||
1. The request is accepted (HTTP 200 / status "success").
|
||||
2. The returned count is at least the number of logs we inserted.
|
||||
"""
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
num_inserted_logs = 3
|
||||
insert_logs(
|
||||
[
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=i),
|
||||
body=f"clickhouse sql test log {i}",
|
||||
severity_text="INFO",
|
||||
resources={
|
||||
"service.name": "clickhouse-sql-test",
|
||||
"deployment.environment": "integration",
|
||||
},
|
||||
)
|
||||
for i in range(num_inserted_logs)
|
||||
]
|
||||
)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
end = int(now.timestamp() * 1000) + 1_000 # +1 s to include "now"
|
||||
start = end - 60_000
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start,
|
||||
end,
|
||||
[
|
||||
{
|
||||
"type": "clickhouse_sql",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"query": "SELECT count() AS value FROM signoz_logs.distributed_logs_v2",
|
||||
},
|
||||
}
|
||||
],
|
||||
request_type="scalar",
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
body = response.json()
|
||||
assert body["status"] == "success"
|
||||
|
||||
count = get_scalar_value(body, "A")
|
||||
assert count is not None, "Expected a scalar count value in the response"
|
||||
assert (
|
||||
count == num_inserted_logs
|
||||
), f"Expected count == {num_inserted_logs}, got {count}"
|
||||
|
||||
|
||||
def test_clickhouse_sql_valid_raw_query(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_logs: Callable[[List[Logs]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Insert a few log lines, then run a ClickHouse SQL query against the
|
||||
distributed table and verify:
|
||||
1. The request is accepted (HTTP 200 / status "success").
|
||||
2. The returned count is at least the number of logs we inserted.
|
||||
"""
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
num_inserted_logs = 3
|
||||
insert_logs(
|
||||
[
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=i),
|
||||
body=f"clickhouse sql test log {i}",
|
||||
severity_text="INFO",
|
||||
resources={
|
||||
"service.name": "clickhouse-sql-test",
|
||||
"deployment.environment": "integration",
|
||||
},
|
||||
)
|
||||
for i in range(num_inserted_logs)
|
||||
]
|
||||
)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
end = int(now.timestamp() * 1000) + 1_000 # +1 s to include "now"
|
||||
start = end - 60_000
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start,
|
||||
end,
|
||||
[
|
||||
{
|
||||
"type": "clickhouse_sql",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"query": "SELECT * FROM signoz_logs.distributed_logs_v2",
|
||||
},
|
||||
}
|
||||
],
|
||||
request_type="raw",
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
body = response.json()
|
||||
assert body["status"] == "success"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"query, expected_error",
|
||||
[
|
||||
(
|
||||
"SELECT * FROM signoz_logs.distributed_logs LIMIT 10",
|
||||
"deprecated", # This is captured by validate()
|
||||
),
|
||||
(
|
||||
"SELECT * from invalid_table",
|
||||
"Unknown table expression", # This is captured by mapClickHouseError()
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_clickhouse_sql_invalid_queries(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
query: str,
|
||||
expected_error: str,
|
||||
) -> None:
|
||||
"""
|
||||
ClickHouse SQL queries referencing deprecated tables or unknown tables are
|
||||
rejected with HTTP 400.
|
||||
"""
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
end = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
|
||||
start = end - 60_000
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start,
|
||||
end,
|
||||
[{"type": "clickhouse_sql", "spec": {"name": "A", "query": query}}],
|
||||
request_type="raw",
|
||||
)
|
||||
assert response.status_code == HTTPStatus.BAD_REQUEST
|
||||
body = response.json()
|
||||
assert body["status"] == "error"
|
||||
assert expected_error in body["error"]["message"]
|
||||
|
||||
|
||||
def test_clickhouse_sql_local_table_warning(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
) -> None:
|
||||
"""
|
||||
ClickHouse SQL queries referencing local (non-distributed) tables are
|
||||
executed but the response includes a warning advising use of the
|
||||
distributed table instead.
|
||||
"""
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
end = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
|
||||
start = end - 60_000
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start,
|
||||
end,
|
||||
[
|
||||
{
|
||||
"type": "clickhouse_sql",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"query": "SELECT count() AS value FROM signoz_logs.logs_v2",
|
||||
},
|
||||
}
|
||||
],
|
||||
request_type="scalar",
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
body = response.json()
|
||||
assert body["status"] == "success"
|
||||
|
||||
warning = body["data"].get("warning")
|
||||
assert warning is not None, "Expected a warning in the response for local table usage"
|
||||
warning_messages = " ".join(
|
||||
w["message"] for w in warning.get("warnings", [])
|
||||
)
|
||||
assert "distributed" in warning_messages, (
|
||||
f"Expected warning to mention distributed table, got: {warning_messages}"
|
||||
)
|
||||
Reference in New Issue
Block a user