mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-14 09:02:15 +00:00
Compare commits
13 Commits
debug_time
...
imp/remove
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1541734542 | ||
|
|
46e5b407f7 | ||
|
|
f2c3946101 | ||
|
|
4dca46de40 | ||
|
|
6f420abe27 | ||
|
|
1d9b457af6 | ||
|
|
d437998750 | ||
|
|
e02d0cdd98 | ||
|
|
1ad4a6699a | ||
|
|
00ae45022b | ||
|
|
6f4a965c6d | ||
|
|
4c29b03577 | ||
|
|
ea1409bc4f |
@@ -12,7 +12,6 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||||
@@ -386,7 +385,6 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
|
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
|
||||||
|
|
||||||
if r.indexTable == "" {
|
if r.indexTable == "" {
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
|
||||||
}
|
}
|
||||||
@@ -395,121 +393,220 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
|
|||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
return nil, apiErr
|
return nil, apiErr
|
||||||
}
|
}
|
||||||
|
// Build parallel arrays for arrayZip approach
|
||||||
|
var ops []string
|
||||||
|
var svcs []string
|
||||||
|
serviceOperationsMap := make(map[string][]string)
|
||||||
|
|
||||||
|
for svc, opsList := range *topLevelOps {
|
||||||
|
// Cap operations to 1500 per service (same as original logic)
|
||||||
|
cappedOps := opsList[:int(math.Min(1500, float64(len(opsList))))]
|
||||||
|
serviceOperationsMap[svc] = cappedOps
|
||||||
|
|
||||||
|
// Add to parallel arrays
|
||||||
|
for _, op := range cappedOps {
|
||||||
|
ops = append(ops, op)
|
||||||
|
svcs = append(svcs, svc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Operation pairs count: %d\n", len(ops))
|
||||||
|
|
||||||
|
// Build resource subquery for all services, but only include our target services
|
||||||
|
targetServices := make([]string, 0, len(*topLevelOps))
|
||||||
|
for svc := range *topLevelOps {
|
||||||
|
targetServices = append(targetServices, svc)
|
||||||
|
}
|
||||||
|
resourceSubQuery, err := r.buildResourceSubQueryForServices(queryParams.Tags, targetServices, *queryParams.Start, *queryParams.End)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error building resource subquery", zap.Error(err))
|
||||||
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the optimized single query using arrayZip for tuple creation
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
SELECT
|
||||||
|
resource_string_service$$name AS serviceName,
|
||||||
|
quantile(0.99)(duration_nano) AS p99,
|
||||||
|
avg(duration_nano) AS avgDuration,
|
||||||
|
count(*) AS numCalls,
|
||||||
|
countIf(statusCode = 2) AS numErrors
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE (name, resource_string_service$$name) IN arrayZip(@ops, @svcs)
|
||||||
|
AND timestamp >= @start
|
||||||
|
AND timestamp <= @end
|
||||||
|
AND ts_bucket_start >= @start_bucket
|
||||||
|
AND ts_bucket_start <= @end_bucket
|
||||||
|
AND (resource_fingerprint GLOBAL IN %s)
|
||||||
|
GROUP BY serviceName
|
||||||
|
ORDER BY numCalls DESC`,
|
||||||
|
r.TraceDB, r.traceTableName, resourceSubQuery,
|
||||||
|
)
|
||||||
|
|
||||||
|
args := []interface{}{
|
||||||
|
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
|
||||||
|
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
|
||||||
|
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
|
||||||
|
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
|
||||||
|
// Important: wrap slices with clickhouse.Array for IN/array params
|
||||||
|
clickhouse.Named("ops", ops),
|
||||||
|
clickhouse.Named("svcs", svcs),
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Query: %s\n", query)
|
||||||
|
|
||||||
|
// Execute the single optimized query
|
||||||
|
rows, err := r.db.Query(ctx, query, args...)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error executing optimized services query", zap.Error(err))
|
||||||
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
// Process results
|
||||||
serviceItems := []model.ServiceItem{}
|
serviceItems := []model.ServiceItem{}
|
||||||
var wg sync.WaitGroup
|
|
||||||
// limit the number of concurrent queries to not overload the clickhouse server
|
|
||||||
sem := make(chan struct{}, 10)
|
|
||||||
var mtx sync.RWMutex
|
|
||||||
|
|
||||||
for svc, ops := range *topLevelOps {
|
for rows.Next() {
|
||||||
sem <- struct{}{}
|
var serviceItem model.ServiceItem
|
||||||
wg.Add(1)
|
err := rows.ScanStruct(&serviceItem)
|
||||||
go func(svc string, ops []string) {
|
if err != nil {
|
||||||
defer wg.Done()
|
zap.L().Error("Error scanning service item", zap.Error(err))
|
||||||
defer func() { <-sem }()
|
continue
|
||||||
var serviceItem model.ServiceItem
|
}
|
||||||
var numErrors uint64
|
|
||||||
|
|
||||||
// Even if the total number of operations within the time range is less and the all
|
// Skip services with zero calls (match original behavior)
|
||||||
// the top level operations are high, we want to warn to let user know the issue
|
if serviceItem.NumCalls == 0 {
|
||||||
// with the instrumentation
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add data warning for this service
|
||||||
|
if ops, exists := serviceOperationsMap[serviceItem.ServiceName]; exists {
|
||||||
serviceItem.DataWarning = model.DataWarning{
|
serviceItem.DataWarning = model.DataWarning{
|
||||||
TopLevelOps: (*topLevelOps)[svc],
|
TopLevelOps: ops,
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// default max_query_size = 262144
|
// Calculate derived fields
|
||||||
// Let's assume the average size of the item in `ops` is 50 bytes
|
serviceItem.CallRate = float64(serviceItem.NumCalls) / float64(queryParams.Period)
|
||||||
// We can have 262144/50 = 5242 items in the `ops` array
|
if serviceItem.NumCalls > 0 {
|
||||||
// Although we have make it as big as 5k, We cap the number of items
|
serviceItem.ErrorRate = float64(serviceItem.NumErrors) * 100 / float64(serviceItem.NumCalls)
|
||||||
// in the `ops` array to 1500
|
}
|
||||||
|
|
||||||
ops = ops[:int(math.Min(1500, float64(len(ops))))]
|
serviceItems = append(serviceItems, serviceItem)
|
||||||
|
|
||||||
query := fmt.Sprintf(
|
|
||||||
`SELECT
|
|
||||||
quantile(0.99)(duration_nano) as p99,
|
|
||||||
avg(duration_nano) as avgDuration,
|
|
||||||
count(*) as numCalls
|
|
||||||
FROM %s.%s
|
|
||||||
WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
|
|
||||||
r.TraceDB, r.traceTableName,
|
|
||||||
)
|
|
||||||
errorQuery := fmt.Sprintf(
|
|
||||||
`SELECT
|
|
||||||
count(*) as numErrors
|
|
||||||
FROM %s.%s
|
|
||||||
WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
|
|
||||||
r.TraceDB, r.traceTableName,
|
|
||||||
)
|
|
||||||
|
|
||||||
args := []interface{}{}
|
|
||||||
args = append(args,
|
|
||||||
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
|
|
||||||
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
|
|
||||||
clickhouse.Named("serviceName", svc),
|
|
||||||
clickhouse.Named("names", ops),
|
|
||||||
)
|
|
||||||
|
|
||||||
resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
query += `
|
|
||||||
AND (
|
|
||||||
resource_fingerprint GLOBAL IN ` +
|
|
||||||
resourceSubQuery +
|
|
||||||
`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`
|
|
||||||
|
|
||||||
args = append(args,
|
|
||||||
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
|
|
||||||
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
|
|
||||||
)
|
|
||||||
|
|
||||||
err = r.db.QueryRow(
|
|
||||||
ctx,
|
|
||||||
query,
|
|
||||||
args...,
|
|
||||||
).ScanStruct(&serviceItem)
|
|
||||||
|
|
||||||
if serviceItem.NumCalls == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
errorQuery += `
|
|
||||||
AND (
|
|
||||||
resource_fingerprint GLOBAL IN ` +
|
|
||||||
resourceSubQuery +
|
|
||||||
`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`
|
|
||||||
|
|
||||||
err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceItem.ServiceName = svc
|
|
||||||
serviceItem.NumErrors = numErrors
|
|
||||||
mtx.Lock()
|
|
||||||
serviceItems = append(serviceItems, serviceItem)
|
|
||||||
mtx.Unlock()
|
|
||||||
}(svc, ops)
|
|
||||||
}
|
}
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
for idx := range serviceItems {
|
if err = rows.Err(); err != nil {
|
||||||
serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period)
|
zap.L().Error("Error iterating over service results", zap.Error(err))
|
||||||
serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls)
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &serviceItems, nil
|
return &serviceItems, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildResourceSubQueryForServices builds a resource subquery that includes only specific services
|
||||||
|
// This maintains service context while optimizing for multiple services in a single query
|
||||||
|
func (r *ClickHouseReader) buildResourceSubQueryForServices(tags []model.TagQueryParam, targetServices []string, start, end time.Time) (string, error) {
|
||||||
|
if len(targetServices) == 0 {
|
||||||
|
return "", fmt.Errorf("no target services provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tags) == 0 {
|
||||||
|
// For exact parity with per-service behavior, build via resource builder with only service filter
|
||||||
|
filterSet := v3.FilterSet{}
|
||||||
|
filterSet.Items = append(filterSet.Items, v3.FilterItem{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service.name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeResource,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: targetServices,
|
||||||
|
})
|
||||||
|
|
||||||
|
resourceSubQuery, err := resource.BuildResourceSubQuery(
|
||||||
|
r.TraceDB,
|
||||||
|
r.traceResourceTableV3,
|
||||||
|
start.Unix()-1800,
|
||||||
|
end.Unix(),
|
||||||
|
&filterSet,
|
||||||
|
[]v3.AttributeKey{},
|
||||||
|
v3.AttributeKey{},
|
||||||
|
false)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error building resource subquery for services", zap.Error(err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return resourceSubQuery, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert tags to filter set
|
||||||
|
filterSet := v3.FilterSet{}
|
||||||
|
for _, tag := range tags {
|
||||||
|
// Skip the collector id as we don't add it to traces
|
||||||
|
if tag.Key == "signoz.collector.id" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var it v3.FilterItem
|
||||||
|
it.Key = v3.AttributeKey{
|
||||||
|
Key: tag.Key,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeResource,
|
||||||
|
}
|
||||||
|
|
||||||
|
switch tag.Operator {
|
||||||
|
case model.NotInOperator:
|
||||||
|
it.Operator = v3.FilterOperatorNotIn
|
||||||
|
it.Value = tag.StringValues
|
||||||
|
case model.InOperator:
|
||||||
|
it.Operator = v3.FilterOperatorIn
|
||||||
|
it.Value = tag.StringValues
|
||||||
|
default:
|
||||||
|
return "", fmt.Errorf("operator %s not supported", tag.Operator)
|
||||||
|
}
|
||||||
|
|
||||||
|
filterSet.Items = append(filterSet.Items, it)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add service filter to limit to our target services
|
||||||
|
filterSet.Items = append(filterSet.Items, v3.FilterItem{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service.name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeResource,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: targetServices,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Build resource subquery with service-specific filtering
|
||||||
|
resourceSubQuery, err := resource.BuildResourceSubQuery(
|
||||||
|
r.TraceDB,
|
||||||
|
r.traceResourceTableV3,
|
||||||
|
start.Unix()-1800,
|
||||||
|
end.Unix(),
|
||||||
|
&filterSet,
|
||||||
|
[]v3.AttributeKey{},
|
||||||
|
v3.AttributeKey{},
|
||||||
|
false)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error building resource subquery for services", zap.Error(err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return resourceSubQuery, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildServiceInClause creates a properly quoted IN clause for service names
|
||||||
|
func (r *ClickHouseReader) buildServiceInClause(services []string) string {
|
||||||
|
var quotedServices []string
|
||||||
|
for _, svc := range services {
|
||||||
|
// Escape single quotes and wrap in quotes
|
||||||
|
escapedSvc := strings.ReplaceAll(svc, "'", "\\'")
|
||||||
|
quotedServices = append(quotedServices, fmt.Sprintf("'%s'", escapedSvc))
|
||||||
|
}
|
||||||
|
return strings.Join(quotedServices, ", ")
|
||||||
|
}
|
||||||
|
|
||||||
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
|
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
|
||||||
// status can only be two and if both are selected than they are equivalent to none selected
|
// status can only be two and if both are selected than they are equivalent to none selected
|
||||||
if _, ok := excludeMap["status"]; ok {
|
if _, ok := excludeMap["status"]; ok {
|
||||||
@@ -686,7 +783,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
|
|||||||
}
|
}
|
||||||
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
|
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetEntryPointOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, error) {
|
func (r *ClickHouseReader) GetEntryPointOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, error) {
|
||||||
// Step 1: Get top operations for the given service
|
// Step 1: Get top operations for the given service
|
||||||
topOps, err := r.GetTopOperations(ctx, queryParams)
|
topOps, err := r.GetTopOperations(ctx, queryParams)
|
||||||
@@ -1416,7 +1512,6 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
|||||||
}(ttlPayload)
|
}(ttlPayload)
|
||||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||||
// uuid is used as transaction id
|
// uuid is used as transaction id
|
||||||
uuidWithHyphen := uuid.New()
|
uuidWithHyphen := uuid.New()
|
||||||
@@ -2169,7 +2264,6 @@ func (r *ClickHouseReader) GetNextPrevErrorIDs(ctx context.Context, queryParams
|
|||||||
return &getNextPrevErrorIDsResponse, nil
|
return &getNextPrevErrorIDsResponse, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *model.GetErrorParams) (string, time.Time, *model.ApiError) {
|
func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *model.GetErrorParams) (string, time.Time, *model.ApiError) {
|
||||||
|
|
||||||
var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse
|
var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse
|
||||||
@@ -2905,7 +2999,6 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3
|
|||||||
|
|
||||||
return &attributeValues, nil
|
return &attributeValues, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.UUID, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {
|
func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.UUID, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {
|
||||||
|
|
||||||
unixMilli := common.PastDayRoundOff()
|
unixMilli := common.PastDayRoundOff()
|
||||||
@@ -5180,7 +5273,6 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.
|
|||||||
|
|
||||||
return &response, nil
|
return &response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
|
func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
|
||||||
var args []interface{}
|
var args []interface{}
|
||||||
|
|
||||||
@@ -5933,7 +6025,6 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
|
|||||||
}
|
}
|
||||||
return hasLE, nil
|
return hasLE, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
|
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
|
||||||
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
|
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
|
||||||
var missingMetrics []string
|
var missingMetrics []string
|
||||||
|
|||||||
Reference in New Issue
Block a user