mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-19 23:10:25 +01:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
85d7075d3b | ||
|
|
c648b72ace | ||
|
|
a449698dfe |
@@ -77,19 +77,4 @@ apiserver:
|
||||
- /api/v3/logs/livetail
|
||||
logging:
|
||||
excluded_routes:
|
||||
- /api/v1/health
|
||||
|
||||
|
||||
##################### TelemetryStore #####################
|
||||
telemetrystore:
|
||||
# specifies the telemetrystore provider to use.
|
||||
provider: clickhouse
|
||||
clickhouse:
|
||||
# The DSN to use for ClickHouse.
|
||||
dsn: http://localhost:9000
|
||||
# Maximum number of idle connections in the connection pool.
|
||||
max_idle_conns: 50
|
||||
# Maximum number of open connections to the database.
|
||||
max_open_conns: 100
|
||||
# Maximum time to wait for a connection to be established.
|
||||
dial_timeout: 5s
|
||||
- /api/v1/health
|
||||
@@ -26,6 +26,9 @@ type APIHandlerOptions struct {
|
||||
DataConnector interfaces.DataConnector
|
||||
SkipConfig *basemodel.SkipConfig
|
||||
PreferSpanMetrics bool
|
||||
MaxIdleConns int
|
||||
MaxOpenConns int
|
||||
DialTimeout time.Duration
|
||||
AppDao dao.ModelDao
|
||||
RulesManager *rules.Manager
|
||||
UsageManager *usage.Manager
|
||||
@@ -54,6 +57,9 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
||||
Reader: opts.DataConnector,
|
||||
SkipConfig: opts.SkipConfig,
|
||||
PreferSpanMetrics: opts.PreferSpanMetrics,
|
||||
MaxIdleConns: opts.MaxIdleConns,
|
||||
MaxOpenConns: opts.MaxOpenConns,
|
||||
DialTimeout: opts.DialTimeout,
|
||||
AppDao: opts.AppDao,
|
||||
RuleManager: opts.RulesManager,
|
||||
FeatureFlags: opts.FeatureFlags,
|
||||
|
||||
@@ -20,20 +20,22 @@ type ClickhouseReader struct {
|
||||
|
||||
func NewDataConnector(
|
||||
localDB *sqlx.DB,
|
||||
ch clickhouse.Conn,
|
||||
promConfigPath string,
|
||||
lm interfaces.FeatureLookup,
|
||||
maxIdleConns int,
|
||||
maxOpenConns int,
|
||||
dialTimeout time.Duration,
|
||||
cluster string,
|
||||
useLogsNewSchema bool,
|
||||
useTraceNewSchema bool,
|
||||
fluxIntervalForTraceDetail time.Duration,
|
||||
cache cache.Cache,
|
||||
) *ClickhouseReader {
|
||||
chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||
return &ClickhouseReader{
|
||||
conn: ch,
|
||||
conn: ch.GetConn(),
|
||||
appdb: localDB,
|
||||
ClickHouseReader: chReader,
|
||||
ClickHouseReader: ch,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -74,6 +74,9 @@ type ServerOptions struct {
|
||||
DisableRules bool
|
||||
RuleRepoURL string
|
||||
PreferSpanMetrics bool
|
||||
MaxIdleConns int
|
||||
MaxOpenConns int
|
||||
DialTimeout time.Duration
|
||||
CacheConfigPath string
|
||||
FluxInterval string
|
||||
FluxIntervalForTraceDetail string
|
||||
@@ -154,9 +157,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
zap.L().Info("Using ClickHouse as datastore ...")
|
||||
qb := db.NewDataConnector(
|
||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
|
||||
serverOptions.PromConfigPath,
|
||||
lm,
|
||||
serverOptions.MaxIdleConns,
|
||||
serverOptions.MaxOpenConns,
|
||||
serverOptions.DialTimeout,
|
||||
serverOptions.Cluster,
|
||||
serverOptions.UseLogsNewSchema,
|
||||
serverOptions.UseTraceNewSchema,
|
||||
@@ -240,7 +245,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
}
|
||||
|
||||
// start the usagemanager
|
||||
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.Config.TelemetryStore.ClickHouse.DSN)
|
||||
usageManager, err := usage.New(modelDao, lm.GetRepo(), reader.GetConn())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -261,6 +266,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
DataConnector: reader,
|
||||
SkipConfig: skipConfig,
|
||||
PreferSpanMetrics: serverOptions.PreferSpanMetrics,
|
||||
MaxIdleConns: serverOptions.MaxIdleConns,
|
||||
MaxOpenConns: serverOptions.MaxOpenConns,
|
||||
DialTimeout: serverOptions.DialTimeout,
|
||||
AppDao: modelDao,
|
||||
RulesManager: rm,
|
||||
UsageManager: usageManager,
|
||||
|
||||
@@ -141,10 +141,6 @@ func main() {
|
||||
envprovider.NewFactory(),
|
||||
fileprovider.NewFactory(),
|
||||
},
|
||||
}, signoz.DeprecatedFlags{
|
||||
MaxIdleConns: maxIdleConns,
|
||||
MaxOpenConns: maxOpenConns,
|
||||
DialTimeout: dialTimeout,
|
||||
})
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create config", zap.Error(err))
|
||||
@@ -165,6 +161,9 @@ func main() {
|
||||
PrivateHostPort: baseconst.PrivateHostPort,
|
||||
DisableRules: disableRules,
|
||||
RuleRepoURL: ruleRepoURL,
|
||||
MaxIdleConns: maxIdleConns,
|
||||
MaxOpenConns: maxOpenConns,
|
||||
DialTimeout: dialTimeout,
|
||||
CacheConfigPath: cacheConfigPath,
|
||||
FluxInterval: fluxInterval,
|
||||
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
@@ -45,9 +46,9 @@ type Manager struct {
|
||||
tenantID string
|
||||
}
|
||||
|
||||
func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn, chUrl string) (*Manager, error) {
|
||||
func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) {
|
||||
hostNameRegex := regexp.MustCompile(`tcp://(?P<hostname>.*):`)
|
||||
hostNameRegexMatches := hostNameRegex.FindStringSubmatch(chUrl)
|
||||
hostNameRegexMatches := hostNameRegex.FindStringSubmatch(os.Getenv("ClickHouseUrl"))
|
||||
|
||||
tenantID := ""
|
||||
if len(hostNameRegexMatches) == 2 {
|
||||
|
||||
@@ -6,10 +6,7 @@ import {
|
||||
getValuesFromQueryParams,
|
||||
setQueryParamsFromOptions,
|
||||
} from 'components/CeleryTask/CeleryUtils';
|
||||
import {
|
||||
FilterCofigs,
|
||||
useCeleryFilterOptions,
|
||||
} from 'components/CeleryTask/useCeleryFilterOptions';
|
||||
import { useCeleryFilterOptions } from 'components/CeleryTask/useCeleryFilterOptions';
|
||||
import { SelectMaxTagPlaceholder } from 'components/MessagingQueues/MQCommon/MQCommon';
|
||||
import { QueryParams } from 'constants/query';
|
||||
import useUrlQuery from 'hooks/useUrlQuery';
|
||||
@@ -17,24 +14,20 @@ import { Check, Share2 } from 'lucide-react';
|
||||
import { useState } from 'react';
|
||||
import { useHistory, useLocation } from 'react-router-dom';
|
||||
import { useCopyToClipboard } from 'react-use';
|
||||
import { DataSource } from 'types/common/queryBuilder';
|
||||
|
||||
interface SelectOptionConfig {
|
||||
placeholder: string;
|
||||
queryParam: QueryParams;
|
||||
filterType: string | string[];
|
||||
filterConfigs?: FilterCofigs;
|
||||
}
|
||||
|
||||
function FilterSelect({
|
||||
placeholder,
|
||||
queryParam,
|
||||
filterType,
|
||||
filterConfigs,
|
||||
}: SelectOptionConfig): JSX.Element {
|
||||
const { handleSearch, isFetching, options } = useCeleryFilterOptions(
|
||||
filterType,
|
||||
filterConfigs,
|
||||
);
|
||||
|
||||
const urlQuery = useUrlQuery();
|
||||
@@ -72,26 +65,12 @@ function FilterSelect({
|
||||
);
|
||||
}
|
||||
|
||||
FilterSelect.defaultProps = {
|
||||
filterConfigs: undefined,
|
||||
};
|
||||
|
||||
function CeleryOverviewConfigOptions(): JSX.Element {
|
||||
const [isURLCopied, setIsURLCopied] = useState(false);
|
||||
|
||||
const [, handleCopyToClipboard] = useCopyToClipboard();
|
||||
|
||||
const selectConfigs: SelectOptionConfig[] = [
|
||||
{
|
||||
placeholder: 'Environment',
|
||||
queryParam: QueryParams.environment,
|
||||
filterType: 'resource_deployment_environment',
|
||||
filterConfigs: {
|
||||
aggregateOperator: 'rate',
|
||||
dataSource: DataSource.METRICS,
|
||||
aggregateAttribute: 'signoz_calls_total',
|
||||
},
|
||||
},
|
||||
{
|
||||
placeholder: 'Service Name',
|
||||
queryParam: QueryParams.service,
|
||||
@@ -136,7 +115,6 @@ function CeleryOverviewConfigOptions(): JSX.Element {
|
||||
placeholder={config.placeholder}
|
||||
queryParam={config.queryParam}
|
||||
filterType={config.filterType}
|
||||
filterConfigs={config.filterConfigs}
|
||||
/>
|
||||
))}
|
||||
</Row>
|
||||
|
||||
@@ -322,11 +322,6 @@ function makeFilters(urlQuery: URLSearchParams): Filter[] {
|
||||
{ paramName: QueryParams.kindString, key: 'kind_string', operator: 'in' },
|
||||
{ paramName: QueryParams.service, key: 'service.name', operator: 'in' },
|
||||
{ paramName: QueryParams.spanName, key: 'name', operator: 'in' },
|
||||
{
|
||||
paramName: QueryParams.environment,
|
||||
key: 'deployment.environment',
|
||||
operator: 'in',
|
||||
},
|
||||
];
|
||||
|
||||
return filterConfigs
|
||||
|
||||
@@ -6,17 +6,13 @@ import { DataSource } from 'types/common/queryBuilder';
|
||||
|
||||
import { useGetAllFilters } from './CeleryTaskConfigOptions/useGetCeleryFilters';
|
||||
|
||||
export interface FilterCofigs {
|
||||
aggregateOperator?: string;
|
||||
dataSource?: DataSource;
|
||||
aggregateAttribute?: string;
|
||||
filterAttributeKeyDataType?: DataTypes;
|
||||
tagType?: string;
|
||||
}
|
||||
|
||||
export const useCeleryFilterOptions = (
|
||||
type: string | string[],
|
||||
filterCofigs?: FilterCofigs,
|
||||
aggregateOperator?: string,
|
||||
dataSource?: DataSource,
|
||||
aggregateAttribute?: string,
|
||||
filterAttributeKeyDataType?: DataTypes,
|
||||
tagType?: string,
|
||||
): {
|
||||
searchText: string;
|
||||
handleSearch: (value: string) => void;
|
||||
@@ -27,7 +23,11 @@ export const useCeleryFilterOptions = (
|
||||
const { isFetching, options } = useGetAllFilters({
|
||||
attributeKey: type,
|
||||
searchText,
|
||||
...filterCofigs,
|
||||
aggregateOperator,
|
||||
dataSource,
|
||||
aggregateAttribute,
|
||||
filterAttributeKeyDataType,
|
||||
tagType,
|
||||
});
|
||||
const handleDebouncedSearch = useDebouncedFn((searchText): void => {
|
||||
setSearchText(searchText as string);
|
||||
|
||||
@@ -46,5 +46,4 @@ export enum QueryParams {
|
||||
msgSystem = 'msgSystem',
|
||||
destination = 'destination',
|
||||
kindString = 'kindString',
|
||||
environment = 'environment',
|
||||
}
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
package clickhouseReader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Encoding string
|
||||
@@ -16,6 +18,7 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultDatasource string = "tcp://localhost:9000"
|
||||
defaultTraceDB string = "signoz_traces"
|
||||
defaultOperationsTable string = "distributed_signoz_operations"
|
||||
defaultIndexTable string = "distributed_signoz_index_v2"
|
||||
@@ -55,6 +58,9 @@ type namespaceConfig struct {
|
||||
namespace string
|
||||
Enabled bool
|
||||
Datasource string
|
||||
MaxIdleConns int
|
||||
MaxOpenConns int
|
||||
DialTimeout time.Duration
|
||||
TraceDB string
|
||||
OperationsTable string
|
||||
IndexTable string
|
||||
@@ -93,6 +99,37 @@ type namespaceConfig struct {
|
||||
// Connecto defines how to connect to the database
|
||||
type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error)
|
||||
|
||||
func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {
|
||||
ctx := context.Background()
|
||||
options, err := clickhouse.ParseDSN(cfg.Datasource)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check if the DSN contained any of the following options, if not set from configuration
|
||||
if options.MaxIdleConns == 0 {
|
||||
options.MaxIdleConns = cfg.MaxIdleConns
|
||||
}
|
||||
if options.MaxOpenConns == 0 {
|
||||
options.MaxOpenConns = cfg.MaxOpenConns
|
||||
}
|
||||
if options.DialTimeout == 0 {
|
||||
options.DialTimeout = cfg.DialTimeout
|
||||
}
|
||||
|
||||
zap.L().Info("Connecting to Clickhouse", zap.String("at", options.Addr[0]), zap.Int("MaxIdleConns", options.MaxIdleConns), zap.Int("MaxOpenConns", options.MaxOpenConns), zap.Duration("DialTimeout", options.DialTimeout))
|
||||
db, err := clickhouse.Open(options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := db.Ping(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// Options store storage plugin related configs
|
||||
type Options struct {
|
||||
primary *namespaceConfig
|
||||
@@ -102,13 +139,26 @@ type Options struct {
|
||||
|
||||
// NewOptions creates a new Options struct.
|
||||
func NewOptions(
|
||||
datasource string,
|
||||
maxIdleConns int,
|
||||
maxOpenConns int,
|
||||
dialTimeout time.Duration,
|
||||
primaryNamespace string,
|
||||
otherNamespaces ...string,
|
||||
) *Options {
|
||||
|
||||
if datasource == "" {
|
||||
datasource = defaultDatasource
|
||||
}
|
||||
|
||||
options := &Options{
|
||||
primary: &namespaceConfig{
|
||||
namespace: primaryNamespace,
|
||||
Enabled: true,
|
||||
Datasource: datasource,
|
||||
MaxIdleConns: maxIdleConns,
|
||||
MaxOpenConns: maxOpenConns,
|
||||
DialTimeout: dialTimeout,
|
||||
TraceDB: defaultTraceDB,
|
||||
OperationsTable: defaultOperationsTable,
|
||||
IndexTable: defaultIndexTable,
|
||||
@@ -131,6 +181,7 @@ func NewOptions(
|
||||
WriteBatchDelay: defaultWriteBatchDelay,
|
||||
WriteBatchSize: defaultWriteBatchSize,
|
||||
Encoding: defaultEncoding,
|
||||
Connector: defaultConnector,
|
||||
|
||||
LogsTableV2: defaultLogsTableV2,
|
||||
LogsLocalTableV2: defaultLogsLocalTableV2,
|
||||
@@ -149,6 +200,7 @@ func NewOptions(
|
||||
if namespace == archiveNamespace {
|
||||
options.others[namespace] = &namespaceConfig{
|
||||
namespace: namespace,
|
||||
Datasource: datasource,
|
||||
TraceDB: "",
|
||||
OperationsTable: "",
|
||||
IndexTable: "",
|
||||
@@ -162,6 +214,7 @@ func NewOptions(
|
||||
WriteBatchDelay: defaultWriteBatchDelay,
|
||||
WriteBatchSize: defaultWriteBatchSize,
|
||||
Encoding: defaultEncoding,
|
||||
Connector: defaultConnector,
|
||||
}
|
||||
} else {
|
||||
options.others[namespace] = &namespaceConfig{namespace: namespace}
|
||||
|
||||
@@ -166,16 +166,26 @@ type ClickHouseReader struct {
|
||||
// NewTraceReader returns a TraceReader for the database
|
||||
func NewReader(
|
||||
localDB *sqlx.DB,
|
||||
db driver.Conn,
|
||||
configFile string,
|
||||
featureFlag interfaces.FeatureLookup,
|
||||
maxIdleConns int,
|
||||
maxOpenConns int,
|
||||
dialTimeout time.Duration,
|
||||
cluster string,
|
||||
useLogsNewSchema bool,
|
||||
useTraceNewSchema bool,
|
||||
fluxIntervalForTraceDetail time.Duration,
|
||||
cache cache.Cache,
|
||||
) *ClickHouseReader {
|
||||
options := NewOptions(primaryNamespace, archiveNamespace)
|
||||
|
||||
datasource := os.Getenv("ClickHouseUrl")
|
||||
options := NewOptions(datasource, maxIdleConns, maxOpenConns, dialTimeout, primaryNamespace, archiveNamespace)
|
||||
db, err := initialize(options)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
|
||||
}
|
||||
|
||||
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||
}
|
||||
|
||||
@@ -198,6 +208,29 @@ func NewReaderFromClickhouseConnection(
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
regex := os.Getenv("ClickHouseOptimizeReadInOrderRegex")
|
||||
var regexCompiled *regexp.Regexp
|
||||
if regex != "" {
|
||||
regexCompiled, err = regexp.Compile(regex)
|
||||
if err != nil {
|
||||
zap.L().Error("Incorrect regex for ClickHouseOptimizeReadInOrderRegex")
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
wrap := clickhouseConnWrapper{
|
||||
conn: db,
|
||||
settings: ClickhouseQuerySettings{
|
||||
MaxExecutionTime: os.Getenv("ClickHouseMaxExecutionTime"),
|
||||
MaxExecutionTimeLeaf: os.Getenv("ClickHouseMaxExecutionTimeLeaf"),
|
||||
TimeoutBeforeCheckingExecutionSpeed: os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed"),
|
||||
MaxBytesToRead: os.Getenv("ClickHouseMaxBytesToRead"),
|
||||
OptimizeReadInOrderRegex: os.Getenv("ClickHouseOptimizeReadInOrderRegex"),
|
||||
OptimizeReadInOrderRegexCompiled: regexCompiled,
|
||||
MaxResultRowsForCHQuery: constants.MaxResultRowsForCHQuery,
|
||||
},
|
||||
}
|
||||
|
||||
logsTableName := options.primary.LogsTable
|
||||
logsLocalTableName := options.primary.LogsLocalTable
|
||||
if useLogsNewSchema {
|
||||
@@ -213,7 +246,7 @@ func NewReaderFromClickhouseConnection(
|
||||
}
|
||||
|
||||
return &ClickHouseReader{
|
||||
db: db,
|
||||
db: wrap,
|
||||
localDB: localDB,
|
||||
TraceDB: options.primary.TraceDB,
|
||||
alertManager: alertManager,
|
||||
@@ -405,6 +438,28 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
func initialize(options *Options) (clickhouse.Conn, error) {
|
||||
|
||||
db, err := connect(options.getPrimary())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error connecting to primary db: %v", err)
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func connect(cfg *namespaceConfig) (clickhouse.Conn, error) {
|
||||
if cfg.Encoding != EncodingJSON && cfg.Encoding != EncodingProto {
|
||||
return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto)
|
||||
}
|
||||
|
||||
return cfg.Connector(cfg)
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetConn() clickhouse.Conn {
|
||||
return r.db
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
|
||||
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time)
|
||||
if err != nil {
|
||||
@@ -2895,6 +2950,58 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s
|
||||
return metricNameToTemporality, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTemporalitySwitchPoints(ctx context.Context, metricName string, startTime int64, endTime int64) ([]v3.TemporalityChangePoint, error) {
|
||||
// Initialize slice to store temporality switch points
|
||||
var temporalitySwitches []v3.TemporalityChangePoint
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
temporality,
|
||||
unix_milli,
|
||||
lag_temporality
|
||||
FROM (
|
||||
SELECT
|
||||
metric_name,
|
||||
temporality,
|
||||
unix_milli,
|
||||
lagInFrame(temporality, 1, '') OVER (
|
||||
PARTITION BY metric_name ORDER BY unix_milli
|
||||
) AS lag_temporality
|
||||
FROM %s.%s
|
||||
WHERE unix_milli >= %d
|
||||
AND unix_milli <= %d
|
||||
AND metric_name = '%s'
|
||||
) AS subquery
|
||||
WHERE lag_temporality != temporality
|
||||
AND lag_temporality != ''
|
||||
ORDER BY unix_milli ASC;
|
||||
`, signozMetricDBName, signozTSLocalTableNameV4, startTime, endTime, metricName)
|
||||
|
||||
rows, err := r.db.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var temporality string
|
||||
var timestamp int64
|
||||
var lagTemporality string
|
||||
err := rows.Scan(&temporality, ×tamp, &lagTemporality)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Store each temporality switch point with both temporalities
|
||||
temporalitySwitches = append(temporalitySwitches, v3.TemporalityChangePoint{
|
||||
Timestamp: timestamp,
|
||||
FromTemporality: v3.Temporality(lagTemporality),
|
||||
ToTemporality: v3.Temporality(temporality),
|
||||
})
|
||||
}
|
||||
|
||||
return temporalitySwitches, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {
|
||||
|
||||
queryStr := fmt.Sprintf("SELECT countDistinct(fingerprint) as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableNameV41Day)
|
||||
|
||||
124
pkg/query-service/app/clickhouseReader/wrapper.go
Normal file
124
pkg/query-service/app/clickhouseReader/wrapper.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package clickhouseReader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"regexp"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
)
|
||||
|
||||
type ClickhouseQuerySettings struct {
|
||||
MaxExecutionTime string
|
||||
MaxExecutionTimeLeaf string
|
||||
TimeoutBeforeCheckingExecutionSpeed string
|
||||
MaxBytesToRead string
|
||||
OptimizeReadInOrderRegex string
|
||||
OptimizeReadInOrderRegexCompiled *regexp.Regexp
|
||||
MaxResultRowsForCHQuery int
|
||||
}
|
||||
|
||||
type clickhouseConnWrapper struct {
|
||||
conn clickhouse.Conn
|
||||
settings ClickhouseQuerySettings
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) Ping(ctx context.Context) error {
|
||||
return c.conn.Ping(ctx)
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) Stats() driver.Stats {
|
||||
return c.conn.Stats()
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) addClickHouseSettings(ctx context.Context, query string) context.Context {
|
||||
settings := clickhouse.Settings{}
|
||||
|
||||
logComment := c.getLogComment(ctx)
|
||||
if logComment != "" {
|
||||
settings["log_comment"] = logComment
|
||||
}
|
||||
|
||||
if ctx.Value("enforce_max_result_rows") != nil {
|
||||
settings["max_result_rows"] = c.settings.MaxResultRowsForCHQuery
|
||||
}
|
||||
|
||||
if c.settings.MaxBytesToRead != "" {
|
||||
settings["max_bytes_to_read"] = c.settings.MaxBytesToRead
|
||||
}
|
||||
|
||||
if c.settings.MaxExecutionTime != "" {
|
||||
settings["max_execution_time"] = c.settings.MaxExecutionTime
|
||||
}
|
||||
|
||||
if c.settings.MaxExecutionTimeLeaf != "" {
|
||||
settings["max_execution_time_leaf"] = c.settings.MaxExecutionTimeLeaf
|
||||
}
|
||||
|
||||
if c.settings.TimeoutBeforeCheckingExecutionSpeed != "" {
|
||||
settings["timeout_before_checking_execution_speed"] = c.settings.TimeoutBeforeCheckingExecutionSpeed
|
||||
}
|
||||
|
||||
// only list queries of
|
||||
if c.settings.OptimizeReadInOrderRegex != "" && c.settings.OptimizeReadInOrderRegexCompiled.Match([]byte(query)) {
|
||||
settings["optimize_read_in_order"] = 0
|
||||
}
|
||||
|
||||
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) getLogComment(ctx context.Context) string {
|
||||
// Get the key-value pairs from context for log comment
|
||||
kv := ctx.Value(common.LogCommentKey)
|
||||
if kv == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
logCommentKVs, ok := kv.(map[string]string)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
|
||||
logComment, _ := json.Marshal(logCommentKVs)
|
||||
|
||||
return string(logComment)
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
|
||||
return c.conn.Query(c.addClickHouseSettings(ctx, query), query, args...)
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
|
||||
return c.conn.QueryRow(c.addClickHouseSettings(ctx, query), query, args...)
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
||||
return c.conn.Select(c.addClickHouseSettings(ctx, query), dest, query, args...)
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error {
|
||||
return c.conn.Exec(c.addClickHouseSettings(ctx, query), query, args...)
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
||||
return c.conn.AsyncInsert(c.addClickHouseSettings(ctx, query), query, wait, args...)
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
|
||||
return c.conn.PrepareBatch(c.addClickHouseSettings(ctx, query), query, opts...)
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) {
|
||||
return c.conn.ServerVersion()
|
||||
}
|
||||
|
||||
func (c clickhouseConnWrapper) Contributors() []string {
|
||||
return c.conn.Contributors()
|
||||
}
|
||||
@@ -97,6 +97,10 @@ type APIHandler struct {
|
||||
temporalityMap map[string]map[v3.Temporality]bool
|
||||
temporalityMux sync.Mutex
|
||||
|
||||
maxIdleConns int
|
||||
maxOpenConns int
|
||||
dialTimeout time.Duration
|
||||
|
||||
IntegrationsController *integrations.Controller
|
||||
|
||||
CloudIntegrationsController *cloudintegrations.Controller
|
||||
@@ -138,6 +142,10 @@ type APIHandlerOpts struct {
|
||||
|
||||
PreferSpanMetrics bool
|
||||
|
||||
MaxIdleConns int
|
||||
MaxOpenConns int
|
||||
DialTimeout time.Duration
|
||||
|
||||
// dao layer to perform crud on app objects like dashboard, alerts etc
|
||||
AppDao dao.ModelDao
|
||||
|
||||
@@ -217,6 +225,9 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
skipConfig: opts.SkipConfig,
|
||||
preferSpanMetrics: opts.PreferSpanMetrics,
|
||||
temporalityMap: make(map[string]map[v3.Temporality]bool),
|
||||
maxIdleConns: opts.MaxIdleConns,
|
||||
maxOpenConns: opts.MaxOpenConns,
|
||||
dialTimeout: opts.DialTimeout,
|
||||
alertManager: alertManager,
|
||||
ruleManager: opts.RuleManager,
|
||||
featureFlags: opts.FeatureFlags,
|
||||
@@ -644,6 +655,9 @@ func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRange
|
||||
} else {
|
||||
query.Temporality = v3.Unspecified
|
||||
}
|
||||
if len(aH.temporalityMap[query.AggregateAttribute.Key]) > 1 {
|
||||
query.MultipleTemporalities = true
|
||||
}
|
||||
}
|
||||
// we don't have temporality for this metric
|
||||
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
|
||||
@@ -671,6 +685,9 @@ func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRange
|
||||
} else {
|
||||
query.Temporality = v3.Unspecified
|
||||
}
|
||||
if len(nameToTemporality[query.AggregateAttribute.Key]) > 1 {
|
||||
query.MultipleTemporalities = true
|
||||
}
|
||||
aH.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1352,7 +1352,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
}
|
||||
testName := "name"
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
|
||||
// iterate over test data, create reader and run test
|
||||
for _, tc := range testCases {
|
||||
|
||||
@@ -177,7 +177,23 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
|
||||
for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
|
||||
if queryName == builderQuery.Expression {
|
||||
wg.Add(1)
|
||||
go q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
|
||||
if builderQuery.MultipleTemporalities == true {
|
||||
go func() {
|
||||
|
||||
temporalitySwitches, err := q.reader.GetTemporalitySwitchPoints(ctx, builderQuery.AggregateAttribute.Key, params.Start, params.End)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: err, Name: queryName}
|
||||
return
|
||||
}
|
||||
if len(temporalitySwitches) == 0 {
|
||||
q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
|
||||
} else {
|
||||
q.handleTemporalitySwitches(ctx, temporalitySwitches, &wg, builderQuery, params, cacheKeys, ch, queryName)
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
go q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,6 +225,58 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
|
||||
return results, errQueriesByName, err
|
||||
}
|
||||
|
||||
func (q *querier) handleTemporalitySwitches(ctx context.Context, temporalitySwitches []v3.TemporalityChangePoint, wg *sync.WaitGroup, builderQuery *v3.BuilderQuery, params *v3.QueryRangeParamsV3, cacheKeys map[string]string, ch chan channelResult, queryName string) {
|
||||
defer wg.Done()
|
||||
|
||||
tempCh := make(chan channelResult, len(temporalitySwitches)+1)
|
||||
|
||||
var tempWg sync.WaitGroup
|
||||
// Handle each segment between switch points
|
||||
for i := 0; i <= len(temporalitySwitches); i++ {
|
||||
tempWg.Add(1)
|
||||
go func(idx int) {
|
||||
queryWithTemporality := *builderQuery
|
||||
queryParams := *params
|
||||
if i == 0 {
|
||||
queryParams.End = temporalitySwitches[idx].Timestamp
|
||||
queryWithTemporality.Temporality = temporalitySwitches[idx].FromTemporality
|
||||
} else if idx < len(temporalitySwitches) {
|
||||
queryParams.Start = temporalitySwitches[idx-1].Timestamp
|
||||
queryParams.End = temporalitySwitches[idx].Timestamp
|
||||
queryWithTemporality.Temporality = temporalitySwitches[idx].FromTemporality
|
||||
queryWithTemporality.ShiftBy = 0
|
||||
} else if idx == len(temporalitySwitches) {
|
||||
queryParams.Start = temporalitySwitches[idx-1].Timestamp
|
||||
queryParams.End = params.End
|
||||
queryWithTemporality.Temporality = temporalitySwitches[idx-1].ToTemporality
|
||||
}
|
||||
|
||||
q.runBuilderQuery(ctx, &queryWithTemporality, &queryParams, cacheKeys, tempCh, &tempWg)
|
||||
}(i)
|
||||
}
|
||||
// Wait for all temporal queries to complete
|
||||
tempWg.Wait()
|
||||
close(tempCh)
|
||||
|
||||
// Combine results from all temporal queries
|
||||
var combinedSeries []*v3.Series
|
||||
var lastErr error
|
||||
|
||||
for result := range tempCh {
|
||||
if result.Err != nil {
|
||||
lastErr = result.Err
|
||||
continue
|
||||
}
|
||||
combinedSeries = append(combinedSeries, result.Series...)
|
||||
}
|
||||
|
||||
ch <- channelResult{
|
||||
Series: combinedSeries,
|
||||
Err: lastErr,
|
||||
Name: queryName,
|
||||
}
|
||||
}
|
||||
|
||||
func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
|
||||
channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
@@ -1406,7 +1406,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
}
|
||||
testName := "name"
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
|
||||
// iterate over test data, create reader and run test
|
||||
for _, tc := range testCases {
|
||||
|
||||
@@ -62,6 +62,9 @@ type ServerOptions struct {
|
||||
DisableRules bool
|
||||
RuleRepoURL string
|
||||
PreferSpanMetrics bool
|
||||
MaxIdleConns int
|
||||
MaxOpenConns int
|
||||
DialTimeout time.Duration
|
||||
CacheConfigPath string
|
||||
FluxInterval string
|
||||
FluxIntervalForTraceDetail string
|
||||
@@ -129,9 +132,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
zap.L().Info("Using ClickHouse as datastore ...")
|
||||
clickhouseReader := clickhouseReader.NewReader(
|
||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
|
||||
serverOptions.PromConfigPath,
|
||||
fm,
|
||||
serverOptions.MaxIdleConns,
|
||||
serverOptions.MaxOpenConns,
|
||||
serverOptions.DialTimeout,
|
||||
serverOptions.Cluster,
|
||||
serverOptions.UseLogsNewSchema,
|
||||
serverOptions.UseTraceNewSchema,
|
||||
@@ -197,6 +202,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
Reader: reader,
|
||||
SkipConfig: skipConfig,
|
||||
PreferSpanMetrics: serverOptions.PreferSpanMetrics,
|
||||
MaxIdleConns: serverOptions.MaxIdleConns,
|
||||
MaxOpenConns: serverOptions.MaxOpenConns,
|
||||
DialTimeout: serverOptions.DialTimeout,
|
||||
AppDao: dao.DB(),
|
||||
RuleManager: rm,
|
||||
FeatureFlags: fm,
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
@@ -84,6 +85,7 @@ type Reader interface {
|
||||
) (*v3.QBFilterSuggestionsResponse, *model.ApiError)
|
||||
|
||||
// Connection needed for rules, not ideal but required
|
||||
GetConn() clickhouse.Conn
|
||||
GetQueryEngine() *promql.Engine
|
||||
GetFanoutStorage() *storage.Storage
|
||||
|
||||
@@ -113,6 +115,8 @@ type Reader interface {
|
||||
//trace
|
||||
GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
|
||||
UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError
|
||||
|
||||
GetTemporalitySwitchPoints(ctx context.Context, metricName string, startTime int64, endTime int64) ([]v3.TemporalityChangePoint, error)
|
||||
}
|
||||
|
||||
type Querier interface {
|
||||
|
||||
@@ -85,10 +85,6 @@ func main() {
|
||||
envprovider.NewFactory(),
|
||||
fileprovider.NewFactory(),
|
||||
},
|
||||
}, signoz.DeprecatedFlags{
|
||||
MaxIdleConns: maxIdleConns,
|
||||
MaxOpenConns: maxOpenConns,
|
||||
DialTimeout: dialTimeout,
|
||||
})
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create config", zap.Error(err))
|
||||
@@ -108,6 +104,9 @@ func main() {
|
||||
PrivateHostPort: constants.PrivateHostPort,
|
||||
DisableRules: disableRules,
|
||||
RuleRepoURL: ruleRepoURL,
|
||||
MaxIdleConns: maxIdleConns,
|
||||
MaxOpenConns: maxOpenConns,
|
||||
DialTimeout: dialTimeout,
|
||||
CacheConfigPath: cacheConfigPath,
|
||||
FluxInterval: fluxInterval,
|
||||
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
|
||||
|
||||
@@ -807,33 +807,34 @@ func (m *MetricValueFilter) Clone() *MetricValueFilter {
|
||||
}
|
||||
|
||||
type BuilderQuery struct {
|
||||
QueryName string `json:"queryName"`
|
||||
StepInterval int64 `json:"stepInterval"`
|
||||
DataSource DataSource `json:"dataSource"`
|
||||
AggregateOperator AggregateOperator `json:"aggregateOperator"`
|
||||
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
|
||||
Temporality Temporality `json:"temporality,omitempty"`
|
||||
Filters *FilterSet `json:"filters,omitempty"`
|
||||
GroupBy []AttributeKey `json:"groupBy,omitempty"`
|
||||
Expression string `json:"expression"`
|
||||
Disabled bool `json:"disabled"`
|
||||
Having []Having `json:"having,omitempty"`
|
||||
Legend string `json:"legend,omitempty"`
|
||||
Limit uint64 `json:"limit"`
|
||||
Offset uint64 `json:"offset"`
|
||||
PageSize uint64 `json:"pageSize"`
|
||||
OrderBy []OrderBy `json:"orderBy,omitempty"`
|
||||
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
|
||||
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
|
||||
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
|
||||
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
|
||||
QueryName string `json:"queryName"`
|
||||
StepInterval int64 `json:"stepInterval"`
|
||||
DataSource DataSource `json:"dataSource"`
|
||||
AggregateOperator AggregateOperator `json:"aggregateOperator"`
|
||||
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
|
||||
Temporality Temporality `json:"temporality,omitempty"`
|
||||
Filters *FilterSet `json:"filters,omitempty"`
|
||||
GroupBy []AttributeKey `json:"groupBy,omitempty"`
|
||||
Expression string `json:"expression"`
|
||||
Disabled bool `json:"disabled"`
|
||||
Having []Having `json:"having,omitempty"`
|
||||
Legend string `json:"legend,omitempty"`
|
||||
Limit uint64 `json:"limit"`
|
||||
Offset uint64 `json:"offset"`
|
||||
PageSize uint64 `json:"pageSize"`
|
||||
OrderBy []OrderBy `json:"orderBy,omitempty"`
|
||||
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
|
||||
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
|
||||
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
|
||||
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
|
||||
SecondaryAggregation SecondaryAggregation `json:"seriesAggregation,omitempty"`
|
||||
Functions []Function `json:"functions,omitempty"`
|
||||
ShiftBy int64
|
||||
IsAnomaly bool
|
||||
QueriesUsedInFormula []string
|
||||
MetricTableHints *MetricTableHints `json:"-"`
|
||||
MetricValueFilter *MetricValueFilter `json:"-"`
|
||||
Functions []Function `json:"functions,omitempty"`
|
||||
ShiftBy int64
|
||||
IsAnomaly bool
|
||||
QueriesUsedInFormula []string
|
||||
MetricTableHints *MetricTableHints `json:"-"`
|
||||
MetricValueFilter *MetricValueFilter `json:"-"`
|
||||
MultipleTemporalities bool
|
||||
}
|
||||
|
||||
func (b *BuilderQuery) SetShiftByFromFunc() {
|
||||
@@ -1406,3 +1407,9 @@ type QBOptions struct {
|
||||
IsLivetailQuery bool
|
||||
PreferRPM bool
|
||||
}
|
||||
|
||||
type TemporalityChangePoint struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
FromTemporality Temporality `json:"from_temporality"`
|
||||
ToTemporality Temporality `json:"to_temporality"`
|
||||
}
|
||||
|
||||
@@ -1240,7 +1240,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
||||
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
@@ -1339,7 +1339,7 @@ func TestThresholdRuleNoData(t *testing.T) {
|
||||
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
@@ -1447,7 +1447,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
||||
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
@@ -1572,7 +1572,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
||||
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
|
||||
@@ -40,7 +40,7 @@ func NewMockClickhouseReader(
|
||||
require.Nil(t, err, "could not init mock clickhouse")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
||||
mockDB,
|
||||
clickhouseReader.NewOptions("", ""),
|
||||
clickhouseReader.NewOptions("", 10, 10, 10*time.Second, ""),
|
||||
testDB,
|
||||
"",
|
||||
featureFlags,
|
||||
|
||||
@@ -13,7 +13,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/instrumentation"
|
||||
"go.signoz.io/signoz/pkg/sqlmigrator"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/telemetrystore"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
)
|
||||
|
||||
@@ -36,20 +35,9 @@ type Config struct {
|
||||
|
||||
// API Server config
|
||||
APIServer apiserver.Config `mapstructure:"apiserver"`
|
||||
|
||||
// TelemetryStore config
|
||||
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
|
||||
}
|
||||
|
||||
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
|
||||
// These flags are used to ensure backward compatibility with the old flags.
|
||||
type DeprecatedFlags struct {
|
||||
MaxIdleConns int
|
||||
MaxOpenConns int
|
||||
DialTimeout time.Duration
|
||||
}
|
||||
|
||||
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) {
|
||||
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig) (Config, error) {
|
||||
configFactories := []factory.ConfigFactory{
|
||||
instrumentation.NewConfigFactory(),
|
||||
web.NewConfigFactory(),
|
||||
@@ -57,7 +45,6 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
|
||||
sqlstore.NewConfigFactory(),
|
||||
sqlmigrator.NewConfigFactory(),
|
||||
apiserver.NewConfigFactory(),
|
||||
telemetrystore.NewConfigFactory(),
|
||||
}
|
||||
|
||||
conf, err := config.New(ctx, resolverConfig, configFactories)
|
||||
@@ -70,12 +57,12 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
|
||||
return Config{}, err
|
||||
}
|
||||
|
||||
mergeAndEnsureBackwardCompatibility(&config, deprecatedFlags)
|
||||
mergeAndEnsureBackwardCompatibility(&config)
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags DeprecatedFlags) {
|
||||
func mergeAndEnsureBackwardCompatibility(config *Config) {
|
||||
// SIGNOZ_LOCAL_DB_PATH
|
||||
if os.Getenv("SIGNOZ_LOCAL_DB_PATH") != "" {
|
||||
fmt.Println("[Deprecated] env SIGNOZ_LOCAL_DB_PATH is deprecated and scheduled for removal. Please use SIGNOZ_SQLSTORE_SQLITE_PATH instead.")
|
||||
@@ -100,21 +87,4 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
|
||||
fmt.Println("Error parsing CONTEXT_TIMEOUT_MAX_ALLOWED, using default value of 600s")
|
||||
}
|
||||
}
|
||||
if os.Getenv("ClickHouseUrl") != "" {
|
||||
fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.")
|
||||
config.TelemetryStore.ClickHouse.DSN = os.Getenv("ClickHouseUrl")
|
||||
}
|
||||
|
||||
if deprecatedFlags.MaxIdleConns != 50 {
|
||||
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.")
|
||||
config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns
|
||||
}
|
||||
if deprecatedFlags.MaxOpenConns != 100 {
|
||||
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.")
|
||||
config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns
|
||||
}
|
||||
if deprecatedFlags.DialTimeout != 5*time.Second {
|
||||
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.")
|
||||
config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,9 +8,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/sqlmigration"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/sqlstore/sqlitesqlstore"
|
||||
"go.signoz.io/signoz/pkg/telemetrystore"
|
||||
"go.signoz.io/signoz/pkg/telemetrystore/clickhousetelemetrystore"
|
||||
"go.signoz.io/signoz/pkg/telemetrystore/telemetrystorehook"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
"go.signoz.io/signoz/pkg/web/noopweb"
|
||||
"go.signoz.io/signoz/pkg/web/routerweb"
|
||||
@@ -28,13 +25,9 @@ type ProviderConfig struct {
|
||||
|
||||
// Map of all sql migration provider factories
|
||||
SQLMigrationProviderFactories factory.NamedMap[factory.ProviderFactory[sqlmigration.SQLMigration, sqlmigration.Config]]
|
||||
|
||||
// Map of all telemetrystore provider factories
|
||||
TelemetryStoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]]
|
||||
}
|
||||
|
||||
func NewProviderConfig() ProviderConfig {
|
||||
hook := telemetrystorehook.NewFactory()
|
||||
return ProviderConfig{
|
||||
CacheProviderFactories: factory.MustNewNamedMap(
|
||||
memorycache.NewFactory(),
|
||||
@@ -57,8 +50,5 @@ func NewProviderConfig() ProviderConfig {
|
||||
sqlmigration.NewAddPipelinesFactory(),
|
||||
sqlmigration.NewAddIntegrationsFactory(),
|
||||
),
|
||||
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
|
||||
clickhousetelemetrystore.NewFactory(hook),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,17 +7,15 @@ import (
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/instrumentation"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/telemetrystore"
|
||||
"go.signoz.io/signoz/pkg/version"
|
||||
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
)
|
||||
|
||||
type SigNoz struct {
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -70,21 +68,9 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
telemetrystore, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
providerSettings,
|
||||
config.TelemetryStore,
|
||||
providerConfig.TelemetryStoreProviderFactories,
|
||||
config.TelemetryStore.Provider,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SigNoz{
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
TelemetryStore: telemetrystore,
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -1,120 +0,0 @@
|
||||
package clickhousetelemetrystore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/telemetrystore"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
clickHouseConn clickhouse.Conn
|
||||
hooks []telemetrystore.TelemetryStoreHook
|
||||
}
|
||||
|
||||
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("clickhouse"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStore, error) {
|
||||
// we want to fail fast so we have hook registration errors before creating the telemetry store
|
||||
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
|
||||
for i, hookFactory := range hookFactories {
|
||||
hook, err := hookFactory.New(ctx, providerSettings, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hooks[i] = hook
|
||||
}
|
||||
return New(ctx, providerSettings, config, hooks...)
|
||||
})
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/telemetrystore/clickhousetelemetrystore")
|
||||
|
||||
options, err := clickhouse.ParseDSN(config.ClickHouse.DSN)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
options.MaxIdleConns = config.Connection.MaxIdleConns
|
||||
options.MaxOpenConns = config.Connection.MaxOpenConns
|
||||
options.DialTimeout = config.Connection.DialTimeout
|
||||
|
||||
chConn, err := clickhouse.Open(options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &provider{
|
||||
settings: settings,
|
||||
clickHouseConn: chConn,
|
||||
hooks: hooks,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *provider) ClickHouseDB() clickhouse.Conn {
|
||||
return p
|
||||
}
|
||||
|
||||
func (p provider) Close() error {
|
||||
return p.clickHouseConn.Close()
|
||||
}
|
||||
|
||||
func (p provider) Ping(ctx context.Context) error {
|
||||
return p.clickHouseConn.Ping(ctx)
|
||||
}
|
||||
|
||||
func (p provider) Stats() driver.Stats {
|
||||
return p.clickHouseConn.Stats()
|
||||
}
|
||||
|
||||
func (p provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
rows, err := p.clickHouseConn.Query(ctx, query, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, rows, err)
|
||||
return rows, err
|
||||
}
|
||||
|
||||
func (p provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
row := p.clickHouseConn.QueryRow(ctx, query, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, nil)
|
||||
return row
|
||||
}
|
||||
|
||||
func (p provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
err := p.clickHouseConn.Select(ctx, dest, query, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (p provider) Exec(ctx context.Context, query string, args ...interface{}) error {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
err := p.clickHouseConn.Exec(ctx, query, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (p provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
err := p.clickHouseConn.AsyncInsert(ctx, query, wait, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (p provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
|
||||
ctx, query, args := telemetrystore.WrapBeforeQuery(p.hooks, ctx, query)
|
||||
batch, err := p.clickHouseConn.PrepareBatch(ctx, query, opts...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||
return batch, err
|
||||
}
|
||||
|
||||
func (p provider) ServerVersion() (*driver.ServerVersion, error) {
|
||||
return p.clickHouseConn.ServerVersion()
|
||||
}
|
||||
|
||||
func (p provider) Contributors() []string {
|
||||
return p.clickHouseConn.Contributors()
|
||||
}
|
||||
@@ -1,62 +0,0 @@
|
||||
package telemetrystore
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// Provider is the provider to use
|
||||
Provider string `mapstructure:"provider"`
|
||||
// Connection is the connection configuration
|
||||
Connection ConnectionConfig `mapstructure:",squash"`
|
||||
// Clickhouse is the clickhouse configuration
|
||||
ClickHouse ClickHouseConfig `mapstructure:"clickhouse"`
|
||||
}
|
||||
|
||||
type ConnectionConfig struct {
|
||||
// MaxOpenConns is the maximum number of open connections to the database.
|
||||
MaxOpenConns int `mapstructure:"max_open_conns"`
|
||||
MaxIdleConns int `mapstructure:"max_idle_conns"`
|
||||
DialTimeout time.Duration `mapstructure:"dial_timeout"`
|
||||
}
|
||||
|
||||
type ClickHouseQuerySettings struct {
|
||||
MaxExecutionTime int `mapstructure:"max_execution_time"`
|
||||
MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
|
||||
TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
|
||||
MaxBytesToRead int `mapstructure:"max_bytes_to_read"`
|
||||
MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"`
|
||||
}
|
||||
|
||||
type ClickHouseConfig struct {
|
||||
DSN string `mapstructure:"dsn"`
|
||||
|
||||
QuerySettings ClickHouseQuerySettings `mapstructure:"settings"`
|
||||
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("telemetrystore"), newConfig)
|
||||
|
||||
}
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Provider: "clickhouse",
|
||||
Connection: ConnectionConfig{
|
||||
MaxOpenConns: 100,
|
||||
MaxIdleConns: 50,
|
||||
DialTimeout: 5 * time.Second,
|
||||
},
|
||||
ClickHouse: ClickHouseConfig{
|
||||
DSN: "http://localhost:9000",
|
||||
|
||||
// No default query settings, as default's are set in ch config
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
return nil
|
||||
}
|
||||
@@ -1,95 +0,0 @@
|
||||
package telemetrystore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.signoz.io/signoz/pkg/config"
|
||||
"go.signoz.io/signoz/pkg/config/envprovider"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
func TestNewWithEnvProvider(t *testing.T) {
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "http://localhost:9000")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS", "60")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS", "150")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT", "5s")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DEBUG", "true")
|
||||
|
||||
conf, err := config.New(
|
||||
context.Background(),
|
||||
config.ResolverConfig{
|
||||
Uris: []string{"env:"},
|
||||
ProviderFactories: []config.ProviderFactory{
|
||||
envprovider.NewFactory(),
|
||||
},
|
||||
},
|
||||
[]factory.ConfigFactory{
|
||||
NewConfigFactory(),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual := &Config{}
|
||||
err = conf.Unmarshal("telemetrystore", actual)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := &Config{
|
||||
Provider: "clickhouse",
|
||||
Connection: ConnectionConfig{
|
||||
MaxOpenConns: 150,
|
||||
MaxIdleConns: 60,
|
||||
DialTimeout: 5 * time.Second,
|
||||
},
|
||||
ClickHouse: ClickHouseConfig{
|
||||
DSN: "http://localhost:9000",
|
||||
},
|
||||
}
|
||||
|
||||
assert.Equal(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__EXECUTION__TIME", "10")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__EXECUTION__TIME__LEAF", "10")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_TIMEOUT__BEFORE__CHECKING__EXECUTION__SPEED", "10")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__BYTES__TO__READ", "1000000")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__RESULT__ROWS__FOR__CH__QUERY", "10000")
|
||||
|
||||
conf, err := config.New(
|
||||
context.Background(),
|
||||
config.ResolverConfig{
|
||||
Uris: []string{"env:"},
|
||||
ProviderFactories: []config.ProviderFactory{
|
||||
envprovider.NewFactory(),
|
||||
},
|
||||
},
|
||||
[]factory.ConfigFactory{
|
||||
NewConfigFactory(),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual := &Config{}
|
||||
err = conf.Unmarshal("telemetrystore", actual)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := &Config{
|
||||
ClickHouse: ClickHouseConfig{
|
||||
QuerySettings: ClickHouseQuerySettings{
|
||||
MaxExecutionTime: 10,
|
||||
MaxExecutionTimeLeaf: 10,
|
||||
TimeoutBeforeCheckingExecutionSpeed: 10,
|
||||
MaxBytesToRead: 1000000,
|
||||
MaxResultRowsForCHQuery: 10000,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
assert.Equal(t, expected.ClickHouse.QuerySettings, actual.ClickHouse.QuerySettings)
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
package telemetrystore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
)
|
||||
|
||||
type TelemetryStore interface {
|
||||
// Returns the SigNoz Wrapper for Clickhouse
|
||||
ClickHouseDB() clickhouse.Conn
|
||||
}
|
||||
|
||||
type TelemetryStoreHook interface {
|
||||
BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{})
|
||||
AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error)
|
||||
}
|
||||
|
||||
func WrapBeforeQuery(hooks []TelemetryStoreHook, ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
|
||||
for _, hook := range hooks {
|
||||
ctx, query, args = hook.BeforeQuery(ctx, query, args...)
|
||||
}
|
||||
return ctx, query, args
|
||||
}
|
||||
|
||||
// runAfterHooks executes all after hooks in order
|
||||
func WrapAfterQuery(hooks []TelemetryStoreHook, ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
|
||||
for _, hook := range hooks {
|
||||
hook.AfterQuery(ctx, query, args, rows, err)
|
||||
}
|
||||
}
|
||||
@@ -1,85 +0,0 @@
|
||||
package telemetrystorehook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.signoz.io/signoz/pkg/telemetrystore"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
settings telemetrystore.ClickHouseQuerySettings
|
||||
}
|
||||
|
||||
func NewFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("clickhousesettings"), New)
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
|
||||
return &provider{
|
||||
settings: config.ClickHouse.QuerySettings,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
|
||||
return h.clickHouseSettings(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// clickHouseSettings adds clickhouse settings to queries
|
||||
func (h *provider) clickHouseSettings(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
|
||||
settings := clickhouse.Settings{}
|
||||
|
||||
// Apply default settings
|
||||
logComment := h.getLogComment(ctx)
|
||||
if logComment != "" {
|
||||
settings["log_comment"] = logComment
|
||||
}
|
||||
|
||||
if ctx.Value("enforce_max_result_rows") != nil {
|
||||
settings["max_result_rows"] = h.settings.MaxResultRowsForCHQuery
|
||||
}
|
||||
|
||||
if h.settings.MaxBytesToRead != 0 {
|
||||
settings["max_bytes_to_read"] = h.settings.MaxBytesToRead
|
||||
}
|
||||
|
||||
if h.settings.MaxExecutionTime != 0 {
|
||||
settings["max_execution_time"] = h.settings.MaxExecutionTime
|
||||
}
|
||||
|
||||
if h.settings.MaxExecutionTimeLeaf != 0 {
|
||||
settings["max_execution_time_leaf"] = h.settings.MaxExecutionTimeLeaf
|
||||
}
|
||||
|
||||
if h.settings.TimeoutBeforeCheckingExecutionSpeed != 0 {
|
||||
settings["timeout_before_checking_execution_speed"] = h.settings.TimeoutBeforeCheckingExecutionSpeed
|
||||
}
|
||||
|
||||
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
|
||||
return ctx, query, args
|
||||
}
|
||||
|
||||
func (h *provider) getLogComment(ctx context.Context) string {
|
||||
// Get the key-value pairs from context for log comment
|
||||
kv := ctx.Value(common.LogCommentKey)
|
||||
if kv == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
logCommentKVs, ok := kv.(map[string]string)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
|
||||
logComment, _ := json.Marshal(logCommentKVs)
|
||||
|
||||
return string(logComment)
|
||||
}
|
||||
@@ -1,34 +0,0 @@
|
||||
package telemetrystoretest
|
||||
|
||||
import (
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
)
|
||||
|
||||
// Provider represents a mock telemetry store provider for testing
|
||||
type Provider struct {
|
||||
mock cmock.ClickConnMockCommon
|
||||
}
|
||||
|
||||
// New creates a new mock telemetry store provider
|
||||
func New() (*Provider, error) {
|
||||
options := &clickhouse.Options{} // Default options
|
||||
mock, err := cmock.NewClickHouseNative(options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Provider{
|
||||
mock: mock,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Clickhouse returns the mock Clickhouse connection
|
||||
func (p *Provider) Clickhouse() clickhouse.Conn {
|
||||
return p.mock.(clickhouse.Conn)
|
||||
}
|
||||
|
||||
// Mock returns the underlying Clickhouse mock instance for setting expectations
|
||||
func (p *Provider) Mock() cmock.ClickConnMockCommon {
|
||||
return p.mock
|
||||
}
|
||||
@@ -1,44 +0,0 @@
|
||||
package telemetrystoretest
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "should create new provider successfully",
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
provider, err := New()
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, provider)
|
||||
return
|
||||
}
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, provider)
|
||||
assert.NotNil(t, provider.Mock())
|
||||
assert.NotNil(t, provider.Clickhouse())
|
||||
|
||||
// Verify the returned interfaces implement the expected types
|
||||
_, ok := provider.Mock().(cmock.ClickConnMockCommon)
|
||||
assert.True(t, ok, "Mock() should return cmock.ClickConnMockCommon")
|
||||
|
||||
_, ok = provider.Clickhouse().(clickhouse.Conn)
|
||||
assert.True(t, ok, "Clickhouse() should return clickhouse.Conn")
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user