Compare commits

..

20 Commits

Author | SHA1 | Message | Date
nikhilmantri0902 | 1948cdfaa4 | chore: added for comparison across group by and merges debugging quantile exact | 2025-08-19 01:16:24 +05:30
nikhilmantri0902 | 2fb7fe49ef | chore: added for comparison across group by and merges debugging quantile exact | 2025-08-19 00:56:43 +05:30
nikhilmantri0902 | c4762045a6 | chore: added debug functionality | 2025-08-18 15:34:20 +05:30
nikhilmantri0902 | 1541734542 | feat: match query building semantics for tags of 0 length | 2025-08-18 15:26:12 +05:30
nikhilmantri0902 | 46e5b407f7 | fix: added necessary 0 numCalls handling | 2025-08-18 13:53:59 +05:30
nikhilmantri0902 | f2c3946101 | fix: added necessary 0 numCalls handling | 2025-08-18 13:52:55 +05:30
nikhilmantri0902 | 4dca46de40 | chore: added debugging funcs | 2025-08-18 13:24:28 +05:30
nikhilmantri0902 | 6f420abe27 | chore: removed comparison block | 2025-08-18 13:09:12 +05:30
nikhilmantri0902 | 1d9b457af6 | chore: removed comparison block | 2025-08-18 12:40:51 +05:30
nikhilmantri0902 | d437998750 | chore: tuple issue fixed | 2025-08-18 12:13:08 +05:30
nikhilmantri0902 | e02d0cdd98 | chore: tuple issue fixed | 2025-08-18 11:44:45 +05:30
nikhilmantri0902 | 1ad4a6699a | chore: added debug logs | 2025-08-18 11:24:29 +05:30
nikhilmantri0902 | 00ae45022b | fix: added filtering based on both name and serviceName pairs | 2025-08-18 11:20:08 +05:30
nikhilmantri0902 | 6f4a965c6d | fix: added filtering based on both name and serviceName pairs | 2025-08-18 11:19:01 +05:30
nikhilmantri0902 | 4c29b03577 | chore: added logs for debugging | 2025-08-15 14:15:20 +05:30
nikhilmantri0902 | ea1409bc4f | fix: added query optimization | 2025-08-14 20:12:48 +05:30
Abhi kumar | 0e3ac2a179 | fix: added loading indicators in traces pages when running query (#8782) | 2025-08-14 13:53:39 +05:30
Amlan Kumar Nandy | 249f8be845 | fix: resolve infinite loading issue in metric view in messaging queues (#8779) | 2025-08-14 04:16:39 +00:00
primus-bot[bot] | 9c952942ad | chore(release): bump to v0.92.1 (#8780) | 2025-08-13 15:10:08 +05:30
    Co-authored-by: primus-bot[bot] <171087277+primus-bot[bot]@users.noreply.github.com>
Nityananda Gohain | dac46d82ff | fix: check ch version (#8778) | 2025-08-13 14:57:25 +05:30
    Check the ClickHouse version before the secondary_indices_enable_bulk_filtering setting is used.
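
The substance of that last commit, as a minimal standalone Go sketch: ask the server for its version first, and only send the setting when the version supports it. The connection setup here is an assumption; the shipped change, visible in the diffs below, threads the version through the telemetry store hook factories instead.

package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	ctx := context.Background()
	conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"localhost:9000"}})
	if err != nil {
		panic(err)
	}
	// Ask the server for its version before deciding which query settings to send.
	var version string
	if err := conn.QueryRow(ctx, "SELECT version()").Scan(&version); err != nil {
		panic(err)
	}
	settings := clickhouse.Settings{}
	// secondary_indices_enable_bulk_filtering is not supported on servers below 25.5,
	// so only send it when the reported version makes that plausible.
	if strings.HasPrefix(version, "25") {
		settings["secondary_indices_enable_bulk_filtering"] = false
	}
	fmt.Println(version, settings)
}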
15 changed files with 390 additions and 59 deletions

View File

@@ -174,7 +174,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
- image: signoz/signoz:v0.92.0
+ image: signoz/signoz:v0.92.1
command:
- --config=/root/config/prometheus.yml
ports:

View File

@@ -115,7 +115,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
- image: signoz/signoz:v0.92.0
+ image: signoz/signoz:v0.92.1
command:
- --config=/root/config/prometheus.yml
ports:

View File

@@ -177,7 +177,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
- image: signoz/signoz:${VERSION:-v0.92.0}
+ image: signoz/signoz:${VERSION:-v0.92.1}
container_name: signoz
command:
- --config=/root/config/prometheus.yml

View File

@@ -110,7 +110,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
- image: signoz/signoz:${VERSION:-v0.92.0}
+ image: signoz/signoz:${VERSION:-v0.92.1}
container_name: signoz
command:
- --config=/root/config/prometheus.yml

View File

@@ -20,6 +20,7 @@ function TimeSeriesViewContainer({
dataSource = DataSource.TRACES,
isFilterApplied,
setWarning,
setIsLoadingQueries,
}: TimeSeriesViewProps): JSX.Element {
const { stagedQuery, currentQuery, panelType } = useQueryBuilder();
@@ -83,6 +84,14 @@ function TimeSeriesViewContainer({
[data, isValidToConvertToMs],
);
useEffect(() => {
if (isLoading || isFetching) {
setIsLoadingQueries(true);
} else {
setIsLoadingQueries(false);
}
}, [isLoading, isFetching, setIsLoadingQueries]);
return (
<TimeSeriesView
isFilterApplied={isFilterApplied}
@@ -101,6 +110,7 @@ interface TimeSeriesViewProps {
dataSource?: DataSource;
isFilterApplied: boolean;
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
}
TimeSeriesViewContainer.defaultProps = {

View File

@@ -49,9 +49,14 @@ import { getListColumns, transformDataWithDate } from './utils';
interface ListViewProps {
isFilterApplied: boolean;
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
}
- function ListView({ isFilterApplied, setWarning }: ListViewProps): JSX.Element {
+ function ListView({
+ isFilterApplied,
+ setWarning,
+ setIsLoadingQueries,
+ }: ListViewProps): JSX.Element {
const {
stagedQuery,
panelType: panelTypeFromQueryBuilder,
@@ -162,6 +167,14 @@ function ListView({ isFilterApplied, setWarning }: ListViewProps): JSX.Element {
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [data?.payload, data?.warning]);
useEffect(() => {
if (isLoading || isFetching) {
setIsLoadingQueries(true);
} else {
setIsLoadingQueries(false);
}
}, [isLoading, isFetching, setIsLoadingQueries]);
const dataLength =
data?.payload?.data?.newResult?.data?.result[0]?.list?.length;
const totalCount = useMemo(() => dataLength || 0, [dataLength]);

View File

@@ -16,8 +16,10 @@ import { GlobalReducer } from 'types/reducer/globalTime';
function TableView({
setWarning,
setIsLoadingQueries,
}: {
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
}): JSX.Element {
const { stagedQuery, panelType } = useQueryBuilder();
@@ -26,7 +28,7 @@ function TableView({
GlobalReducer
>((state) => state.globalTime);
- const { data, isLoading, isError, error } = useGetQueryRange(
+ const { data, isLoading, isFetching, isError, error } = useGetQueryRange(
{
query: stagedQuery || initialQueriesMap.traces,
graphType: panelType || PANEL_TYPES.TABLE,
@@ -49,6 +51,14 @@ function TableView({
},
);
useEffect(() => {
if (isLoading || isFetching) {
setIsLoadingQueries(true);
} else {
setIsLoadingQueries(false);
}
}, [isLoading, isFetching, setIsLoadingQueries]);
const queryTableData = useMemo(
() =>
data?.payload?.data?.newResult?.data?.result ||

View File

@@ -40,11 +40,13 @@ import { ActionsContainer, Container } from './styles';
interface TracesViewProps {
isFilterApplied: boolean;
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
}
function TracesView({
isFilterApplied,
setWarning,
setIsLoadingQueries,
}: TracesViewProps): JSX.Element {
const { stagedQuery, panelType } = useQueryBuilder();
const [orderBy, setOrderBy] = useState<string>('timestamp:desc');
@@ -117,6 +119,14 @@ function TracesView({
[responseData],
);
useEffect(() => {
if (isLoading || isFetching) {
setIsLoadingQueries(true);
} else {
setIsLoadingQueries(false);
}
}, [isLoading, isFetching, setIsLoadingQueries]);
useEffect(() => {
if (!isLoading && !isFetching && !isError && (tableData || []).length !== 0) {
logEvent('Traces Explorer: Data present', {

View File

@@ -6,7 +6,7 @@ import cx from 'classnames';
import { CardContainer } from 'container/GridCardLayout/styles';
import { useIsDarkMode } from 'hooks/useDarkMode';
import { ChevronDown, ChevronUp } from 'lucide-react';
- import { useRef, useState } from 'react';
+ import { useCallback, useRef, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { Widgets } from 'types/api/dashboard/getAll';
@@ -129,23 +129,22 @@ function MetricPage(): JSX.Element {
},
];
- const [renderedGraphCount, setRenderedGraphCount] = useState(0);
+ const renderedGraphCountRef = useRef(0);
const hasLoggedRef = useRef(false);
- const checkIfDataExists = (isDataAvailable: boolean): void => {
+ const checkIfDataExists = useCallback((isDataAvailable: boolean): void => {
if (isDataAvailable) {
- const newCount = renderedGraphCount + 1;
- setRenderedGraphCount(newCount);
+ renderedGraphCountRef.current += 1;
// Only log when first graph has rendered and we haven't logged yet
- if (newCount === 1 && !hasLoggedRef.current) {
+ if (renderedGraphCountRef.current === 1 && !hasLoggedRef.current) {
logEvent('MQ Kafka: Metric view', {
graphRendered: true,
});
hasLoggedRef.current = true;
}
}
- };
+ }, []);
return (
<div className="metric-page">

View File

@@ -69,6 +69,7 @@ function TracesExplorer(): JSX.Element {
// Get panel type from URL
const panelTypesFromUrl = useGetPanelTypesQueryParam(PANEL_TYPES.LIST);
const [isLoadingQueries, setIsLoadingQueries] = useState<boolean>(false);
const [selectedView, setSelectedView] = useState<ExplorerViews>(() =>
getExplorerViewFromUrl(searchParams, panelTypesFromUrl),
@@ -323,6 +324,7 @@ function TracesExplorer(): JSX.Element {
rightActions={
<RightToolbarActions
onStageRunQuery={(): void => handleRunQuery(true, true)}
isLoadingQueries={isLoadingQueries}
/>
}
/>
@@ -344,13 +346,21 @@ function TracesExplorer(): JSX.Element {
{selectedView === ExplorerViews.LIST && (
<div className="trace-explorer-list-view">
- <ListView isFilterApplied={isFilterApplied} setWarning={setWarning} />
+ <ListView
+ isFilterApplied={isFilterApplied}
+ setWarning={setWarning}
+ setIsLoadingQueries={setIsLoadingQueries}
+ />
</div>
)}
{selectedView === ExplorerViews.TRACE && (
<div className="trace-explorer-traces-view">
- <TracesView isFilterApplied={isFilterApplied} setWarning={setWarning} />
+ <TracesView
+ isFilterApplied={isFilterApplied}
+ setWarning={setWarning}
+ setIsLoadingQueries={setIsLoadingQueries}
+ />
</div>
)}
@@ -360,13 +370,17 @@ function TracesExplorer(): JSX.Element {
dataSource={DataSource.TRACES}
isFilterApplied={isFilterApplied}
setWarning={setWarning}
setIsLoadingQueries={setIsLoadingQueries}
/>
</div>
)}
{selectedView === ExplorerViews.TABLE && (
<div className="trace-explorer-table-view">
- <TableView setWarning={setWarning} />
+ <TableView
+ setWarning={setWarning}
+ setIsLoadingQueries={setIsLoadingQueries}
+ />
</div>
)}
</div>

View File

@@ -385,7 +385,7 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
return resourceSubQuery, nil
}
- func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
+ func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
if r.indexTable == "" {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
@@ -428,7 +428,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
query := fmt.Sprintf(
`SELECT
- quantile(0.99)(duration_nano) as p99,
+ toFloat64(quantileExact(0.99)(duration_nano)) as p99,
avg(duration_nano) as avgDuration,
count(*) as numCalls
FROM %s.%s
@@ -510,6 +510,274 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
return &serviceItems, nil
}
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
if r.indexTable == "" {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
}
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil)
if apiErr != nil {
return nil, apiErr
}
// Build parallel arrays for arrayZip approach
var ops []string
var svcs []string
serviceOperationsMap := make(map[string][]string)
for svc, opsList := range *topLevelOps {
// Cap operations to 1500 per service (same as original logic)
cappedOps := opsList[:int(math.Min(1500, float64(len(opsList))))]
serviceOperationsMap[svc] = cappedOps
// Add to parallel arrays
for _, op := range cappedOps {
ops = append(ops, op)
svcs = append(svcs, svc)
}
}
fmt.Printf("Operation pairs count: %d\n", len(ops))
// Build resource subquery for all services, but only include our target services
targetServices := make([]string, 0, len(*topLevelOps))
for svc := range *topLevelOps {
targetServices = append(targetServices, svc)
}
resourceSubQuery, err := r.buildResourceSubQueryForServices(queryParams.Tags, targetServices, *queryParams.Start, *queryParams.End)
if err != nil {
zap.L().Error("Error building resource subquery", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
// Build the optimized single query using arrayZip for tuple creation
query := fmt.Sprintf(`
SELECT
resource_string_service$$name AS serviceName,
toFloat64(quantileExact(0.99)(duration_nano)) AS p99,
avg(duration_nano) AS avgDuration,
count(*) AS numCalls,
countIf(statusCode = 2) AS numErrors
FROM %s.%s
WHERE (name, resource_string_service$$name) IN arrayZip(@ops, @svcs)
AND timestamp >= @start
AND timestamp <= @end
AND ts_bucket_start >= @start_bucket
AND ts_bucket_start <= @end_bucket
AND (resource_fingerprint GLOBAL IN %s)
GROUP BY serviceName
ORDER BY numCalls DESC`,
r.TraceDB, r.traceTableName, resourceSubQuery,
)
args := []interface{}{
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
// Important: wrap slices with clickhouse.Array for IN/array params
clickhouse.Named("ops", ops),
clickhouse.Named("svcs", svcs),
}
fmt.Printf("Query: %s\n", query)
// Execute the single optimized query
rows, err := r.db.Query(ctx, query, args...)
if err != nil {
zap.L().Error("Error executing optimized services query", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
defer rows.Close()
// Process results
serviceItems := []model.ServiceItem{}
for rows.Next() {
var serviceItem model.ServiceItem
err := rows.ScanStruct(&serviceItem)
if err != nil {
zap.L().Error("Error scanning service item", zap.Error(err))
continue
}
// Skip services with zero calls (match original behavior)
if serviceItem.NumCalls == 0 {
continue
}
// Add data warning for this service
if ops, exists := serviceOperationsMap[serviceItem.ServiceName]; exists {
serviceItem.DataWarning = model.DataWarning{
TopLevelOps: ops,
}
}
// Calculate derived fields
serviceItem.CallRate = float64(serviceItem.NumCalls) / float64(queryParams.Period)
if serviceItem.NumCalls > 0 {
serviceItem.ErrorRate = float64(serviceItem.NumErrors) * 100 / float64(serviceItem.NumCalls)
}
serviceItems = append(serviceItems, serviceItem)
}
if err = rows.Err(); err != nil {
zap.L().Error("Error iterating over service results", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
// Fetch results from the original GetServicesOG for comparison
ogResults, ogErr := r.GetServicesOG(ctx, queryParams)
if ogErr != nil {
zap.L().Error("Error fetching OG service results", zap.Error(ogErr))
} else {
// Compare the optimized results with OG results
ogMap := make(map[string]model.ServiceItem)
for _, ogItem := range *ogResults {
ogMap[ogItem.ServiceName] = ogItem
}
for _, optItem := range serviceItems {
if ogItem, exists := ogMap[optItem.ServiceName]; exists {
// Compare key fields (NumCalls, NumErrors, etc.)
if optItem.NumCalls != ogItem.NumCalls ||
optItem.NumErrors != ogItem.NumErrors ||
int64(optItem.Percentile99) != int64(ogItem.Percentile99) ||
int64(optItem.AvgDuration) != int64(ogItem.AvgDuration) {
fmt.Printf(
"[Discrepancy] Service: %s | optNumCalls: %d, ogNumCalls: %d | optNumErrors: %d, ogNumErrors: %d | optP99: %.2f, ogP99: %.2f | optAvgDuration: %.2f, ogAvgDuration: %.2f\n",
optItem.ServiceName,
optItem.NumCalls, ogItem.NumCalls,
optItem.NumErrors, ogItem.NumErrors,
optItem.Percentile99, ogItem.Percentile99,
optItem.AvgDuration, ogItem.AvgDuration,
)
}
} else {
zap.L().Warn("Service present in optimized results but missing in OG results",
zap.String("service", optItem.ServiceName))
}
}
// Check for services present in OG but missing in optimized
optMap := make(map[string]struct{})
for _, optItem := range serviceItems {
optMap[optItem.ServiceName] = struct{}{}
}
for _, ogItem := range *ogResults {
if _, exists := optMap[ogItem.ServiceName]; !exists {
fmt.Printf("Service present in OG results but missing in optimized results: %s\n", ogItem.ServiceName)
}
}
}
return &serviceItems, nil
}
// buildResourceSubQueryForServices builds a resource subquery that includes only specific services
// This maintains service context while optimizing for multiple services in a single query
func (r *ClickHouseReader) buildResourceSubQueryForServices(tags []model.TagQueryParam, targetServices []string, start, end time.Time) (string, error) {
if len(targetServices) == 0 {
return "", fmt.Errorf("no target services provided")
}
if len(tags) == 0 {
// For exact parity with per-service behavior, build via resource builder with only service filter
filterSet := v3.FilterSet{}
filterSet.Items = append(filterSet.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "service.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: v3.FilterOperatorIn,
Value: targetServices,
})
resourceSubQuery, err := resource.BuildResourceSubQuery(
r.TraceDB,
r.traceResourceTableV3,
start.Unix()-1800,
end.Unix(),
&filterSet,
[]v3.AttributeKey{},
v3.AttributeKey{},
false)
if err != nil {
zap.L().Error("Error building resource subquery for services", zap.Error(err))
return "", err
}
return resourceSubQuery, nil
}
// Convert tags to filter set
filterSet := v3.FilterSet{}
for _, tag := range tags {
// Skip the collector id as we don't add it to traces
if tag.Key == "signoz.collector.id" {
continue
}
var it v3.FilterItem
it.Key = v3.AttributeKey{
Key: tag.Key,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
}
switch tag.Operator {
case model.NotInOperator:
it.Operator = v3.FilterOperatorNotIn
it.Value = tag.StringValues
case model.InOperator:
it.Operator = v3.FilterOperatorIn
it.Value = tag.StringValues
default:
return "", fmt.Errorf("operator %s not supported", tag.Operator)
}
filterSet.Items = append(filterSet.Items, it)
}
// Add service filter to limit to our target services
filterSet.Items = append(filterSet.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "service.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: v3.FilterOperatorIn,
Value: targetServices,
})
// Build resource subquery with service-specific filtering
resourceSubQuery, err := resource.BuildResourceSubQuery(
r.TraceDB,
r.traceResourceTableV3,
start.Unix()-1800,
end.Unix(),
&filterSet,
[]v3.AttributeKey{},
v3.AttributeKey{},
false)
if err != nil {
zap.L().Error("Error building resource subquery for services", zap.Error(err))
return "", err
}
return resourceSubQuery, nil
}
// buildServiceInClause creates a properly quoted IN clause for service names
func (r *ClickHouseReader) buildServiceInClause(services []string) string {
var quotedServices []string
for _, svc := range services {
// Escape single quotes and wrap in quotes
escapedSvc := strings.ReplaceAll(svc, "'", "\\'")
quotedServices = append(quotedServices, fmt.Sprintf("'%s'", escapedSvc))
}
return strings.Join(quotedServices, ", ")
}
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
// status can only be two and if both are selected then they are equivalent to none selected
if _, ok := excludeMap["status"]; ok {
@@ -529,7 +797,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
}
return query
}
func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
tags := []model.TagQuery{}
for _, tag := range queryParams {
@@ -686,7 +953,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
}
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
}
func (r *ClickHouseReader) GetEntryPointOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, error) {
// Step 1: Get top operations for the given service
topOps, err := r.GetTopOperations(ctx, queryParams)
@@ -755,9 +1021,9 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
query := fmt.Sprintf(`
SELECT
- quantile(0.5)(durationNano) as p50,
- quantile(0.95)(durationNano) as p95,
- quantile(0.99)(durationNano) as p99,
+ toFloat64(quantileExact(0.5)(durationNano)) as p50,
+ toFloat64(quantileExact(0.95)(durationNano)) as p95,
+ toFloat64(quantileExact(0.99)(durationNano)) as p99,
COUNT(*) as numCalls,
countIf(status_code=2) as errorCount,
name
@@ -1239,11 +1505,11 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
SELECT
src as parent,
dest as child,
- result[1] AS p50,
- result[2] AS p75,
- result[3] AS p90,
- result[4] AS p95,
- result[5] AS p99,
+ toFloat64(result[1]) AS p50,
+ toFloat64(result[2]) AS p75,
+ toFloat64(result[3]) AS p90,
+ toFloat64(result[4]) AS p95,
+ toFloat64(result[5]) AS p99,
sum(total_count) as callCount,
sum(total_count)/ @duration AS callRate,
sum(error_count)/sum(total_count) * 100 as errorRate
@@ -1275,7 +1541,6 @@ func getLocalTableName(tableName string) string {
return tableNameSplit[0] + "." + strings.Split(tableNameSplit[1], "distributed_")[1]
}
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
@@ -1416,7 +1681,6 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
}(ttlPayload)
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
@@ -2057,7 +2321,6 @@ func (r *ClickHouseReader) ListErrors(ctx context.Context, queryParams *model.Li
return &getErrorResponses, nil
}
func (r *ClickHouseReader) CountErrors(ctx context.Context, queryParams *model.CountErrorsParams) (uint64, *model.ApiError) {
var errorCount uint64
@@ -2169,7 +2432,6 @@ func (r *ClickHouseReader) GetNextPrevErrorIDs(ctx context.Context, queryParams
return &getNextPrevErrorIDsResponse, nil
}
func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *model.GetErrorParams) (string, time.Time, *model.ApiError) {
var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse
@@ -2830,7 +3092,6 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
return &response, nil
}
func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
@@ -2905,7 +3166,6 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3
return &attributeValues, nil
}
func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.UUID, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {
unixMilli := common.PastDayRoundOff()
@@ -3577,7 +3837,6 @@ func readRow(vars []interface{}, columnNames []string, countOfNumberCols int) ([
}
return groupBy, groupAttributes, groupAttributesArray, nil
}
func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string, countOfNumberCols int) ([]*v3.Series, error) {
// when groupBy is applied, each combination of cartesian product
// of attribute values is a separate series. Each item in seriesToPoints
@@ -4373,7 +4632,6 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
return timeline, nil
}
func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error) {
query := fmt.Sprintf(`SELECT
@@ -4962,7 +5220,6 @@ func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context,
}
return timeSeries, nil
}
func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.UUID, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
var args []interface{}
@@ -5180,7 +5437,6 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.
return &response, nil
}
func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
var args []interface{}
@@ -5760,7 +6016,6 @@ func (r *ClickHouseReader) GetInspectMetrics(ctx context.Context, req *metrics_e
Series: &seriesList,
}, nil
}
func (r *ClickHouseReader) GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError) {
// Build dynamic key selections and JSON extracts
var jsonExtracts []string
@@ -5933,7 +6188,6 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
}
return hasLE, nil
}
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
var missingMetrics []string
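
The heart of the optimization in this file is collapsing one query per service into a single grouped query: the (operation, service) pairs travel as two parallel array parameters and are matched with a tuple IN over arrayZip, while the approximate quantile() is swapped for quantileExact() cast to Float64 so results stay comparable across the group-by and merge paths being debugged. A minimal sketch of that pattern, assuming an open clickhouse-go connection; the table name is an assumption standing in for r.TraceDB and r.traceTableName:

package main

import (
	"context"
	"fmt"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// queryServiceStats runs one grouped query for many (operation, service)
// pairs instead of one query per service; ops[i] pairs with svcs[i].
func queryServiceStats(ctx context.Context, conn driver.Conn, ops, svcs []string) error {
	query := `
		SELECT
			resource_string_service$$name AS serviceName,
			toFloat64(quantileExact(0.99)(duration_nano)) AS p99,
			avg(duration_nano) AS avgDuration,
			count(*) AS numCalls
		FROM signoz_traces.distributed_signoz_index_v3
		WHERE (name, resource_string_service$$name) IN arrayZip(@ops, @svcs)
		GROUP BY serviceName
		ORDER BY numCalls DESC`
	rows, err := conn.Query(ctx, query,
		clickhouse.Named("ops", ops),
		clickhouse.Named("svcs", svcs),
	)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var name string
		var p99, avgDuration float64
		var numCalls uint64
		if err := rows.Scan(&name, &p99, &avgDuration, &numCalls); err != nil {
			return err
		}
		fmt.Printf("%s p99=%.0f avg=%.0f calls=%d\n", name, p99, avgDuration, numCalls)
	}
	return rows.Err()
}

Whether named array parameters bind cleanly into the tuple IN is the diff's own open question (its comment mentions wrapping slices with clickhouse.Array), so treat the binding as a sketch rather than settled API usage.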

View File

@@ -136,7 +136,14 @@ func NewSQLMigrationProviderFactories(
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
return factory.MustNewNamedMap(
- clickhousetelemetrystore.NewFactory(telemetrystorehook.NewSettingsFactory(), telemetrystorehook.NewLoggingFactory()),
+ clickhousetelemetrystore.NewFactory(
+ telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
+ return telemetrystorehook.NewSettingsFactory(s)
+ }),
+ telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
+ return telemetrystorehook.NewLoggingFactory()
+ }),
+ ),
)
}

View File

@@ -16,22 +16,13 @@ type provider struct {
hooks []telemetrystore.TelemetryStoreHook
}
- func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
+ func NewFactory(hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("clickhouse"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStore, error) {
- // we want to fail fast so we have hook registration errors before creating the telemetry store
- hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
- for i, hookFactory := range hookFactories {
- hook, err := hookFactory.New(ctx, providerSettings, config)
- if err != nil {
- return nil, err
- }
- hooks[i] = hook
- }
- return New(ctx, providerSettings, config, hooks...)
+ return New(ctx, providerSettings, config, hookFactories...)
})
}
- func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
+ func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) (telemetrystore.TelemetryStore, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")
options, err := clickhouse.ParseDSN(config.Clickhouse.DSN)
@@ -47,6 +38,20 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
return nil, err
}
var version string
if err := chConn.QueryRow(ctx, "SELECT version()").Scan(&version); err != nil {
return nil, err
}
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
for i, hookFactory := range hookFactories {
hook, err := hookFactory(version).New(ctx, providerSettings, config)
if err != nil {
return nil, err
}
hooks[i] = hook
}
return &provider{
settings: settings,
clickHouseConn: chConn,

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/factory"
)
type TelemetryStore interface {
@@ -19,6 +20,8 @@ type TelemetryStoreHook interface {
AfterQuery(ctx context.Context, event *QueryEvent)
}
type TelemetryStoreHookFactoryFunc func(string) factory.ProviderFactory[TelemetryStoreHook, Config]
func WrapBeforeQuery(hooks []TelemetryStoreHook, ctx context.Context, event *QueryEvent) context.Context {
for _, hook := range hooks {
ctx = hook.BeforeQuery(ctx, event)
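
The new TelemetryStoreHookFactoryFunc captures the shape of this refactor: hook construction is deferred until a value that is only known at connect time, the server version, is available. A stripped-down sketch with hypothetical stand-in types; the real code routes through SigNoz's factory.ProviderFactory plumbing:

package main

import "fmt"

// Hook stands in for telemetrystore.TelemetryStoreHook.
type Hook interface{ Name() string }

// HookFactoryFunc mirrors TelemetryStoreHookFactoryFunc: given the server
// version discovered at connect time, it yields a constructor for a hook.
type HookFactoryFunc func(version string) func() Hook

type settingsHook struct{ version string }

func (s settingsHook) Name() string { return "settings@" + s.version }

// connect discovers the version first, then builds every hook with it,
// failing fast at construction before the store is handed out.
func connect(factories ...HookFactoryFunc) []Hook {
	version := "25.5.1" // in the real code: SELECT version() against ClickHouse
	hooks := make([]Hook, len(factories))
	for i, f := range factories {
		hooks[i] = f(version)()
	}
	return hooks
}

func main() {
	hooks := connect(func(v string) func() Hook {
		return func() Hook { return settingsHook{version: v} }
	})
	fmt.Println(hooks[0].Name())
}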

View File

@@ -3,6 +3,7 @@ package telemetrystorehook
import (
"context"
"encoding/json"
"strings"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/factory"
@@ -11,16 +12,20 @@ import (
)
type provider struct {
- settings telemetrystore.QuerySettings
+ clickHouseVersion string
+ settings telemetrystore.QuerySettings
}
- func NewSettingsFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
- return factory.NewProviderFactory(factory.MustNewName("settings"), NewSettings)
+ func NewSettingsFactory(version string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
+ return factory.NewProviderFactory(factory.MustNewName("settings"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
+ return NewSettings(ctx, providerSettings, config, version)
+ })
}
- func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
+ func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, version string) (telemetrystore.TelemetryStoreHook, error) {
return &provider{
- settings: config.Clickhouse.QuerySettings,
+ clickHouseVersion: version,
+ settings: config.Clickhouse.QuerySettings,
}, nil
}
@@ -75,7 +80,8 @@ func (h *provider) BeforeQuery(ctx context.Context, _ *telemetrystore.QueryEvent
settings["result_overflow_mode"] = ctx.Value("result_overflow_mode")
}
- if !h.settings.SecondaryIndicesEnableBulkFiltering {
+ // ClickHouse version check: this setting is not supported on versions below 25.5
+ if strings.HasPrefix(h.clickHouseVersion, "25") && !h.settings.SecondaryIndicesEnableBulkFiltering {
// TODO(srikanthccv): enable it when the "Cannot read all data" issue is fixed
// https://github.com/ClickHouse/ClickHouse/issues/82283
settings["secondary_indices_enable_bulk_filtering"] = false
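
One caveat on the gate itself: strings.HasPrefix(h.clickHouseVersion, "25") also matches 25.0 through 25.4, which the comment says are unsupported, and stops matching at 26.x. A numeric major.minor comparison would be stricter; the sketch below is an illustration, and the helper name is hypothetical rather than part of the diff.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// atLeastVersion reports whether a ClickHouse version string such as
// "25.5.1.2557" is at least major.minor. Unparseable input returns false,
// so the setting is simply never sent rather than sent to an old server.
func atLeastVersion(version string, major, minor int) bool {
	parts := strings.Split(version, ".")
	if len(parts) < 2 {
		return false
	}
	mj, err1 := strconv.Atoi(parts[0])
	mn, err2 := strconv.Atoi(parts[1])
	if err1 != nil || err2 != nil {
		return false
	}
	return mj > major || (mj == major && mn >= minor)
}

func main() {
	for _, v := range []string{"24.8.4", "25.4.2", "25.5.1.2557", "26.1.0"} {
		fmt.Println(v, atLeastVersion(v, 25, 5))
	}
}

With a helper like this, the guard would read atLeastVersion(h.clickHouseVersion, 25, 5) && !h.settings.SecondaryIndicesEnableBulkFiltering, again as a sketch, not what the commit ships.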