Compare commits

..

1 Commits

Author SHA1 Message Date
Pandey
807211b8d8 refactor(pprof): extract infrastructure provider (#10673)
Some checks are pending
build-staging / staging (push) Blocked by required conditions
build-staging / prepare (push) Waiting to run
build-staging / js-build (push) Blocked by required conditions
build-staging / go-build (push) Blocked by required conditions
Release Drafter / update_release_draft (push) Waiting to run
* refactor(pprof): extract infrastructure provider

* refactor(pprof): remove redundant exports

* chore: address comments
2026-03-23 09:13:05 +00:00
17 changed files with 219 additions and 502 deletions

View File

@@ -39,6 +39,13 @@ instrumentation:
host: "0.0.0.0"
port: 9090
##################### PProf #####################
pprof:
# Whether to enable the pprof server.
enabled: true
# The address on which the pprof server listens.
address: 0.0.0.0:6060
##################### Web #####################
web:
# Whether to enable the web frontend

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"net"
"net/http"
_ "net/http/pprof" // http profiler
"slices"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
@@ -313,15 +312,6 @@ func (s *Server) Start(ctx context.Context) error {
s.unavailableChannel <- healthcheck.Unavailable
}()
go func() {
slog.Info("Starting pprof server", "addr", baseconst.DebugHttpPort)
err = http.ListenAndServe(baseconst.DebugHttpPort, nil)
if err != nil {
slog.Error("Could not start pprof server", errors.Attr(err))
}
}()
go func() {
slog.Info("Starting OpAmp Websocket server", "addr", baseconst.OpAmpWsEndpoint)
err := s.opampServer.Start(baseconst.OpAmpWsEndpoint)

32
pkg/pprof/config.go Normal file
View File

@@ -0,0 +1,32 @@
package pprof
import "github.com/SigNoz/signoz/pkg/factory"
// Config holds the configuration for the pprof server.
type Config struct {
	// Enabled controls whether a real pprof HTTP server is started;
	// when false the "noop" provider is selected instead (see Provider).
	Enabled bool `mapstructure:"enabled"`
	// Address is the host:port the pprof HTTP server listens on.
	Address string `mapstructure:"address"`
}
// NewConfigFactory returns the config factory registered under the
// "pprof" key, producing the defaults from newConfig.
func NewConfigFactory() factory.ConfigFactory {
	name := factory.MustNewName("pprof")
	return factory.NewConfigFactory(name, newConfig)
}
// newConfig returns the default pprof configuration: the server is
// enabled and listens on 0.0.0.0:6060.
func newConfig() factory.Config {
	cfg := Config{}
	cfg.Enabled = true
	cfg.Address = "0.0.0.0:6060"
	return cfg
}
// Validate implements factory.Config. Every field combination,
// including the zero value, is currently accepted, so it always
// returns nil.
func (c Config) Validate() error {
	return nil
}
// Provider maps the config to the name of the pprof provider factory
// to use: "http" when the server is enabled, "noop" otherwise.
func (c Config) Provider() string {
	if !c.Enabled {
		return "noop"
	}
	return "http"
}

42
pkg/pprof/config_test.go Normal file
View File

@@ -0,0 +1,42 @@
package pprof
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/config"
"github.com/SigNoz/signoz/pkg/config/envprovider"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestNewWithEnvProvider verifies that pprof settings resolved from
// SIGNOZ_PPROF_* environment variables are unmarshalled into Config.
func TestNewWithEnvProvider(t *testing.T) {
	t.Setenv("SIGNOZ_PPROF_ENABLED", "false")
	t.Setenv("SIGNOZ_PPROF_ADDRESS", "127.0.0.1:6061")

	resolverConfig := config.ResolverConfig{
		Uris:              []string{"env:"},
		ProviderFactories: []config.ProviderFactory{envprovider.NewFactory()},
	}
	conf, err := config.New(context.Background(), resolverConfig, []factory.ConfigFactory{NewConfigFactory()})
	require.NoError(t, err)

	var actual Config
	err = conf.Unmarshal("pprof", &actual)
	require.NoError(t, err)

	assert.Equal(t, Config{Enabled: false, Address: "127.0.0.1:6061"}, actual)
}

View File

@@ -0,0 +1,59 @@
package httppprof
import (
"context"
"log/slog"
"net/http"
nethttppprof "net/http/pprof"
runtimepprof "runtime/pprof"
"github.com/SigNoz/signoz/pkg/factory"
httpserver "github.com/SigNoz/signoz/pkg/http/server"
"github.com/SigNoz/signoz/pkg/pprof"
)
// provider is the HTTP implementation of pprof.PProf. It serves the
// net/http/pprof and runtime/pprof endpoints on a dedicated HTTP
// server rather than the application's main mux.
type provider struct {
	// server is the dedicated HTTP server hosting the pprof handler.
	server *httpserver.Server
}
// NewFactory returns the pprof provider factory registered under the
// name "http".
func NewFactory() factory.ProviderFactory[pprof.PProf, pprof.Config] {
	name := factory.MustNewName("http")
	return factory.NewProviderFactory(name, New)
}
// New builds an HTTP pprof provider that serves the pprof handler on
// the address from config. The returned provider is not started; the
// caller drives its lifecycle via Start/Stop.
func New(_ context.Context, settings factory.ProviderSettings, config pprof.Config) (pprof.PProf, error) {
	logger := settings.Logger.With(slog.String("pkg", "github.com/SigNoz/signoz/pkg/pprof/httppprof"))
	srv, err := httpserver.New(logger, httpserver.Config{Address: config.Address}, newHandler())
	if err != nil {
		return nil, err
	}
	return &provider{server: srv}, nil
}
// Start delegates to the underlying HTTP server, serving the pprof
// endpoints until Stop is called or the server fails.
func (provider *provider) Start(ctx context.Context) error {
	return provider.server.Start(ctx)
}
// Stop shuts down the underlying HTTP server, delegating graceful
// shutdown behavior (and any deadline in ctx) to it.
func (provider *provider) Stop(ctx context.Context) error {
	return provider.server.Stop(ctx)
}
func newHandler() http.Handler {
mux := http.NewServeMux()
// Register the endpoints from net/http/pprof.
mux.HandleFunc("/debug/pprof/", nethttppprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", nethttppprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", nethttppprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", nethttppprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", nethttppprof.Trace)
// Register the runtime profiles in the same order returned by runtime/pprof.Profiles().
for _, profile := range runtimepprof.Profiles() {
mux.Handle("/debug/pprof/"+profile.Name(), nethttppprof.Handler(profile.Name()))
}
return mux
}

View File

@@ -0,0 +1,35 @@
package nooppprof
import (
"context"
"sync"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/pprof"
)
// provider is a no-op implementation of pprof.PProf, selected when the
// pprof server is disabled in configuration.
type provider struct {
	// stopC is closed by Stop to unblock Start.
	stopC chan struct{}
	// stopOnce guarantees stopC is closed at most once, making Stop
	// safe to call repeatedly.
	stopOnce sync.Once
}
// NewFactory returns the pprof provider factory registered under the
// name "noop".
func NewFactory() factory.ProviderFactory[pprof.PProf, pprof.Config] {
	name := factory.MustNewName("noop")
	return factory.NewProviderFactory(name, New)
}
// New returns a no-op pprof provider. It serves nothing; its Start
// simply blocks until Stop is called. All arguments are ignored.
func New(_ context.Context, _ factory.ProviderSettings, _ pprof.Config) (pprof.PProf, error) {
	return &provider{stopC: make(chan struct{})}, nil
}
// Start blocks until Stop closes the stop channel, then returns nil —
// presumably so the surrounding service registry treats this as a
// long-running service like a real server's Start.
// NOTE(review): the context is ignored, so cancellation does not
// unblock Start; only Stop does — confirm this is intended.
func (provider *provider) Start(context.Context) error {
	<-provider.stopC
	return nil
}
// Stop closes the stop channel, unblocking Start. The sync.Once makes
// Stop idempotent: repeated calls do not panic on a double close.
func (provider *provider) Stop(context.Context) error {
	provider.stopOnce.Do(func() {
		close(provider.stopC)
	})
	return nil
}

8
pkg/pprof/pprof.go Normal file
View File

@@ -0,0 +1,8 @@
package pprof
import "github.com/SigNoz/signoz/pkg/factory"
// PProf is the interface that wraps the pprof service lifecycle.
// It adds no methods of its own: any factory.Service can act as a
// pprof provider (see the httppprof and nooppprof implementations).
type PProf interface {
	factory.Service
}

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"net"
"net/http"
_ "net/http/pprof" // http profiler
"slices"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
@@ -294,15 +293,6 @@ func (s *Server) Start(ctx context.Context) error {
s.unavailableChannel <- healthcheck.Unavailable
}()
go func() {
slog.Info("Starting pprof server", "addr", constants.DebugHttpPort)
err = http.ListenAndServe(constants.DebugHttpPort, nil)
if err != nil {
slog.Error("Could not start pprof server", errors.Attr(err))
}
}()
go func() {
slog.Info("Starting OpAmp Websocket server", "addr", constants.OpAmpWsEndpoint)
err := s.opampServer.Start(constants.OpAmpWsEndpoint)

View File

@@ -14,7 +14,6 @@ import (
const (
HTTPHostPort = "0.0.0.0:8080" // Address to serve http (query service)
PrivateHostPort = "0.0.0.0:8085" // Address to server internal services like alert manager
DebugHttpPort = "0.0.0.0:6060" // Address to serve http (pprof)
OpAmpWsEndpoint = "0.0.0.0:4320" // address for opamp websocket
)

View File

@@ -23,6 +23,7 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/pprof"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
@@ -50,6 +51,9 @@ type Config struct {
// Instrumentation config
Instrumentation instrumentation.Config `mapstructure:"instrumentation"`
// PProf config
PProf pprof.Config `mapstructure:"pprof"`
// Analytics config
Analytics analytics.Config `mapstructure:"analytics"`
@@ -122,6 +126,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
global.NewConfigFactory(),
version.NewConfigFactory(),
instrumentation.NewConfigFactory(),
pprof.NewConfigFactory(),
analytics.NewConfigFactory(),
web.NewConfigFactory(),
cache.NewConfigFactory(),

View File

@@ -34,6 +34,9 @@ import (
"github.com/SigNoz/signoz/pkg/modules/session/implsession"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/pprof"
"github.com/SigNoz/signoz/pkg/pprof/httppprof"
"github.com/SigNoz/signoz/pkg/pprof/nooppprof"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus"
"github.com/SigNoz/signoz/pkg/querier"
@@ -89,6 +92,13 @@ func NewWebProviderFactories() factory.NamedMap[factory.ProviderFactory[web.Web,
)
}
func NewPProfProviderFactories() factory.NamedMap[factory.ProviderFactory[pprof.PProf, pprof.Config]] {
return factory.MustNewNamedMap(
httppprof.NewFactory(),
nooppprof.NewFactory(),
)
}
func NewSQLStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[sqlstore.SQLStore, sqlstore.Config]] {
return factory.MustNewNamedMap(
sqlitesqlstore.NewFactory(sqlstorehook.NewLoggingFactory(), sqlstorehook.NewInstrumentationFactory()),

View File

@@ -107,6 +107,17 @@ func New(
// Get the provider settings from instrumentation
providerSettings := instrumentation.ToProviderSettings()
pprofService, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.PProf,
NewPProfProviderFactories(),
config.PProf.Provider(),
)
if err != nil {
return nil, err
}
// Initialize analytics just after instrumentation, as providers might require it
analytics, err := factory.NewProviderFromNamedMap(
ctx,
@@ -448,6 +459,7 @@ func New(
registry, err := factory.NewRegistry(
instrumentation.Logger(),
factory.NewNamedService(factory.MustNewName("instrumentation"), instrumentation),
factory.NewNamedService(factory.MustNewName("pprof"), pprofService),
factory.NewNamedService(factory.MustNewName("analytics"), analytics),
factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanager),
factory.NewNamedService(factory.MustNewName("licensing"), licensing),

View File

@@ -1,97 +1,5 @@
package querybuildertypesv5
import (
"fmt"
"regexp"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
)
type chTableCheck struct {
pattern *regexp.Regexp
errMsg string
}
func buildDeprecatedChecks(entries []struct{ name, replacement string }) []chTableCheck {
result := make([]chTableCheck, len(entries))
for i, e := range entries {
var msg string
if e.replacement != "" {
msg = fmt.Sprintf("ClickHouse query references deprecated table %q, use %q instead", e.name, e.replacement)
} else {
msg = fmt.Sprintf("ClickHouse query references deprecated table %q", e.name)
}
result[i] = chTableCheck{
pattern: regexp.MustCompile(`(?i)\b` + regexp.QuoteMeta(e.name) + `\b`),
errMsg: msg,
}
}
return result
}
func buildLocalChecks(entries []struct{ name, replacement string }) []chTableCheck {
result := make([]chTableCheck, len(entries))
for i, e := range entries {
result[i] = chTableCheck{
pattern: regexp.MustCompile(`(?i)\b` + regexp.QuoteMeta(e.name) + `\b`),
errMsg: fmt.Sprintf("ClickHouse query references local table %q, use distributed table %q instead", e.name, e.replacement),
}
}
return result
}
// chTableChecks combines deprecated and local table checks. Both sets reject
// queries that reference disallowed tables; they differ only in error message.
// Word-boundary patterns prevent false positives (e.g. "distributed_logs"
// must not match "distributed_logs_v2").
var chTableChecks = append(
buildDeprecatedChecks([]struct{ name, replacement string }{
// Traces V2 → V3
{"distributed_signoz_index_v2", "distributed_signoz_index_v3"},
{"signoz_index_v2", "distributed_signoz_index_v3"},
{"distributed_signoz_error_index_v2", "distributed_signoz_index_v3"},
{"signoz_error_index_v2", "distributed_signoz_index_v3"},
{"distributed_dependency_graph_minutes_v2", ""},
{"dependency_graph_minutes_v2", ""},
{"distributed_signoz_operations", "distributed_top_level_operations"},
{"signoz_operations", "distributed_top_level_operations"},
{"distributed_durationSort", "distributed_signoz_index_v3"},
{"durationSort", "distributed_signoz_index_v3"},
{"distributed_usage_explorer", ""},
{"usage_explorer", ""},
{"distributed_signoz_spans", ""},
{"signoz_spans", ""},
// Logs V1 → V2
{"distributed_logs", "distributed_logs_v2"},
{"logs", "distributed_logs_v2"},
}),
buildLocalChecks([]struct{ name, replacement string }{
// Traces
{"signoz_index_v3", "distributed_signoz_index_v3"},
{"tag_attributes_v2", "distributed_tag_attributes_v2"},
// Logs
{"logs_v2", "distributed_logs_v2"},
{"logs_v2_resource", "distributed_logs_v2_resource"},
// Metrics
{"samples_v4", "distributed_samples_v4"},
{"samples_v4_agg_5m", "distributed_samples_v4_agg_5m"},
{"samples_v4_agg_30m", "distributed_samples_v4_agg_30m"},
{"exp_hist", "distributed_exp_hist"},
{"time_series_v4", "distributed_time_series_v4"},
{"time_series_v4_6hrs", "distributed_time_series_v4_6hrs"},
{"time_series_v4_1day", "distributed_time_series_v4_1day"},
{"time_series_v4_1week", "distributed_time_series_v4_1week"},
{"updated_metadata", "distributed_updated_metadata"},
{"metadata", "distributed_metadata"},
// Meter
{"samples", "distributed_samples"},
{"samples_agg_1d", "distributed_samples_agg_1d"},
// Metadata
{"attributes_metadata", "distributed_attributes_metadata"},
})...,
)
type ClickHouseQuery struct {
// name of the query
Name string `json:"name"`
@@ -107,30 +15,3 @@ type ClickHouseQuery struct {
func (q ClickHouseQuery) Copy() ClickHouseQuery {
return q
}
// Validate performs basic validation on ClickHouseQuery.
func (q ClickHouseQuery) Validate() error {
if q.Name == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"name is required for ClickHouse query",
)
}
if q.Query == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"ClickHouse SQL query is required",
)
}
trimmed := strings.TrimSpace(q.Query)
for _, check := range chTableChecks {
if check.pattern.MatchString(trimmed) {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "%s", check.errMsg)
}
}
return nil
}

View File

@@ -1,259 +0,0 @@
package querybuildertypesv5
import (
"testing"
)
func TestClickHouseQuery_Copy(t *testing.T) {
q := ClickHouseQuery{Name: "A", Query: "SELECT 1", Disabled: true, Legend: "my legend"}
got := q.Copy()
if got != q {
t.Errorf("Copy() = %+v, want %+v", got, q)
}
}
// TestClickHouseQuery_Validate_DeprecatedTables covers every deprecated table entry,
// verifying both rejection and the correct error message (with or without a replacement hint).
func TestClickHouseQuery_Validate_DeprecatedTables(t *testing.T) {
tests := []struct {
table string
query string
wantErrMsg string // substring expected in error
}{
// Traces V2 → V3 (distributed)
{
"distributed_signoz_index_v2",
"SELECT * FROM distributed_signoz_index_v2 LIMIT 10",
`use "distributed_signoz_index_v3"`,
},
{
"distributed_signoz_error_index_v2",
"SELECT * FROM distributed_signoz_error_index_v2",
`use "distributed_signoz_index_v3"`,
},
{
"distributed_dependency_graph_minutes_v2",
"SELECT * FROM distributed_dependency_graph_minutes_v2",
`deprecated table "distributed_dependency_graph_minutes_v2"`,
},
{
"distributed_signoz_operations",
"SELECT * FROM distributed_signoz_operations",
`use "distributed_top_level_operations"`,
},
{
"distributed_durationSort",
"SELECT * FROM distributed_durationSort",
`use "distributed_signoz_index_v3"`,
},
{
"distributed_usage_explorer",
"SELECT * FROM distributed_usage_explorer",
`deprecated table "distributed_usage_explorer"`,
},
{
"distributed_signoz_spans",
"SELECT * FROM distributed_signoz_spans",
`deprecated table "distributed_signoz_spans"`,
},
// Traces V2 → V3 (local)
{
"signoz_index_v2",
"SELECT * FROM signoz_index_v2",
`use "distributed_signoz_index_v3"`,
},
{
"signoz_error_index_v2",
"SELECT * FROM signoz_error_index_v2",
`use "distributed_signoz_index_v3"`,
},
{
"dependency_graph_minutes_v2",
"SELECT * FROM dependency_graph_minutes_v2",
`deprecated table "dependency_graph_minutes_v2"`,
},
{
"signoz_operations",
"SELECT * FROM signoz_operations",
`use "distributed_top_level_operations"`,
},
{
"durationSort",
"SELECT * FROM durationSort",
`use "distributed_signoz_index_v3"`,
},
{
"usage_explorer",
"SELECT * FROM usage_explorer",
`deprecated table "usage_explorer"`,
},
{
"signoz_spans",
"SELECT * FROM signoz_spans LIMIT 10",
`deprecated table "signoz_spans"`,
},
// Logs V1 → V2
{
"distributed_logs",
"SELECT * FROM signoz_logs.distributed_logs WHERE timestamp > now() - INTERVAL 1 HOUR",
`use "distributed_logs_v2"`,
},
{
"logs",
"SELECT body FROM logs LIMIT 100",
`use "distributed_logs_v2"`,
},
}
for _, tt := range tests {
t.Run(tt.table, func(t *testing.T) {
q := ClickHouseQuery{Name: "A", Query: tt.query}
err := q.Validate()
if err == nil {
t.Fatalf("Validate() expected error for deprecated table %q but got none", tt.table)
}
if !contains(err.Error(), tt.wantErrMsg) {
t.Errorf("Validate() error = %q, want to contain %q", err.Error(), tt.wantErrMsg)
}
})
}
}
// TestClickHouseQuery_Validate_LocalTables covers every local-table entry,
// verifying rejection and the correct "use distributed table X instead" message.
func TestClickHouseQuery_Validate_LocalTables(t *testing.T) {
tests := []struct {
table string
query string
dist string // expected distributed replacement in error
}{
// Traces
{"signoz_index_v3", "SELECT * FROM signoz_index_v3", "distributed_signoz_index_v3"},
{"tag_attributes_v2", "SELECT * FROM tag_attributes_v2", "distributed_tag_attributes_v2"},
// Logs
{"logs_v2", "SELECT body FROM logs_v2 LIMIT 50", "distributed_logs_v2"},
{"logs_v2_resource", "SELECT * FROM logs_v2_resource", "distributed_logs_v2_resource"},
// Metrics
{"samples_v4", "SELECT * FROM samples_v4 WHERE unix_milli >= 1000", "distributed_samples_v4"},
{"samples_v4_agg_5m", "SELECT * FROM samples_v4_agg_5m", "distributed_samples_v4_agg_5m"},
{"samples_v4_agg_30m", "SELECT * FROM samples_v4_agg_30m", "distributed_samples_v4_agg_30m"},
{"exp_hist", "SELECT * FROM exp_hist", "distributed_exp_hist"},
{"time_series_v4", "SELECT * FROM time_series_v4", "distributed_time_series_v4"},
{"time_series_v4_6hrs", "SELECT * FROM time_series_v4_6hrs", "distributed_time_series_v4_6hrs"},
{"time_series_v4_1day", "SELECT * FROM time_series_v4_1day", "distributed_time_series_v4_1day"},
{"time_series_v4_1week", "SELECT * FROM time_series_v4_1week", "distributed_time_series_v4_1week"},
{"updated_metadata", "SELECT * FROM updated_metadata", "distributed_updated_metadata"},
{"metadata", "SELECT * FROM metadata", "distributed_metadata"},
// Meter
{"samples", "SELECT * FROM samples WHERE unix_milli >= 1000", "distributed_samples"},
{"samples_agg_1d", "SELECT * FROM samples_agg_1d", "distributed_samples_agg_1d"},
// Metadata
{"attributes_metadata", "SELECT * FROM attributes_metadata", "distributed_attributes_metadata"},
}
for _, tt := range tests {
t.Run(tt.table, func(t *testing.T) {
q := ClickHouseQuery{Name: "A", Query: tt.query}
err := q.Validate()
if err == nil {
t.Fatalf("Validate() expected error for local table %q but got none", tt.table)
}
wantFragment := `use distributed table "` + tt.dist + `"`
if !contains(err.Error(), wantFragment) {
t.Errorf("Validate() error = %q, want to contain %q", err.Error(), wantFragment)
}
})
}
}
// TestClickHouseQuery_Validate_AllowedTables verifies that current-generation
// distributed tables are accepted without error.
func TestClickHouseQuery_Validate_AllowedTables(t *testing.T) {
queries := []string{
"SELECT * FROM distributed_signoz_index_v3",
"SELECT * FROM distributed_tag_attributes_v2",
"SELECT * FROM distributed_top_level_operations",
"SELECT * FROM distributed_logs_v2",
"SELECT * FROM distributed_logs_v2_resource",
"SELECT * FROM distributed_samples_v4",
"SELECT * FROM distributed_samples_v4_agg_5m",
"SELECT * FROM distributed_samples_v4_agg_30m",
"SELECT * FROM distributed_exp_hist",
"SELECT * FROM distributed_time_series_v4",
"SELECT * FROM distributed_time_series_v4_6hrs",
"SELECT * FROM distributed_time_series_v4_1day",
"SELECT * FROM distributed_time_series_v4_1week",
"SELECT * FROM distributed_updated_metadata",
"SELECT * FROM distributed_metadata",
"SELECT * FROM distributed_samples",
"SELECT * FROM distributed_samples_agg_1d",
"SELECT * FROM distributed_attributes_metadata",
}
for _, q := range queries {
t.Run(q, func(t *testing.T) {
err := (ClickHouseQuery{Name: "A", Query: q}).Validate()
if err != nil {
t.Errorf("Validate() unexpected error = %v", err)
}
})
}
}
// TestClickHouseQuery_Validate_WordBoundary ensures that table name patterns
// do not produce false positives when a deprecated/local name is a prefix of
// a longer allowed name.
func TestClickHouseQuery_Validate_WordBoundary(t *testing.T) {
tests := []struct {
name string
query string
wantErr bool
}{
// "logs" must not match "distributed_logs_v2" or "logs_v2"
{"distributed_logs_v2 not flagged by 'logs' pattern", "SELECT * FROM distributed_logs_v2", false},
// "distributed_logs" must not match "distributed_logs_v2"
{"distributed_logs_v2 not flagged by 'distributed_logs' pattern", "SELECT * FROM signoz_logs.distributed_logs_v2", false},
// "metadata" must not match "attributes_metadata" or "distributed_attributes_metadata"
{"distributed_attributes_metadata not flagged by 'metadata' pattern", "SELECT * FROM distributed_attributes_metadata", false},
{"attributes_metadata flagged by its own pattern", "SELECT * FROM attributes_metadata", true},
// "samples" must not match "samples_v4" (caught by samples_v4 entry instead)
{"distributed_samples_v4 not flagged by 'samples' pattern", "SELECT * FROM distributed_samples_v4", false},
// "time_series_v4" must not match "time_series_v4_6hrs"
{"distributed_time_series_v4_6hrs not flagged by 'time_series_v4' pattern", "SELECT * FROM distributed_time_series_v4_6hrs", false},
// "signoz_index_v2" must not match "distributed_signoz_index_v2" (separate entry)
{"distributed_signoz_index_v2 flagged by its own pattern", "SELECT * FROM distributed_signoz_index_v2", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := (ClickHouseQuery{Name: "A", Query: tt.query}).Validate()
if tt.wantErr && err == nil {
t.Errorf("Validate() expected error but got none")
} else if !tt.wantErr && err != nil {
t.Errorf("Validate() unexpected error = %v", err)
}
})
}
}
// TestClickHouseQuery_Validate_CaseInsensitive verifies that table pattern
// matching is case-insensitive.
func TestClickHouseQuery_Validate_CaseInsensitive(t *testing.T) {
tests := []struct {
name string
query string
}{
{"deprecated table uppercase", "SELECT * FROM DISTRIBUTED_SIGNOZ_INDEX_V2"},
{"deprecated table mixed case", "SELECT * FROM Distributed_SignoZ_Index_V2"},
{"local table uppercase", "SELECT * FROM SAMPLES_V4"},
{"local table mixed case", "SELECT * FROM Time_Series_V4"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := (ClickHouseQuery{Name: "A", Query: tt.query}).Validate()
if err == nil {
t.Errorf("Validate() expected error for %q but got none", tt.query)
}
})
}
}

View File

@@ -608,7 +608,13 @@ func validateQueryEnvelope(envelope QueryEnvelope, requestType RequestType) erro
"invalid ClickHouse SQL spec",
)
}
return spec.Validate()
if spec.Query == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"ClickHouse SQL query is required",
)
}
return nil
default:
return errors.NewInvalidInputf(
errors.CodeInvalidInput,

View File

@@ -279,7 +279,7 @@ func TestQueryRangeRequest_ValidateAllQueriesNotDisabled(t *testing.T) {
Type: QueryTypeClickHouseSQL,
Spec: ClickHouseQuery{
Name: "CH1",
Query: "SELECT count() FROM distributed_logs_v2",
Query: "SELECT count() FROM logs",
Disabled: true,
},
},
@@ -559,7 +559,7 @@ func TestQueryRangeRequest_ValidateCompositeQuery(t *testing.T) {
Type: QueryTypeClickHouseSQL,
Spec: ClickHouseQuery{
Name: "CH1",
Query: "SELECT count() FROM distributed_logs_v2",
Query: "SELECT count() FROM logs",
},
},
},
@@ -1150,22 +1150,6 @@ func TestRequestType_IsAggregation(t *testing.T) {
}
}
func TestClickHouseQuery_Validate(t *testing.T) {
t.Run("empty name", func(t *testing.T) {
err := (ClickHouseQuery{Name: "", Query: "SELECT 1"}).Validate()
if err == nil || !contains(err.Error(), "name is required") {
t.Errorf("Validate() expected 'name is required' error, got %v", err)
}
})
t.Run("empty query", func(t *testing.T) {
err := (ClickHouseQuery{Name: "A", Query: ""}).Validate()
if err == nil || !contains(err.Error(), "ClickHouse SQL query is required") {
t.Errorf("Validate() expected 'ClickHouse SQL query is required' error, got %v", err)
}
})
}
func TestNonAggregationFieldsSkipped(t *testing.T) {
// Fields that only apply to aggregation queries (groupBy, having, aggregations)
// should be silently skipped for non-aggregation request types.

View File

@@ -1,84 +0,0 @@
from datetime import datetime, timezone
from http import HTTPStatus
from typing import Callable
import pytest
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.querier import make_query_request
def test_clickhouse_sql_valid_query(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""
A valid ClickHouse SQL query referencing a distributed table is accepted.
No data insertion required — we only verify the request is not rejected.
"""
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
end = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
start = end - 60_000
response = make_query_request(
signoz,
token,
start,
end,
[
{
"type": "clickhouse_sql",
"spec": {
"name": "A",
"query": "SELECT count() AS value FROM signoz_logs.distributed_logs_v2",
},
}
],
request_type="scalar",
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
@pytest.mark.parametrize(
"query, expected_error",
[
(
"SELECT * FROM signoz_logs.distributed_logs LIMIT 10",
"deprecated table",
),
(
"SELECT count() AS value FROM signoz_logs.logs_v2",
"local table",
),
],
)
def test_clickhouse_sql_rejected_tables(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
query: str,
expected_error: str,
) -> None:
"""
ClickHouse SQL queries referencing deprecated or local (non-distributed)
tables are rejected with HTTP 400.
"""
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
end = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
start = end - 60_000
response = make_query_request(
signoz,
token,
start,
end,
[{"type": "clickhouse_sql", "spec": {"name": "A", "query": query}}],
request_type="raw",
)
assert response.status_code == HTTPStatus.BAD_REQUEST
body = response.json()
assert body["status"] == "error"
assert expected_error in body["error"]["message"]