mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-13 12:52:55 +00:00
Compare commits
20 Commits
v0.92.0
...
imp/remove
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1948cdfaa4 | ||
|
|
2fb7fe49ef | ||
|
|
c4762045a6 | ||
|
|
1541734542 | ||
|
|
46e5b407f7 | ||
|
|
f2c3946101 | ||
|
|
4dca46de40 | ||
|
|
6f420abe27 | ||
|
|
1d9b457af6 | ||
|
|
d437998750 | ||
|
|
e02d0cdd98 | ||
|
|
1ad4a6699a | ||
|
|
00ae45022b | ||
|
|
6f4a965c6d | ||
|
|
4c29b03577 | ||
|
|
ea1409bc4f | ||
|
|
0e3ac2a179 | ||
|
|
249f8be845 | ||
|
|
9c952942ad | ||
|
|
dac46d82ff |
@@ -174,7 +174,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:v0.92.0
|
||||
image: signoz/signoz:v0.92.1
|
||||
command:
|
||||
- --config=/root/config/prometheus.yml
|
||||
ports:
|
||||
|
||||
@@ -115,7 +115,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:v0.92.0
|
||||
image: signoz/signoz:v0.92.1
|
||||
command:
|
||||
- --config=/root/config/prometheus.yml
|
||||
ports:
|
||||
|
||||
@@ -177,7 +177,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:${VERSION:-v0.92.0}
|
||||
image: signoz/signoz:${VERSION:-v0.92.1}
|
||||
container_name: signoz
|
||||
command:
|
||||
- --config=/root/config/prometheus.yml
|
||||
|
||||
@@ -110,7 +110,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:${VERSION:-v0.92.0}
|
||||
image: signoz/signoz:${VERSION:-v0.92.1}
|
||||
container_name: signoz
|
||||
command:
|
||||
- --config=/root/config/prometheus.yml
|
||||
|
||||
@@ -20,6 +20,7 @@ function TimeSeriesViewContainer({
|
||||
dataSource = DataSource.TRACES,
|
||||
isFilterApplied,
|
||||
setWarning,
|
||||
setIsLoadingQueries,
|
||||
}: TimeSeriesViewProps): JSX.Element {
|
||||
const { stagedQuery, currentQuery, panelType } = useQueryBuilder();
|
||||
|
||||
@@ -83,6 +84,14 @@ function TimeSeriesViewContainer({
|
||||
[data, isValidToConvertToMs],
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
if (isLoading || isFetching) {
|
||||
setIsLoadingQueries(true);
|
||||
} else {
|
||||
setIsLoadingQueries(false);
|
||||
}
|
||||
}, [isLoading, isFetching, setIsLoadingQueries]);
|
||||
|
||||
return (
|
||||
<TimeSeriesView
|
||||
isFilterApplied={isFilterApplied}
|
||||
@@ -101,6 +110,7 @@ interface TimeSeriesViewProps {
|
||||
dataSource?: DataSource;
|
||||
isFilterApplied: boolean;
|
||||
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
|
||||
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
|
||||
}
|
||||
|
||||
TimeSeriesViewContainer.defaultProps = {
|
||||
|
||||
@@ -49,9 +49,14 @@ import { getListColumns, transformDataWithDate } from './utils';
|
||||
interface ListViewProps {
|
||||
isFilterApplied: boolean;
|
||||
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
|
||||
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
|
||||
}
|
||||
|
||||
function ListView({ isFilterApplied, setWarning }: ListViewProps): JSX.Element {
|
||||
function ListView({
|
||||
isFilterApplied,
|
||||
setWarning,
|
||||
setIsLoadingQueries,
|
||||
}: ListViewProps): JSX.Element {
|
||||
const {
|
||||
stagedQuery,
|
||||
panelType: panelTypeFromQueryBuilder,
|
||||
@@ -162,6 +167,14 @@ function ListView({ isFilterApplied, setWarning }: ListViewProps): JSX.Element {
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [data?.payload, data?.warning]);
|
||||
|
||||
useEffect(() => {
|
||||
if (isLoading || isFetching) {
|
||||
setIsLoadingQueries(true);
|
||||
} else {
|
||||
setIsLoadingQueries(false);
|
||||
}
|
||||
}, [isLoading, isFetching, setIsLoadingQueries]);
|
||||
|
||||
const dataLength =
|
||||
data?.payload?.data?.newResult?.data?.result[0]?.list?.length;
|
||||
const totalCount = useMemo(() => dataLength || 0, [dataLength]);
|
||||
|
||||
@@ -16,8 +16,10 @@ import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
|
||||
function TableView({
|
||||
setWarning,
|
||||
setIsLoadingQueries,
|
||||
}: {
|
||||
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
|
||||
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
|
||||
}): JSX.Element {
|
||||
const { stagedQuery, panelType } = useQueryBuilder();
|
||||
|
||||
@@ -26,7 +28,7 @@ function TableView({
|
||||
GlobalReducer
|
||||
>((state) => state.globalTime);
|
||||
|
||||
const { data, isLoading, isError, error } = useGetQueryRange(
|
||||
const { data, isLoading, isFetching, isError, error } = useGetQueryRange(
|
||||
{
|
||||
query: stagedQuery || initialQueriesMap.traces,
|
||||
graphType: panelType || PANEL_TYPES.TABLE,
|
||||
@@ -49,6 +51,14 @@ function TableView({
|
||||
},
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
if (isLoading || isFetching) {
|
||||
setIsLoadingQueries(true);
|
||||
} else {
|
||||
setIsLoadingQueries(false);
|
||||
}
|
||||
}, [isLoading, isFetching, setIsLoadingQueries]);
|
||||
|
||||
const queryTableData = useMemo(
|
||||
() =>
|
||||
data?.payload?.data?.newResult?.data?.result ||
|
||||
|
||||
@@ -40,11 +40,13 @@ import { ActionsContainer, Container } from './styles';
|
||||
interface TracesViewProps {
|
||||
isFilterApplied: boolean;
|
||||
setWarning: Dispatch<SetStateAction<Warning | undefined>>;
|
||||
setIsLoadingQueries: Dispatch<SetStateAction<boolean>>;
|
||||
}
|
||||
|
||||
function TracesView({
|
||||
isFilterApplied,
|
||||
setWarning,
|
||||
setIsLoadingQueries,
|
||||
}: TracesViewProps): JSX.Element {
|
||||
const { stagedQuery, panelType } = useQueryBuilder();
|
||||
const [orderBy, setOrderBy] = useState<string>('timestamp:desc');
|
||||
@@ -117,6 +119,14 @@ function TracesView({
|
||||
[responseData],
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
if (isLoading || isFetching) {
|
||||
setIsLoadingQueries(true);
|
||||
} else {
|
||||
setIsLoadingQueries(false);
|
||||
}
|
||||
}, [isLoading, isFetching, setIsLoadingQueries]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!isLoading && !isFetching && !isError && (tableData || []).length !== 0) {
|
||||
logEvent('Traces Explorer: Data present', {
|
||||
|
||||
@@ -6,7 +6,7 @@ import cx from 'classnames';
|
||||
import { CardContainer } from 'container/GridCardLayout/styles';
|
||||
import { useIsDarkMode } from 'hooks/useDarkMode';
|
||||
import { ChevronDown, ChevronUp } from 'lucide-react';
|
||||
import { useRef, useState } from 'react';
|
||||
import { useCallback, useRef, useState } from 'react';
|
||||
import { useTranslation } from 'react-i18next';
|
||||
import { Widgets } from 'types/api/dashboard/getAll';
|
||||
|
||||
@@ -129,23 +129,22 @@ function MetricPage(): JSX.Element {
|
||||
},
|
||||
];
|
||||
|
||||
const [renderedGraphCount, setRenderedGraphCount] = useState(0);
|
||||
const renderedGraphCountRef = useRef(0);
|
||||
const hasLoggedRef = useRef(false);
|
||||
|
||||
const checkIfDataExists = (isDataAvailable: boolean): void => {
|
||||
const checkIfDataExists = useCallback((isDataAvailable: boolean): void => {
|
||||
if (isDataAvailable) {
|
||||
const newCount = renderedGraphCount + 1;
|
||||
setRenderedGraphCount(newCount);
|
||||
renderedGraphCountRef.current += 1;
|
||||
|
||||
// Only log when first graph has rendered and we haven't logged yet
|
||||
if (newCount === 1 && !hasLoggedRef.current) {
|
||||
if (renderedGraphCountRef.current === 1 && !hasLoggedRef.current) {
|
||||
logEvent('MQ Kafka: Metric view', {
|
||||
graphRendered: true,
|
||||
});
|
||||
hasLoggedRef.current = true;
|
||||
}
|
||||
}
|
||||
};
|
||||
}, []);
|
||||
|
||||
return (
|
||||
<div className="metric-page">
|
||||
|
||||
@@ -69,6 +69,7 @@ function TracesExplorer(): JSX.Element {
|
||||
|
||||
// Get panel type from URL
|
||||
const panelTypesFromUrl = useGetPanelTypesQueryParam(PANEL_TYPES.LIST);
|
||||
const [isLoadingQueries, setIsLoadingQueries] = useState<boolean>(false);
|
||||
|
||||
const [selectedView, setSelectedView] = useState<ExplorerViews>(() =>
|
||||
getExplorerViewFromUrl(searchParams, panelTypesFromUrl),
|
||||
@@ -323,6 +324,7 @@ function TracesExplorer(): JSX.Element {
|
||||
rightActions={
|
||||
<RightToolbarActions
|
||||
onStageRunQuery={(): void => handleRunQuery(true, true)}
|
||||
isLoadingQueries={isLoadingQueries}
|
||||
/>
|
||||
}
|
||||
/>
|
||||
@@ -344,13 +346,21 @@ function TracesExplorer(): JSX.Element {
|
||||
|
||||
{selectedView === ExplorerViews.LIST && (
|
||||
<div className="trace-explorer-list-view">
|
||||
<ListView isFilterApplied={isFilterApplied} setWarning={setWarning} />
|
||||
<ListView
|
||||
isFilterApplied={isFilterApplied}
|
||||
setWarning={setWarning}
|
||||
setIsLoadingQueries={setIsLoadingQueries}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{selectedView === ExplorerViews.TRACE && (
|
||||
<div className="trace-explorer-traces-view">
|
||||
<TracesView isFilterApplied={isFilterApplied} setWarning={setWarning} />
|
||||
<TracesView
|
||||
isFilterApplied={isFilterApplied}
|
||||
setWarning={setWarning}
|
||||
setIsLoadingQueries={setIsLoadingQueries}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
@@ -360,13 +370,17 @@ function TracesExplorer(): JSX.Element {
|
||||
dataSource={DataSource.TRACES}
|
||||
isFilterApplied={isFilterApplied}
|
||||
setWarning={setWarning}
|
||||
setIsLoadingQueries={setIsLoadingQueries}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{selectedView === ExplorerViews.TABLE && (
|
||||
<div className="trace-explorer-table-view">
|
||||
<TableView setWarning={setWarning} />
|
||||
<TableView
|
||||
setWarning={setWarning}
|
||||
setIsLoadingQueries={setIsLoadingQueries}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
|
||||
@@ -385,7 +385,7 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
|
||||
return resourceSubQuery, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
|
||||
|
||||
if r.indexTable == "" {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
|
||||
@@ -428,7 +428,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
|
||||
|
||||
query := fmt.Sprintf(
|
||||
`SELECT
|
||||
quantile(0.99)(duration_nano) as p99,
|
||||
toFloat64(quantileExact(0.99)(duration_nano)) as p99,
|
||||
avg(duration_nano) as avgDuration,
|
||||
count(*) as numCalls
|
||||
FROM %s.%s
|
||||
@@ -510,6 +510,274 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
|
||||
return &serviceItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
|
||||
if r.indexTable == "" {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
|
||||
}
|
||||
|
||||
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil)
|
||||
if apiErr != nil {
|
||||
return nil, apiErr
|
||||
}
|
||||
// Build parallel arrays for arrayZip approach
|
||||
var ops []string
|
||||
var svcs []string
|
||||
serviceOperationsMap := make(map[string][]string)
|
||||
|
||||
for svc, opsList := range *topLevelOps {
|
||||
// Cap operations to 1500 per service (same as original logic)
|
||||
cappedOps := opsList[:int(math.Min(1500, float64(len(opsList))))]
|
||||
serviceOperationsMap[svc] = cappedOps
|
||||
|
||||
// Add to parallel arrays
|
||||
for _, op := range cappedOps {
|
||||
ops = append(ops, op)
|
||||
svcs = append(svcs, svc)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("Operation pairs count: %d\n", len(ops))
|
||||
|
||||
// Build resource subquery for all services, but only include our target services
|
||||
targetServices := make([]string, 0, len(*topLevelOps))
|
||||
for svc := range *topLevelOps {
|
||||
targetServices = append(targetServices, svc)
|
||||
}
|
||||
resourceSubQuery, err := r.buildResourceSubQueryForServices(queryParams.Tags, targetServices, *queryParams.Start, *queryParams.End)
|
||||
if err != nil {
|
||||
zap.L().Error("Error building resource subquery", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||
}
|
||||
|
||||
// Build the optimized single query using arrayZip for tuple creation
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
resource_string_service$$name AS serviceName,
|
||||
toFloat64(quantileExact(0.99)(duration_nano)) AS p99,
|
||||
avg(duration_nano) AS avgDuration,
|
||||
count(*) AS numCalls,
|
||||
countIf(statusCode = 2) AS numErrors
|
||||
FROM %s.%s
|
||||
WHERE (name, resource_string_service$$name) IN arrayZip(@ops, @svcs)
|
||||
AND timestamp >= @start
|
||||
AND timestamp <= @end
|
||||
AND ts_bucket_start >= @start_bucket
|
||||
AND ts_bucket_start <= @end_bucket
|
||||
AND (resource_fingerprint GLOBAL IN %s)
|
||||
GROUP BY serviceName
|
||||
ORDER BY numCalls DESC`,
|
||||
r.TraceDB, r.traceTableName, resourceSubQuery,
|
||||
)
|
||||
|
||||
args := []interface{}{
|
||||
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
|
||||
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
|
||||
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
|
||||
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
|
||||
// Important: wrap slices with clickhouse.Array for IN/array params
|
||||
clickhouse.Named("ops", ops),
|
||||
clickhouse.Named("svcs", svcs),
|
||||
}
|
||||
|
||||
fmt.Printf("Query: %s\n", query)
|
||||
|
||||
// Execute the single optimized query
|
||||
rows, err := r.db.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
zap.L().Error("Error executing optimized services query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Process results
|
||||
serviceItems := []model.ServiceItem{}
|
||||
|
||||
for rows.Next() {
|
||||
var serviceItem model.ServiceItem
|
||||
err := rows.ScanStruct(&serviceItem)
|
||||
if err != nil {
|
||||
zap.L().Error("Error scanning service item", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip services with zero calls (match original behavior)
|
||||
if serviceItem.NumCalls == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Add data warning for this service
|
||||
if ops, exists := serviceOperationsMap[serviceItem.ServiceName]; exists {
|
||||
serviceItem.DataWarning = model.DataWarning{
|
||||
TopLevelOps: ops,
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate derived fields
|
||||
serviceItem.CallRate = float64(serviceItem.NumCalls) / float64(queryParams.Period)
|
||||
if serviceItem.NumCalls > 0 {
|
||||
serviceItem.ErrorRate = float64(serviceItem.NumErrors) * 100 / float64(serviceItem.NumCalls)
|
||||
}
|
||||
|
||||
serviceItems = append(serviceItems, serviceItem)
|
||||
}
|
||||
|
||||
if err = rows.Err(); err != nil {
|
||||
zap.L().Error("Error iterating over service results", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||
}
|
||||
|
||||
// Fetch results from the original GetServicesOG for comparison
|
||||
ogResults, ogErr := r.GetServicesOG(ctx, queryParams)
|
||||
if ogErr != nil {
|
||||
zap.L().Error("Error fetching OG service results", zap.Error(ogErr))
|
||||
} else {
|
||||
// Compare the optimized results with OG results
|
||||
ogMap := make(map[string]model.ServiceItem)
|
||||
for _, ogItem := range *ogResults {
|
||||
ogMap[ogItem.ServiceName] = ogItem
|
||||
}
|
||||
|
||||
for _, optItem := range serviceItems {
|
||||
if ogItem, exists := ogMap[optItem.ServiceName]; exists {
|
||||
// Compare key fields (NumCalls, NumErrors, etc.)
|
||||
if optItem.NumCalls != ogItem.NumCalls ||
|
||||
optItem.NumErrors != ogItem.NumErrors ||
|
||||
int64(optItem.Percentile99) != int64(ogItem.Percentile99) ||
|
||||
int64(optItem.AvgDuration) != int64(ogItem.AvgDuration) {
|
||||
fmt.Printf(
|
||||
"[Discrepancy] Service: %s | optNumCalls: %d, ogNumCalls: %d | optNumErrors: %d, ogNumErrors: %d | optP99: %.2f, ogP99: %.2f | optAvgDuration: %.2f, ogAvgDuration: %.2f\n",
|
||||
optItem.ServiceName,
|
||||
optItem.NumCalls, ogItem.NumCalls,
|
||||
optItem.NumErrors, ogItem.NumErrors,
|
||||
optItem.Percentile99, ogItem.Percentile99,
|
||||
optItem.AvgDuration, ogItem.AvgDuration,
|
||||
)
|
||||
}
|
||||
} else {
|
||||
zap.L().Warn("Service present in optimized results but missing in OG results",
|
||||
zap.String("service", optItem.ServiceName))
|
||||
}
|
||||
}
|
||||
|
||||
// Check for services present in OG but missing in optimized
|
||||
optMap := make(map[string]struct{})
|
||||
for _, optItem := range serviceItems {
|
||||
optMap[optItem.ServiceName] = struct{}{}
|
||||
}
|
||||
for _, ogItem := range *ogResults {
|
||||
if _, exists := optMap[ogItem.ServiceName]; !exists {
|
||||
fmt.Printf("Service present in OG results but missing in optimized results: %s\n", ogItem.ServiceName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &serviceItems, nil
|
||||
}
|
||||
|
||||
// buildResourceSubQueryForServices builds a resource subquery that includes only specific services
|
||||
// This maintains service context while optimizing for multiple services in a single query
|
||||
func (r *ClickHouseReader) buildResourceSubQueryForServices(tags []model.TagQueryParam, targetServices []string, start, end time.Time) (string, error) {
|
||||
if len(targetServices) == 0 {
|
||||
return "", fmt.Errorf("no target services provided")
|
||||
}
|
||||
|
||||
if len(tags) == 0 {
|
||||
// For exact parity with per-service behavior, build via resource builder with only service filter
|
||||
filterSet := v3.FilterSet{}
|
||||
filterSet.Items = append(filterSet.Items, v3.FilterItem{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "service.name",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeResource,
|
||||
},
|
||||
Operator: v3.FilterOperatorIn,
|
||||
Value: targetServices,
|
||||
})
|
||||
|
||||
resourceSubQuery, err := resource.BuildResourceSubQuery(
|
||||
r.TraceDB,
|
||||
r.traceResourceTableV3,
|
||||
start.Unix()-1800,
|
||||
end.Unix(),
|
||||
&filterSet,
|
||||
[]v3.AttributeKey{},
|
||||
v3.AttributeKey{},
|
||||
false)
|
||||
if err != nil {
|
||||
zap.L().Error("Error building resource subquery for services", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
return resourceSubQuery, nil
|
||||
}
|
||||
|
||||
// Convert tags to filter set
|
||||
filterSet := v3.FilterSet{}
|
||||
for _, tag := range tags {
|
||||
// Skip the collector id as we don't add it to traces
|
||||
if tag.Key == "signoz.collector.id" {
|
||||
continue
|
||||
}
|
||||
|
||||
var it v3.FilterItem
|
||||
it.Key = v3.AttributeKey{
|
||||
Key: tag.Key,
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeResource,
|
||||
}
|
||||
|
||||
switch tag.Operator {
|
||||
case model.NotInOperator:
|
||||
it.Operator = v3.FilterOperatorNotIn
|
||||
it.Value = tag.StringValues
|
||||
case model.InOperator:
|
||||
it.Operator = v3.FilterOperatorIn
|
||||
it.Value = tag.StringValues
|
||||
default:
|
||||
return "", fmt.Errorf("operator %s not supported", tag.Operator)
|
||||
}
|
||||
|
||||
filterSet.Items = append(filterSet.Items, it)
|
||||
}
|
||||
|
||||
// Add service filter to limit to our target services
|
||||
filterSet.Items = append(filterSet.Items, v3.FilterItem{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "service.name",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeResource,
|
||||
},
|
||||
Operator: v3.FilterOperatorIn,
|
||||
Value: targetServices,
|
||||
})
|
||||
|
||||
// Build resource subquery with service-specific filtering
|
||||
resourceSubQuery, err := resource.BuildResourceSubQuery(
|
||||
r.TraceDB,
|
||||
r.traceResourceTableV3,
|
||||
start.Unix()-1800,
|
||||
end.Unix(),
|
||||
&filterSet,
|
||||
[]v3.AttributeKey{},
|
||||
v3.AttributeKey{},
|
||||
false)
|
||||
if err != nil {
|
||||
zap.L().Error("Error building resource subquery for services", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
return resourceSubQuery, nil
|
||||
}
|
||||
|
||||
// buildServiceInClause creates a properly quoted IN clause for service names
|
||||
func (r *ClickHouseReader) buildServiceInClause(services []string) string {
|
||||
var quotedServices []string
|
||||
for _, svc := range services {
|
||||
// Escape single quotes and wrap in quotes
|
||||
escapedSvc := strings.ReplaceAll(svc, "'", "\\'")
|
||||
quotedServices = append(quotedServices, fmt.Sprintf("'%s'", escapedSvc))
|
||||
}
|
||||
return strings.Join(quotedServices, ", ")
|
||||
}
|
||||
|
||||
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
|
||||
// status can only be two and if both are selected than they are equivalent to none selected
|
||||
if _, ok := excludeMap["status"]; ok {
|
||||
@@ -529,7 +797,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
|
||||
}
|
||||
return query
|
||||
}
|
||||
|
||||
func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
|
||||
tags := []model.TagQuery{}
|
||||
for _, tag := range queryParams {
|
||||
@@ -686,7 +953,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
|
||||
}
|
||||
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetEntryPointOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, error) {
|
||||
// Step 1: Get top operations for the given service
|
||||
topOps, err := r.GetTopOperations(ctx, queryParams)
|
||||
@@ -755,9 +1021,9 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
quantile(0.5)(durationNano) as p50,
|
||||
quantile(0.95)(durationNano) as p95,
|
||||
quantile(0.99)(durationNano) as p99,
|
||||
toFloat64(quantileExact(0.5)(durationNano)) as p50,
|
||||
toFloat64(quantileExact(0.95)(durationNano)) as p95,
|
||||
toFloat64(quantileExact(0.99)(durationNano)) as p99,
|
||||
COUNT(*) as numCalls,
|
||||
countIf(status_code=2) as errorCount,
|
||||
name
|
||||
@@ -1239,11 +1505,11 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
|
||||
SELECT
|
||||
src as parent,
|
||||
dest as child,
|
||||
result[1] AS p50,
|
||||
result[2] AS p75,
|
||||
result[3] AS p90,
|
||||
result[4] AS p95,
|
||||
result[5] AS p99,
|
||||
toFloat64(result[1]) AS p50,
|
||||
toFloat64(result[2]) AS p75,
|
||||
toFloat64(result[3]) AS p90,
|
||||
toFloat64(result[4]) AS p95,
|
||||
toFloat64(result[5]) AS p99,
|
||||
sum(total_count) as callCount,
|
||||
sum(total_count)/ @duration AS callRate,
|
||||
sum(error_count)/sum(total_count) * 100 as errorRate
|
||||
@@ -1275,7 +1541,6 @@ func getLocalTableName(tableName string) string {
|
||||
return tableNameSplit[0] + "." + strings.Split(tableNameSplit[1], "distributed_")[1]
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
// uuid is used as transaction id
|
||||
uuidWithHyphen := uuid.New()
|
||||
@@ -1416,7 +1681,6 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
}(ttlPayload)
|
||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
// uuid is used as transaction id
|
||||
uuidWithHyphen := uuid.New()
|
||||
@@ -2057,7 +2321,6 @@ func (r *ClickHouseReader) ListErrors(ctx context.Context, queryParams *model.Li
|
||||
|
||||
return &getErrorResponses, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) CountErrors(ctx context.Context, queryParams *model.CountErrorsParams) (uint64, *model.ApiError) {
|
||||
|
||||
var errorCount uint64
|
||||
@@ -2169,7 +2432,6 @@ func (r *ClickHouseReader) GetNextPrevErrorIDs(ctx context.Context, queryParams
|
||||
return &getNextPrevErrorIDsResponse, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *model.GetErrorParams) (string, time.Time, *model.ApiError) {
|
||||
|
||||
var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse
|
||||
@@ -2830,7 +3092,6 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
|
||||
var query string
|
||||
var err error
|
||||
@@ -2905,7 +3166,6 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3
|
||||
|
||||
return &attributeValues, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.UUID, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {
|
||||
|
||||
unixMilli := common.PastDayRoundOff()
|
||||
@@ -3577,7 +3837,6 @@ func readRow(vars []interface{}, columnNames []string, countOfNumberCols int) ([
|
||||
}
|
||||
return groupBy, groupAttributes, groupAttributesArray, nil
|
||||
}
|
||||
|
||||
func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string, countOfNumberCols int) ([]*v3.Series, error) {
|
||||
// when groupBy is applied, each combination of cartesian product
|
||||
// of attribute values is a separate series. Each item in seriesToPoints
|
||||
@@ -4373,7 +4632,6 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
|
||||
|
||||
return timeline, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
|
||||
ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error) {
|
||||
query := fmt.Sprintf(`SELECT
|
||||
@@ -4962,7 +5220,6 @@ func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context,
|
||||
}
|
||||
return timeSeries, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.UUID, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
|
||||
var args []interface{}
|
||||
|
||||
@@ -5180,7 +5437,6 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
|
||||
var args []interface{}
|
||||
|
||||
@@ -5760,7 +6016,6 @@ func (r *ClickHouseReader) GetInspectMetrics(ctx context.Context, req *metrics_e
|
||||
Series: &seriesList,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError) {
|
||||
// Build dynamic key selections and JSON extracts
|
||||
var jsonExtracts []string
|
||||
@@ -5933,7 +6188,6 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
|
||||
}
|
||||
return hasLE, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
|
||||
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
|
||||
var missingMetrics []string
|
||||
|
||||
@@ -136,7 +136,14 @@ func NewSQLMigrationProviderFactories(
|
||||
|
||||
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewSettingsFactory(), telemetrystorehook.NewLoggingFactory()),
|
||||
clickhousetelemetrystore.NewFactory(
|
||||
telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
|
||||
return telemetrystorehook.NewSettingsFactory(s)
|
||||
}),
|
||||
telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
|
||||
return telemetrystorehook.NewLoggingFactory()
|
||||
}),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -16,22 +16,13 @@ type provider struct {
|
||||
hooks []telemetrystore.TelemetryStoreHook
|
||||
}
|
||||
|
||||
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
|
||||
func NewFactory(hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("clickhouse"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStore, error) {
|
||||
// we want to fail fast so we have hook registration errors before creating the telemetry store
|
||||
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
|
||||
for i, hookFactory := range hookFactories {
|
||||
hook, err := hookFactory.New(ctx, providerSettings, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hooks[i] = hook
|
||||
}
|
||||
return New(ctx, providerSettings, config, hooks...)
|
||||
return New(ctx, providerSettings, config, hookFactories...)
|
||||
})
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) (telemetrystore.TelemetryStore, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")
|
||||
|
||||
options, err := clickhouse.ParseDSN(config.Clickhouse.DSN)
|
||||
@@ -47,6 +38,20 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var version string
|
||||
if err := chConn.QueryRow(ctx, "SELECT version()").Scan(&version); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
|
||||
for i, hookFactory := range hookFactories {
|
||||
hook, err := hookFactory(version).New(ctx, providerSettings, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hooks[i] = hook
|
||||
}
|
||||
|
||||
return &provider{
|
||||
settings: settings,
|
||||
clickHouseConn: chConn,
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
type TelemetryStore interface {
|
||||
@@ -19,6 +20,8 @@ type TelemetryStoreHook interface {
|
||||
AfterQuery(ctx context.Context, event *QueryEvent)
|
||||
}
|
||||
|
||||
type TelemetryStoreHookFactoryFunc func(string) factory.ProviderFactory[TelemetryStoreHook, Config]
|
||||
|
||||
func WrapBeforeQuery(hooks []TelemetryStoreHook, ctx context.Context, event *QueryEvent) context.Context {
|
||||
for _, hook := range hooks {
|
||||
ctx = hook.BeforeQuery(ctx, event)
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemetrystorehook
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
@@ -11,16 +12,20 @@ import (
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
settings telemetrystore.QuerySettings
|
||||
clickHouseVersion string
|
||||
settings telemetrystore.QuerySettings
|
||||
}
|
||||
|
||||
func NewSettingsFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("settings"), NewSettings)
|
||||
func NewSettingsFactory(version string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("settings"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
|
||||
return NewSettings(ctx, providerSettings, config, version)
|
||||
})
|
||||
}
|
||||
|
||||
func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
|
||||
func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, version string) (telemetrystore.TelemetryStoreHook, error) {
|
||||
return &provider{
|
||||
settings: config.Clickhouse.QuerySettings,
|
||||
clickHouseVersion: version,
|
||||
settings: config.Clickhouse.QuerySettings,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -75,7 +80,8 @@ func (h *provider) BeforeQuery(ctx context.Context, _ *telemetrystore.QueryEvent
|
||||
settings["result_overflow_mode"] = ctx.Value("result_overflow_mode")
|
||||
}
|
||||
|
||||
if !h.settings.SecondaryIndicesEnableBulkFiltering {
|
||||
// ClickHouse version check is added since this setting is not support on version below 25.5
|
||||
if strings.HasPrefix(h.clickHouseVersion, "25") && !h.settings.SecondaryIndicesEnableBulkFiltering {
|
||||
// TODO(srikanthccv): enable it when the "Cannot read all data" issue is fixed
|
||||
// https://github.com/ClickHouse/ClickHouse/issues/82283
|
||||
settings["secondary_indices_enable_bulk_filtering"] = false
|
||||
|
||||
Reference in New Issue
Block a user