mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-23 12:50:29 +00:00
Compare commits
1 Commits
tvats-vali
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
807211b8d8 |
@@ -39,6 +39,13 @@ instrumentation:
|
||||
host: "0.0.0.0"
|
||||
port: 9090
|
||||
|
||||
##################### PProf #####################
|
||||
pprof:
|
||||
# Whether to enable the pprof server.
|
||||
enabled: true
|
||||
# The address on which the pprof server listens.
|
||||
address: 0.0.0.0:6060
|
||||
|
||||
##################### Web #####################
|
||||
web:
|
||||
# Whether to enable the web frontend
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof" // http profiler
|
||||
"slices"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
|
||||
@@ -313,15 +312,6 @@ func (s *Server) Start(ctx context.Context) error {
|
||||
s.unavailableChannel <- healthcheck.Unavailable
|
||||
}()
|
||||
|
||||
go func() {
|
||||
slog.Info("Starting pprof server", "addr", baseconst.DebugHttpPort)
|
||||
|
||||
err = http.ListenAndServe(baseconst.DebugHttpPort, nil)
|
||||
if err != nil {
|
||||
slog.Error("Could not start pprof server", errors.Attr(err))
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
slog.Info("Starting OpAmp Websocket server", "addr", baseconst.OpAmpWsEndpoint)
|
||||
err := s.opampServer.Start(baseconst.OpAmpWsEndpoint)
|
||||
|
||||
32
pkg/pprof/config.go
Normal file
32
pkg/pprof/config.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package pprof
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/factory"
|
||||
|
||||
// Config holds the configuration for the pprof server.
|
||||
type Config struct {
|
||||
Enabled bool `mapstructure:"enabled"`
|
||||
Address string `mapstructure:"address"`
|
||||
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("pprof"), newConfig)
|
||||
}
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Enabled: true,
|
||||
Address: "0.0.0.0:6060",
|
||||
}
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c Config) Provider() string {
|
||||
if c.Enabled {
|
||||
return "http"
|
||||
}
|
||||
|
||||
return "noop"
|
||||
}
|
||||
42
pkg/pprof/config_test.go
Normal file
42
pkg/pprof/config_test.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package pprof
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/config"
|
||||
"github.com/SigNoz/signoz/pkg/config/envprovider"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewWithEnvProvider(t *testing.T) {
|
||||
t.Setenv("SIGNOZ_PPROF_ENABLED", "false")
|
||||
t.Setenv("SIGNOZ_PPROF_ADDRESS", "127.0.0.1:6061")
|
||||
|
||||
conf, err := config.New(
|
||||
context.Background(),
|
||||
config.ResolverConfig{
|
||||
Uris: []string{"env:"},
|
||||
ProviderFactories: []config.ProviderFactory{
|
||||
envprovider.NewFactory(),
|
||||
},
|
||||
},
|
||||
[]factory.ConfigFactory{
|
||||
NewConfigFactory(),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual := Config{}
|
||||
err = conf.Unmarshal("pprof", &actual)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := Config{
|
||||
Enabled: false,
|
||||
Address: "127.0.0.1:6061",
|
||||
}
|
||||
|
||||
assert.Equal(t, expected, actual)
|
||||
}
|
||||
59
pkg/pprof/httppprof/provider.go
Normal file
59
pkg/pprof/httppprof/provider.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package httppprof
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
nethttppprof "net/http/pprof"
|
||||
runtimepprof "runtime/pprof"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
httpserver "github.com/SigNoz/signoz/pkg/http/server"
|
||||
"github.com/SigNoz/signoz/pkg/pprof"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
server *httpserver.Server
|
||||
}
|
||||
|
||||
func NewFactory() factory.ProviderFactory[pprof.PProf, pprof.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("http"), New)
|
||||
}
|
||||
|
||||
func New(_ context.Context, settings factory.ProviderSettings, config pprof.Config) (pprof.PProf, error) {
|
||||
server, err := httpserver.New(
|
||||
settings.Logger.With(slog.String("pkg", "github.com/SigNoz/signoz/pkg/pprof/httppprof")),
|
||||
httpserver.Config{Address: config.Address},
|
||||
newHandler(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &provider{server: server}, nil
|
||||
}
|
||||
|
||||
func (provider *provider) Start(ctx context.Context) error {
|
||||
return provider.server.Start(ctx)
|
||||
}
|
||||
|
||||
func (provider *provider) Stop(ctx context.Context) error {
|
||||
return provider.server.Stop(ctx)
|
||||
}
|
||||
|
||||
func newHandler() http.Handler {
|
||||
mux := http.NewServeMux()
|
||||
// Register the endpoints from net/http/pprof.
|
||||
mux.HandleFunc("/debug/pprof/", nethttppprof.Index)
|
||||
mux.HandleFunc("/debug/pprof/cmdline", nethttppprof.Cmdline)
|
||||
mux.HandleFunc("/debug/pprof/profile", nethttppprof.Profile)
|
||||
mux.HandleFunc("/debug/pprof/symbol", nethttppprof.Symbol)
|
||||
mux.HandleFunc("/debug/pprof/trace", nethttppprof.Trace)
|
||||
|
||||
// Register the runtime profiles in the same order returned by runtime/pprof.Profiles().
|
||||
for _, profile := range runtimepprof.Profiles() {
|
||||
mux.Handle("/debug/pprof/"+profile.Name(), nethttppprof.Handler(profile.Name()))
|
||||
}
|
||||
|
||||
return mux
|
||||
}
|
||||
35
pkg/pprof/nooppprof/provider.go
Normal file
35
pkg/pprof/nooppprof/provider.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package nooppprof
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/pprof"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
stopC chan struct{}
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
||||
func NewFactory() factory.ProviderFactory[pprof.PProf, pprof.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("noop"), New)
|
||||
}
|
||||
|
||||
func New(_ context.Context, _ factory.ProviderSettings, _ pprof.Config) (pprof.PProf, error) {
|
||||
return &provider{stopC: make(chan struct{})}, nil
|
||||
}
|
||||
|
||||
func (provider *provider) Start(context.Context) error {
|
||||
<-provider.stopC
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) Stop(context.Context) error {
|
||||
provider.stopOnce.Do(func() {
|
||||
close(provider.stopC)
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
8
pkg/pprof/pprof.go
Normal file
8
pkg/pprof/pprof.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package pprof
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/factory"
|
||||
|
||||
// PProf is the interface that wraps the pprof service lifecycle.
|
||||
type PProf interface {
|
||||
factory.Service
|
||||
}
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof" // http profiler
|
||||
"slices"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/cache/memorycache"
|
||||
@@ -294,15 +293,6 @@ func (s *Server) Start(ctx context.Context) error {
|
||||
s.unavailableChannel <- healthcheck.Unavailable
|
||||
}()
|
||||
|
||||
go func() {
|
||||
slog.Info("Starting pprof server", "addr", constants.DebugHttpPort)
|
||||
|
||||
err = http.ListenAndServe(constants.DebugHttpPort, nil)
|
||||
if err != nil {
|
||||
slog.Error("Could not start pprof server", errors.Attr(err))
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
slog.Info("Starting OpAmp Websocket server", "addr", constants.OpAmpWsEndpoint)
|
||||
err := s.opampServer.Start(constants.OpAmpWsEndpoint)
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
const (
|
||||
HTTPHostPort = "0.0.0.0:8080" // Address to serve http (query service)
|
||||
PrivateHostPort = "0.0.0.0:8085" // Address to server internal services like alert manager
|
||||
DebugHttpPort = "0.0.0.0:6060" // Address to serve http (pprof)
|
||||
OpAmpWsEndpoint = "0.0.0.0:4320" // address for opamp websocket
|
||||
)
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/pprof"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/ruler"
|
||||
@@ -50,6 +51,9 @@ type Config struct {
|
||||
// Instrumentation config
|
||||
Instrumentation instrumentation.Config `mapstructure:"instrumentation"`
|
||||
|
||||
// PProf config
|
||||
PProf pprof.Config `mapstructure:"pprof"`
|
||||
|
||||
// Analytics config
|
||||
Analytics analytics.Config `mapstructure:"analytics"`
|
||||
|
||||
@@ -122,6 +126,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
|
||||
global.NewConfigFactory(),
|
||||
version.NewConfigFactory(),
|
||||
instrumentation.NewConfigFactory(),
|
||||
pprof.NewConfigFactory(),
|
||||
analytics.NewConfigFactory(),
|
||||
web.NewConfigFactory(),
|
||||
cache.NewConfigFactory(),
|
||||
|
||||
@@ -34,6 +34,9 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/session/implsession"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
|
||||
"github.com/SigNoz/signoz/pkg/pprof"
|
||||
"github.com/SigNoz/signoz/pkg/pprof/httppprof"
|
||||
"github.com/SigNoz/signoz/pkg/pprof/nooppprof"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
@@ -89,6 +92,13 @@ func NewWebProviderFactories() factory.NamedMap[factory.ProviderFactory[web.Web,
|
||||
)
|
||||
}
|
||||
|
||||
func NewPProfProviderFactories() factory.NamedMap[factory.ProviderFactory[pprof.PProf, pprof.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
httppprof.NewFactory(),
|
||||
nooppprof.NewFactory(),
|
||||
)
|
||||
}
|
||||
|
||||
func NewSQLStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[sqlstore.SQLStore, sqlstore.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
sqlitesqlstore.NewFactory(sqlstorehook.NewLoggingFactory(), sqlstorehook.NewInstrumentationFactory()),
|
||||
|
||||
@@ -107,6 +107,17 @@ func New(
|
||||
// Get the provider settings from instrumentation
|
||||
providerSettings := instrumentation.ToProviderSettings()
|
||||
|
||||
pprofService, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
providerSettings,
|
||||
config.PProf,
|
||||
NewPProfProviderFactories(),
|
||||
config.PProf.Provider(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize analytics just after instrumentation, as providers might require it
|
||||
analytics, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
@@ -448,6 +459,7 @@ func New(
|
||||
registry, err := factory.NewRegistry(
|
||||
instrumentation.Logger(),
|
||||
factory.NewNamedService(factory.MustNewName("instrumentation"), instrumentation),
|
||||
factory.NewNamedService(factory.MustNewName("pprof"), pprofService),
|
||||
factory.NewNamedService(factory.MustNewName("analytics"), analytics),
|
||||
factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanager),
|
||||
factory.NewNamedService(factory.MustNewName("licensing"), licensing),
|
||||
|
||||
@@ -1,97 +1,5 @@
|
||||
package querybuildertypesv5
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
type chTableCheck struct {
|
||||
pattern *regexp.Regexp
|
||||
errMsg string
|
||||
}
|
||||
|
||||
func buildDeprecatedChecks(entries []struct{ name, replacement string }) []chTableCheck {
|
||||
result := make([]chTableCheck, len(entries))
|
||||
for i, e := range entries {
|
||||
var msg string
|
||||
if e.replacement != "" {
|
||||
msg = fmt.Sprintf("ClickHouse query references deprecated table %q, use %q instead", e.name, e.replacement)
|
||||
} else {
|
||||
msg = fmt.Sprintf("ClickHouse query references deprecated table %q", e.name)
|
||||
}
|
||||
result[i] = chTableCheck{
|
||||
pattern: regexp.MustCompile(`(?i)\b` + regexp.QuoteMeta(e.name) + `\b`),
|
||||
errMsg: msg,
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func buildLocalChecks(entries []struct{ name, replacement string }) []chTableCheck {
|
||||
result := make([]chTableCheck, len(entries))
|
||||
for i, e := range entries {
|
||||
result[i] = chTableCheck{
|
||||
pattern: regexp.MustCompile(`(?i)\b` + regexp.QuoteMeta(e.name) + `\b`),
|
||||
errMsg: fmt.Sprintf("ClickHouse query references local table %q, use distributed table %q instead", e.name, e.replacement),
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// chTableChecks combines deprecated and local table checks. Both sets reject
|
||||
// queries that reference disallowed tables; they differ only in error message.
|
||||
// Word-boundary patterns prevent false positives (e.g. "distributed_logs"
|
||||
// must not match "distributed_logs_v2").
|
||||
var chTableChecks = append(
|
||||
buildDeprecatedChecks([]struct{ name, replacement string }{
|
||||
// Traces V2 → V3
|
||||
{"distributed_signoz_index_v2", "distributed_signoz_index_v3"},
|
||||
{"signoz_index_v2", "distributed_signoz_index_v3"},
|
||||
{"distributed_signoz_error_index_v2", "distributed_signoz_index_v3"},
|
||||
{"signoz_error_index_v2", "distributed_signoz_index_v3"},
|
||||
{"distributed_dependency_graph_minutes_v2", ""},
|
||||
{"dependency_graph_minutes_v2", ""},
|
||||
{"distributed_signoz_operations", "distributed_top_level_operations"},
|
||||
{"signoz_operations", "distributed_top_level_operations"},
|
||||
{"distributed_durationSort", "distributed_signoz_index_v3"},
|
||||
{"durationSort", "distributed_signoz_index_v3"},
|
||||
{"distributed_usage_explorer", ""},
|
||||
{"usage_explorer", ""},
|
||||
{"distributed_signoz_spans", ""},
|
||||
{"signoz_spans", ""},
|
||||
// Logs V1 → V2
|
||||
{"distributed_logs", "distributed_logs_v2"},
|
||||
{"logs", "distributed_logs_v2"},
|
||||
}),
|
||||
buildLocalChecks([]struct{ name, replacement string }{
|
||||
// Traces
|
||||
{"signoz_index_v3", "distributed_signoz_index_v3"},
|
||||
{"tag_attributes_v2", "distributed_tag_attributes_v2"},
|
||||
// Logs
|
||||
{"logs_v2", "distributed_logs_v2"},
|
||||
{"logs_v2_resource", "distributed_logs_v2_resource"},
|
||||
// Metrics
|
||||
{"samples_v4", "distributed_samples_v4"},
|
||||
{"samples_v4_agg_5m", "distributed_samples_v4_agg_5m"},
|
||||
{"samples_v4_agg_30m", "distributed_samples_v4_agg_30m"},
|
||||
{"exp_hist", "distributed_exp_hist"},
|
||||
{"time_series_v4", "distributed_time_series_v4"},
|
||||
{"time_series_v4_6hrs", "distributed_time_series_v4_6hrs"},
|
||||
{"time_series_v4_1day", "distributed_time_series_v4_1day"},
|
||||
{"time_series_v4_1week", "distributed_time_series_v4_1week"},
|
||||
{"updated_metadata", "distributed_updated_metadata"},
|
||||
{"metadata", "distributed_metadata"},
|
||||
// Meter
|
||||
{"samples", "distributed_samples"},
|
||||
{"samples_agg_1d", "distributed_samples_agg_1d"},
|
||||
// Metadata
|
||||
{"attributes_metadata", "distributed_attributes_metadata"},
|
||||
})...,
|
||||
)
|
||||
|
||||
type ClickHouseQuery struct {
|
||||
// name of the query
|
||||
Name string `json:"name"`
|
||||
@@ -107,30 +15,3 @@ type ClickHouseQuery struct {
|
||||
func (q ClickHouseQuery) Copy() ClickHouseQuery {
|
||||
return q
|
||||
}
|
||||
|
||||
// Validate performs basic validation on ClickHouseQuery.
|
||||
func (q ClickHouseQuery) Validate() error {
|
||||
if q.Name == "" {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"name is required for ClickHouse query",
|
||||
)
|
||||
}
|
||||
|
||||
if q.Query == "" {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"ClickHouse SQL query is required",
|
||||
)
|
||||
}
|
||||
|
||||
trimmed := strings.TrimSpace(q.Query)
|
||||
|
||||
for _, check := range chTableChecks {
|
||||
if check.pattern.MatchString(trimmed) {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "%s", check.errMsg)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,259 +0,0 @@
|
||||
package querybuildertypesv5
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestClickHouseQuery_Copy(t *testing.T) {
|
||||
q := ClickHouseQuery{Name: "A", Query: "SELECT 1", Disabled: true, Legend: "my legend"}
|
||||
got := q.Copy()
|
||||
if got != q {
|
||||
t.Errorf("Copy() = %+v, want %+v", got, q)
|
||||
}
|
||||
}
|
||||
|
||||
// TestClickHouseQuery_Validate_DeprecatedTables covers every deprecated table entry,
|
||||
// verifying both rejection and the correct error message (with or without a replacement hint).
|
||||
func TestClickHouseQuery_Validate_DeprecatedTables(t *testing.T) {
|
||||
tests := []struct {
|
||||
table string
|
||||
query string
|
||||
wantErrMsg string // substring expected in error
|
||||
}{
|
||||
// Traces V2 → V3 (distributed)
|
||||
{
|
||||
"distributed_signoz_index_v2",
|
||||
"SELECT * FROM distributed_signoz_index_v2 LIMIT 10",
|
||||
`use "distributed_signoz_index_v3"`,
|
||||
},
|
||||
{
|
||||
"distributed_signoz_error_index_v2",
|
||||
"SELECT * FROM distributed_signoz_error_index_v2",
|
||||
`use "distributed_signoz_index_v3"`,
|
||||
},
|
||||
{
|
||||
"distributed_dependency_graph_minutes_v2",
|
||||
"SELECT * FROM distributed_dependency_graph_minutes_v2",
|
||||
`deprecated table "distributed_dependency_graph_minutes_v2"`,
|
||||
},
|
||||
{
|
||||
"distributed_signoz_operations",
|
||||
"SELECT * FROM distributed_signoz_operations",
|
||||
`use "distributed_top_level_operations"`,
|
||||
},
|
||||
{
|
||||
"distributed_durationSort",
|
||||
"SELECT * FROM distributed_durationSort",
|
||||
`use "distributed_signoz_index_v3"`,
|
||||
},
|
||||
{
|
||||
"distributed_usage_explorer",
|
||||
"SELECT * FROM distributed_usage_explorer",
|
||||
`deprecated table "distributed_usage_explorer"`,
|
||||
},
|
||||
{
|
||||
"distributed_signoz_spans",
|
||||
"SELECT * FROM distributed_signoz_spans",
|
||||
`deprecated table "distributed_signoz_spans"`,
|
||||
},
|
||||
// Traces V2 → V3 (local)
|
||||
{
|
||||
"signoz_index_v2",
|
||||
"SELECT * FROM signoz_index_v2",
|
||||
`use "distributed_signoz_index_v3"`,
|
||||
},
|
||||
{
|
||||
"signoz_error_index_v2",
|
||||
"SELECT * FROM signoz_error_index_v2",
|
||||
`use "distributed_signoz_index_v3"`,
|
||||
},
|
||||
{
|
||||
"dependency_graph_minutes_v2",
|
||||
"SELECT * FROM dependency_graph_minutes_v2",
|
||||
`deprecated table "dependency_graph_minutes_v2"`,
|
||||
},
|
||||
{
|
||||
"signoz_operations",
|
||||
"SELECT * FROM signoz_operations",
|
||||
`use "distributed_top_level_operations"`,
|
||||
},
|
||||
{
|
||||
"durationSort",
|
||||
"SELECT * FROM durationSort",
|
||||
`use "distributed_signoz_index_v3"`,
|
||||
},
|
||||
{
|
||||
"usage_explorer",
|
||||
"SELECT * FROM usage_explorer",
|
||||
`deprecated table "usage_explorer"`,
|
||||
},
|
||||
{
|
||||
"signoz_spans",
|
||||
"SELECT * FROM signoz_spans LIMIT 10",
|
||||
`deprecated table "signoz_spans"`,
|
||||
},
|
||||
// Logs V1 → V2
|
||||
{
|
||||
"distributed_logs",
|
||||
"SELECT * FROM signoz_logs.distributed_logs WHERE timestamp > now() - INTERVAL 1 HOUR",
|
||||
`use "distributed_logs_v2"`,
|
||||
},
|
||||
{
|
||||
"logs",
|
||||
"SELECT body FROM logs LIMIT 100",
|
||||
`use "distributed_logs_v2"`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.table, func(t *testing.T) {
|
||||
q := ClickHouseQuery{Name: "A", Query: tt.query}
|
||||
err := q.Validate()
|
||||
if err == nil {
|
||||
t.Fatalf("Validate() expected error for deprecated table %q but got none", tt.table)
|
||||
}
|
||||
if !contains(err.Error(), tt.wantErrMsg) {
|
||||
t.Errorf("Validate() error = %q, want to contain %q", err.Error(), tt.wantErrMsg)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestClickHouseQuery_Validate_LocalTables covers every local-table entry,
|
||||
// verifying rejection and the correct "use distributed table X instead" message.
|
||||
func TestClickHouseQuery_Validate_LocalTables(t *testing.T) {
|
||||
tests := []struct {
|
||||
table string
|
||||
query string
|
||||
dist string // expected distributed replacement in error
|
||||
}{
|
||||
// Traces
|
||||
{"signoz_index_v3", "SELECT * FROM signoz_index_v3", "distributed_signoz_index_v3"},
|
||||
{"tag_attributes_v2", "SELECT * FROM tag_attributes_v2", "distributed_tag_attributes_v2"},
|
||||
// Logs
|
||||
{"logs_v2", "SELECT body FROM logs_v2 LIMIT 50", "distributed_logs_v2"},
|
||||
{"logs_v2_resource", "SELECT * FROM logs_v2_resource", "distributed_logs_v2_resource"},
|
||||
// Metrics
|
||||
{"samples_v4", "SELECT * FROM samples_v4 WHERE unix_milli >= 1000", "distributed_samples_v4"},
|
||||
{"samples_v4_agg_5m", "SELECT * FROM samples_v4_agg_5m", "distributed_samples_v4_agg_5m"},
|
||||
{"samples_v4_agg_30m", "SELECT * FROM samples_v4_agg_30m", "distributed_samples_v4_agg_30m"},
|
||||
{"exp_hist", "SELECT * FROM exp_hist", "distributed_exp_hist"},
|
||||
{"time_series_v4", "SELECT * FROM time_series_v4", "distributed_time_series_v4"},
|
||||
{"time_series_v4_6hrs", "SELECT * FROM time_series_v4_6hrs", "distributed_time_series_v4_6hrs"},
|
||||
{"time_series_v4_1day", "SELECT * FROM time_series_v4_1day", "distributed_time_series_v4_1day"},
|
||||
{"time_series_v4_1week", "SELECT * FROM time_series_v4_1week", "distributed_time_series_v4_1week"},
|
||||
{"updated_metadata", "SELECT * FROM updated_metadata", "distributed_updated_metadata"},
|
||||
{"metadata", "SELECT * FROM metadata", "distributed_metadata"},
|
||||
// Meter
|
||||
{"samples", "SELECT * FROM samples WHERE unix_milli >= 1000", "distributed_samples"},
|
||||
{"samples_agg_1d", "SELECT * FROM samples_agg_1d", "distributed_samples_agg_1d"},
|
||||
// Metadata
|
||||
{"attributes_metadata", "SELECT * FROM attributes_metadata", "distributed_attributes_metadata"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.table, func(t *testing.T) {
|
||||
q := ClickHouseQuery{Name: "A", Query: tt.query}
|
||||
err := q.Validate()
|
||||
if err == nil {
|
||||
t.Fatalf("Validate() expected error for local table %q but got none", tt.table)
|
||||
}
|
||||
wantFragment := `use distributed table "` + tt.dist + `"`
|
||||
if !contains(err.Error(), wantFragment) {
|
||||
t.Errorf("Validate() error = %q, want to contain %q", err.Error(), wantFragment)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestClickHouseQuery_Validate_AllowedTables verifies that current-generation
|
||||
// distributed tables are accepted without error.
|
||||
func TestClickHouseQuery_Validate_AllowedTables(t *testing.T) {
|
||||
queries := []string{
|
||||
"SELECT * FROM distributed_signoz_index_v3",
|
||||
"SELECT * FROM distributed_tag_attributes_v2",
|
||||
"SELECT * FROM distributed_top_level_operations",
|
||||
"SELECT * FROM distributed_logs_v2",
|
||||
"SELECT * FROM distributed_logs_v2_resource",
|
||||
"SELECT * FROM distributed_samples_v4",
|
||||
"SELECT * FROM distributed_samples_v4_agg_5m",
|
||||
"SELECT * FROM distributed_samples_v4_agg_30m",
|
||||
"SELECT * FROM distributed_exp_hist",
|
||||
"SELECT * FROM distributed_time_series_v4",
|
||||
"SELECT * FROM distributed_time_series_v4_6hrs",
|
||||
"SELECT * FROM distributed_time_series_v4_1day",
|
||||
"SELECT * FROM distributed_time_series_v4_1week",
|
||||
"SELECT * FROM distributed_updated_metadata",
|
||||
"SELECT * FROM distributed_metadata",
|
||||
"SELECT * FROM distributed_samples",
|
||||
"SELECT * FROM distributed_samples_agg_1d",
|
||||
"SELECT * FROM distributed_attributes_metadata",
|
||||
}
|
||||
|
||||
for _, q := range queries {
|
||||
t.Run(q, func(t *testing.T) {
|
||||
err := (ClickHouseQuery{Name: "A", Query: q}).Validate()
|
||||
if err != nil {
|
||||
t.Errorf("Validate() unexpected error = %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestClickHouseQuery_Validate_WordBoundary ensures that table name patterns
|
||||
// do not produce false positives when a deprecated/local name is a prefix of
|
||||
// a longer allowed name.
|
||||
func TestClickHouseQuery_Validate_WordBoundary(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
query string
|
||||
wantErr bool
|
||||
}{
|
||||
// "logs" must not match "distributed_logs_v2" or "logs_v2"
|
||||
{"distributed_logs_v2 not flagged by 'logs' pattern", "SELECT * FROM distributed_logs_v2", false},
|
||||
// "distributed_logs" must not match "distributed_logs_v2"
|
||||
{"distributed_logs_v2 not flagged by 'distributed_logs' pattern", "SELECT * FROM signoz_logs.distributed_logs_v2", false},
|
||||
// "metadata" must not match "attributes_metadata" or "distributed_attributes_metadata"
|
||||
{"distributed_attributes_metadata not flagged by 'metadata' pattern", "SELECT * FROM distributed_attributes_metadata", false},
|
||||
{"attributes_metadata flagged by its own pattern", "SELECT * FROM attributes_metadata", true},
|
||||
// "samples" must not match "samples_v4" (caught by samples_v4 entry instead)
|
||||
{"distributed_samples_v4 not flagged by 'samples' pattern", "SELECT * FROM distributed_samples_v4", false},
|
||||
// "time_series_v4" must not match "time_series_v4_6hrs"
|
||||
{"distributed_time_series_v4_6hrs not flagged by 'time_series_v4' pattern", "SELECT * FROM distributed_time_series_v4_6hrs", false},
|
||||
// "signoz_index_v2" must not match "distributed_signoz_index_v2" (separate entry)
|
||||
{"distributed_signoz_index_v2 flagged by its own pattern", "SELECT * FROM distributed_signoz_index_v2", true},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := (ClickHouseQuery{Name: "A", Query: tt.query}).Validate()
|
||||
if tt.wantErr && err == nil {
|
||||
t.Errorf("Validate() expected error but got none")
|
||||
} else if !tt.wantErr && err != nil {
|
||||
t.Errorf("Validate() unexpected error = %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestClickHouseQuery_Validate_CaseInsensitive verifies that table pattern
|
||||
// matching is case-insensitive.
|
||||
func TestClickHouseQuery_Validate_CaseInsensitive(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
query string
|
||||
}{
|
||||
{"deprecated table uppercase", "SELECT * FROM DISTRIBUTED_SIGNOZ_INDEX_V2"},
|
||||
{"deprecated table mixed case", "SELECT * FROM Distributed_SignoZ_Index_V2"},
|
||||
{"local table uppercase", "SELECT * FROM SAMPLES_V4"},
|
||||
{"local table mixed case", "SELECT * FROM Time_Series_V4"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := (ClickHouseQuery{Name: "A", Query: tt.query}).Validate()
|
||||
if err == nil {
|
||||
t.Errorf("Validate() expected error for %q but got none", tt.query)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -608,7 +608,13 @@ func validateQueryEnvelope(envelope QueryEnvelope, requestType RequestType) erro
|
||||
"invalid ClickHouse SQL spec",
|
||||
)
|
||||
}
|
||||
return spec.Validate()
|
||||
if spec.Query == "" {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"ClickHouse SQL query is required",
|
||||
)
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
|
||||
@@ -279,7 +279,7 @@ func TestQueryRangeRequest_ValidateAllQueriesNotDisabled(t *testing.T) {
|
||||
Type: QueryTypeClickHouseSQL,
|
||||
Spec: ClickHouseQuery{
|
||||
Name: "CH1",
|
||||
Query: "SELECT count() FROM distributed_logs_v2",
|
||||
Query: "SELECT count() FROM logs",
|
||||
Disabled: true,
|
||||
},
|
||||
},
|
||||
@@ -559,7 +559,7 @@ func TestQueryRangeRequest_ValidateCompositeQuery(t *testing.T) {
|
||||
Type: QueryTypeClickHouseSQL,
|
||||
Spec: ClickHouseQuery{
|
||||
Name: "CH1",
|
||||
Query: "SELECT count() FROM distributed_logs_v2",
|
||||
Query: "SELECT count() FROM logs",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1150,22 +1150,6 @@ func TestRequestType_IsAggregation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClickHouseQuery_Validate(t *testing.T) {
|
||||
t.Run("empty name", func(t *testing.T) {
|
||||
err := (ClickHouseQuery{Name: "", Query: "SELECT 1"}).Validate()
|
||||
if err == nil || !contains(err.Error(), "name is required") {
|
||||
t.Errorf("Validate() expected 'name is required' error, got %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("empty query", func(t *testing.T) {
|
||||
err := (ClickHouseQuery{Name: "A", Query: ""}).Validate()
|
||||
if err == nil || !contains(err.Error(), "ClickHouse SQL query is required") {
|
||||
t.Errorf("Validate() expected 'ClickHouse SQL query is required' error, got %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNonAggregationFieldsSkipped(t *testing.T) {
|
||||
// Fields that only apply to aggregation queries (groupBy, having, aggregations)
|
||||
// should be silently skipped for non-aggregation request types.
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
from datetime import datetime, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.querier import make_query_request
|
||||
|
||||
|
||||
def test_clickhouse_sql_valid_query(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
) -> None:
|
||||
"""
|
||||
A valid ClickHouse SQL query referencing a distributed table is accepted.
|
||||
No data insertion required — we only verify the request is not rejected.
|
||||
"""
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
end = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
|
||||
start = end - 60_000
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start,
|
||||
end,
|
||||
[
|
||||
{
|
||||
"type": "clickhouse_sql",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"query": "SELECT count() AS value FROM signoz_logs.distributed_logs_v2",
|
||||
},
|
||||
}
|
||||
],
|
||||
request_type="scalar",
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"query, expected_error",
|
||||
[
|
||||
(
|
||||
"SELECT * FROM signoz_logs.distributed_logs LIMIT 10",
|
||||
"deprecated table",
|
||||
),
|
||||
(
|
||||
"SELECT count() AS value FROM signoz_logs.logs_v2",
|
||||
"local table",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_clickhouse_sql_rejected_tables(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
query: str,
|
||||
expected_error: str,
|
||||
) -> None:
|
||||
"""
|
||||
ClickHouse SQL queries referencing deprecated or local (non-distributed)
|
||||
tables are rejected with HTTP 400.
|
||||
"""
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
end = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
|
||||
start = end - 60_000
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start,
|
||||
end,
|
||||
[{"type": "clickhouse_sql", "spec": {"name": "A", "query": query}}],
|
||||
request_type="raw",
|
||||
)
|
||||
assert response.status_code == HTTPStatus.BAD_REQUEST
|
||||
body = response.json()
|
||||
assert body["status"] == "error"
|
||||
assert expected_error in body["error"]["message"]
|
||||
Reference in New Issue
Block a user