mirror of https://github.com/SigNoz/signoz.git
Compare commits

3 Commits (imp/remove ... imp/remove)
| Author | SHA1 | Date |
|---|---|---|
| | 1948cdfaa4 | |
| | 2fb7fe49ef | |
| | c4762045a6 | |
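At a glance, these commits restore the pre-optimization `GetServicesOG` implementation and call it from the optimized `GetServices` as a shadow read, logging any drift between the two result sets. They also swap the approximate `quantile` aggregate for `toFloat64(quantileExact(...))` in `GetServices`, `GetTopOperations`, and `GetDependencyGraph`, and remove a few stray blank lines.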
@@ -12,6 +12,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/SigNoz/signoz/pkg/prometheus"
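The restored `GetServicesOG` below relies on `sync.WaitGroup` and `sync.RWMutex` for its concurrent fan-out, which is presumably what pulls `sync` back into the import list (the hunk header records one added line; which line it is cannot be read directly off the scraped page).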
@@ -384,6 +385,131 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
 	return resourceSubQuery, nil
 }
 
+func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
+
+	if r.indexTable == "" {
+		return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
+	}
+
+	topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil)
+	if apiErr != nil {
+		return nil, apiErr
+	}
+
+	serviceItems := []model.ServiceItem{}
+	var wg sync.WaitGroup
+	// limit the number of concurrent queries so as not to overload the ClickHouse server
+	sem := make(chan struct{}, 10)
+	var mtx sync.RWMutex
+
+	for svc, ops := range *topLevelOps {
+		sem <- struct{}{}
+		wg.Add(1)
+		go func(svc string, ops []string) {
+			defer wg.Done()
+			defer func() { <-sem }()
+			var serviceItem model.ServiceItem
+			var numErrors uint64
+
+			// Even if the total number of operations within the time range is low
+			// while the number of top-level operations is high, we want to warn
+			// the user about the potential instrumentation issue
+			serviceItem.DataWarning = model.DataWarning{
+				TopLevelOps: (*topLevelOps)[svc],
+			}
+
+			// default max_query_size = 262144
+			// Assuming the average size of an item in `ops` is 50 bytes,
+			// the `ops` array could hold 262144/50 = 5242 items.
+			// Although it could be as large as ~5k, we cap the number of
+			// items in the `ops` array at 1500.
+			ops = ops[:int(math.Min(1500, float64(len(ops))))]
+
+			query := fmt.Sprintf(
+				`SELECT
+					toFloat64(quantileExact(0.99)(duration_nano)) as p99,
+					avg(duration_nano) as avgDuration,
+					count(*) as numCalls
+				FROM %s.%s
+				WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
+				r.TraceDB, r.traceTableName,
+			)
+			errorQuery := fmt.Sprintf(
+				`SELECT
+					count(*) as numErrors
+				FROM %s.%s
+				WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
+				r.TraceDB, r.traceTableName,
+			)
+
+			args := []interface{}{}
+			args = append(args,
+				clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
+				clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
+				clickhouse.Named("serviceName", svc),
+				clickhouse.Named("names", ops),
+			)
+
+			resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End)
+			if err != nil {
+				zap.L().Error("Error in processing sql query", zap.Error(err))
+				return
+			}
+			query += `
+				AND (
+					resource_fingerprint GLOBAL IN ` +
+				resourceSubQuery +
+				`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`
+
+			args = append(args,
+				clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
+				clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
+			)
+
+			err = r.db.QueryRow(
+				ctx,
+				query,
+				args...,
+			).ScanStruct(&serviceItem)
+
+			// check the scan error before reading any fields from serviceItem
+			if err != nil {
+				zap.L().Error("Error in processing sql query", zap.Error(err))
+				return
+			}
+
+			if serviceItem.NumCalls == 0 {
+				return
+			}
+
+			errorQuery += `
+				AND (
+					resource_fingerprint GLOBAL IN ` +
+				resourceSubQuery +
+				`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`
+
+			err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors)
+			if err != nil {
+				zap.L().Error("Error in processing sql query", zap.Error(err))
+				return
+			}
+
+			serviceItem.ServiceName = svc
+			serviceItem.NumErrors = numErrors
+			mtx.Lock()
+			serviceItems = append(serviceItems, serviceItem)
+			mtx.Unlock()
+		}(svc, ops)
+	}
+	wg.Wait()
+
+	for idx := range serviceItems {
+		serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period)
+		serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls)
+	}
+	return &serviceItems, nil
+}
+
 func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
 	if r.indexTable == "" {
 		return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
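`GetServicesOG` bounds its fan-out with a buffered channel used as a counting semaphore: sends block once ten queries are in flight, and each goroutine frees its slot on exit. A minimal, self-contained sketch of the same pattern (the service names and the capacity of 2 are illustrative only):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	services := []string{"frontend", "cartservice", "checkout", "payment"}

	var wg sync.WaitGroup
	sem := make(chan struct{}, 2) // at most 2 workers in flight (the reader uses 10)

	for _, svc := range services {
		sem <- struct{}{} // acquire a slot; blocks while the buffer is full
		wg.Add(1)
		go func(svc string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the work finishes
			fmt.Println("querying", svc)
		}(svc)
	}
	wg.Wait()
}
```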
@@ -427,7 +553,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
 	query := fmt.Sprintf(`
 		SELECT
 			resource_string_service$$name AS serviceName,
-			quantile(0.99)(duration_nano) AS p99,
+			toFloat64(quantileExact(0.99)(duration_nano)) AS p99,
 			avg(duration_nano) AS avgDuration,
 			count(*) AS numCalls,
 			countIf(statusCode = 2) AS numErrors
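The swap from `quantile(0.99)` to `quantileExact(0.99)` trades speed for determinism: ClickHouse's `quantile` is an approximate, sampling-based aggregate (a reservoir of up to 8192 values), so two identical runs can report slightly different p99 values, while `quantileExact` keeps every value in memory and computes the true percentile. The `toFloat64` wrapper presumably pins the returned column type so it scans cleanly into the float64 fields of `model.ServiceItem`. Determinism matters here because the next hunk compares the optimized and OG results field by field.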
@@ -500,6 +626,51 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
 		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
 	}
 
+	// Fetch results from the original GetServicesOG for comparison
+	ogResults, ogErr := r.GetServicesOG(ctx, queryParams)
+	if ogErr != nil {
+		zap.L().Error("Error fetching OG service results", zap.Error(ogErr))
+	} else {
+		// Compare the optimized results with the OG results
+		ogMap := make(map[string]model.ServiceItem)
+		for _, ogItem := range *ogResults {
+			ogMap[ogItem.ServiceName] = ogItem
+		}
+
+		for _, optItem := range serviceItems {
+			if ogItem, exists := ogMap[optItem.ServiceName]; exists {
+				// Compare key fields (NumCalls, NumErrors, etc.)
+				if optItem.NumCalls != ogItem.NumCalls ||
+					optItem.NumErrors != ogItem.NumErrors ||
+					int64(optItem.Percentile99) != int64(ogItem.Percentile99) ||
+					int64(optItem.AvgDuration) != int64(ogItem.AvgDuration) {
+					fmt.Printf(
+						"[Discrepancy] Service: %s | optNumCalls: %d, ogNumCalls: %d | optNumErrors: %d, ogNumErrors: %d | optP99: %.2f, ogP99: %.2f | optAvgDuration: %.2f, ogAvgDuration: %.2f\n",
+						optItem.ServiceName,
+						optItem.NumCalls, ogItem.NumCalls,
+						optItem.NumErrors, ogItem.NumErrors,
+						optItem.Percentile99, ogItem.Percentile99,
+						optItem.AvgDuration, ogItem.AvgDuration,
+					)
+				}
+			} else {
+				zap.L().Warn("Service present in optimized results but missing in OG results",
+					zap.String("service", optItem.ServiceName))
+			}
+		}
+
+		// Check for services present in OG but missing in optimized results
+		optMap := make(map[string]struct{})
+		for _, optItem := range serviceItems {
+			optMap[optItem.ServiceName] = struct{}{}
+		}
+		for _, ogItem := range *ogResults {
+			if _, exists := optMap[ogItem.ServiceName]; !exists {
+				fmt.Printf("Service present in OG results but missing in optimized results: %s\n", ogItem.ServiceName)
+			}
+		}
+	}
+
 	return &serviceItems, nil
 }
 
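The block above is a shadow-read check: serve from the optimized path, replay the old one, and report drift in both directions, with the float fields truncated to `int64` so sub-integer noise is ignored. The two trailing membership loops generalize into a small helper; a hypothetical sketch, not part of the SigNoz codebase:

```go
package compare

// diffKeys reports which keys occur only in a and which occur only in b.
// Hypothetical helper for illustration; requires Go 1.18+ generics.
func diffKeys[T any](a, b []T, key func(T) string) (onlyA, onlyB []string) {
	inA := make(map[string]struct{}, len(a))
	inB := make(map[string]struct{}, len(b))
	for _, x := range a {
		inA[key(x)] = struct{}{}
	}
	for _, x := range b {
		inB[key(x)] = struct{}{}
	}
	for k := range inA {
		if _, ok := inB[k]; !ok {
			onlyA = append(onlyA, k)
		}
	}
	for k := range inB {
		if _, ok := inA[k]; !ok {
			onlyB = append(onlyB, k)
		}
	}
	return onlyA, onlyB
}
```

Called as `diffKeys(serviceItems, *ogResults, func(s model.ServiceItem) string { return s.ServiceName })`, it would return the services unique to each side in one pass.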
@@ -626,7 +797,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
 	}
 	return query
 }
-
 func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
 	tags := []model.TagQuery{}
 	for _, tag := range queryParams {
@@ -851,9 +1021,9 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
 
 	query := fmt.Sprintf(`
 		SELECT
-			quantile(0.5)(durationNano) as p50,
-			quantile(0.95)(durationNano) as p95,
-			quantile(0.99)(durationNano) as p99,
+			toFloat64(quantileExact(0.5)(durationNano)) as p50,
+			toFloat64(quantileExact(0.95)(durationNano)) as p95,
+			toFloat64(quantileExact(0.99)(durationNano)) as p99,
 			COUNT(*) as numCalls,
 			countIf(status_code=2) as errorCount,
 			name
@@ -1335,11 +1505,11 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
 		SELECT
 			src as parent,
 			dest as child,
-			result[1] AS p50,
-			result[2] AS p75,
-			result[3] AS p90,
-			result[4] AS p95,
-			result[5] AS p99,
+			toFloat64(result[1]) AS p50,
+			toFloat64(result[2]) AS p75,
+			toFloat64(result[3]) AS p90,
+			toFloat64(result[4]) AS p95,
+			toFloat64(result[5]) AS p99,
 			sum(total_count) as callCount,
 			sum(total_count)/ @duration AS callRate,
 			sum(error_count)/sum(total_count) * 100 as errorRate
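In `GetDependencyGraph` the five percentiles arrive as one array column (`result`), presumably produced by a `quantiles`-family aggregate upstream in the query, so the cast is applied per element; `toFloat64` operates on scalars, not arrays.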
@@ -1371,7 +1541,6 @@ func getLocalTableName(tableName string) string {
 	return tableNameSplit[0] + "." + strings.Split(tableNameSplit[1], "distributed_")[1]
 
 }
-
 func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
 	// uuid is used as transaction id
 	uuidWithHyphen := uuid.New()
@@ -2152,7 +2321,6 @@ func (r *ClickHouseReader) ListErrors(ctx context.Context, queryParams *model.Li
 
 	return &getErrorResponses, nil
 }
-
 func (r *ClickHouseReader) CountErrors(ctx context.Context, queryParams *model.CountErrorsParams) (uint64, *model.ApiError) {
 
 	var errorCount uint64
@@ -2924,7 +3092,6 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
 
 	return &response, nil
 }
-
 func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
 	var query string
 	var err error
@@ -3670,7 +3837,6 @@ func readRow(vars []interface{}, columnNames []string, countOfNumberCols int) ([
 	}
 	return groupBy, groupAttributes, groupAttributesArray, nil
 }
-
 func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string, countOfNumberCols int) ([]*v3.Series, error) {
 	// when groupBy is applied, each combination of cartesian product
 	// of attribute values is a separate series. Each item in seriesToPoints
@@ -4466,7 +4632,6 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
 
 	return timeline, nil
 }
-
 func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
 	ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error) {
 	query := fmt.Sprintf(`SELECT
@@ -5055,7 +5220,6 @@ func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context,
 	}
 	return timeSeries, nil
 }
-
 func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.UUID, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
 	var args []interface{}
 
@@ -5852,7 +6016,6 @@ func (r *ClickHouseReader) GetInspectMetrics(ctx context.Context, req *metrics_e
 		Series: &seriesList,
 	}, nil
 }
-
 func (r *ClickHouseReader) GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError) {
 	// Build dynamic key selections and JSON extracts
 	var jsonExtracts []string