mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-04 21:03:21 +00:00
Compare commits
7 Commits
testing-fe
...
feat/warni
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
702831e26b | ||
|
|
5c6346df11 | ||
|
|
860c3e70ac | ||
|
|
3d8bfd47fa | ||
|
|
f01b00bd5c | ||
|
|
120317321c | ||
|
|
895e92893c |
@@ -490,6 +490,7 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
|
|||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
seriesMap := make(map[seriesKey]*qbtypes.TimeSeries, estimatedSeries)
|
seriesMap := make(map[seriesKey]*qbtypes.TimeSeries, estimatedSeries)
|
||||||
|
aliasMap := make(map[int]string)
|
||||||
|
|
||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
var tsData *qbtypes.TimeSeriesData
|
var tsData *qbtypes.TimeSeriesData
|
||||||
@@ -499,6 +500,10 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, aggBucket := range tsData.Aggregations {
|
for _, aggBucket := range tsData.Aggregations {
|
||||||
|
if aggBucket.Alias != "" {
|
||||||
|
aliasMap[aggBucket.Index] = aggBucket.Alias
|
||||||
|
}
|
||||||
|
|
||||||
for _, series := range aggBucket.Series {
|
for _, series := range aggBucket.Series {
|
||||||
// Create series key from labels
|
// Create series key from labels
|
||||||
key := seriesKey{
|
key := seriesKey{
|
||||||
@@ -571,7 +576,13 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var alias string
|
||||||
|
if aliasMap[index] != "" {
|
||||||
|
alias = aliasMap[index]
|
||||||
|
}
|
||||||
|
|
||||||
result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
|
result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
|
||||||
|
Alias: alias,
|
||||||
Index: index,
|
Index: index,
|
||||||
Series: seriesList,
|
Series: seriesList,
|
||||||
})
|
})
|
||||||
@@ -736,6 +747,7 @@ func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoun
|
|||||||
for _, aggBucket := range tsData.Aggregations {
|
for _, aggBucket := range tsData.Aggregations {
|
||||||
trimmedBucket := &qbtypes.AggregationBucket{
|
trimmedBucket := &qbtypes.AggregationBucket{
|
||||||
Index: aggBucket.Index,
|
Index: aggBucket.Index,
|
||||||
|
Alias: aggBucket.Alias,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, series := range aggBucket.Series {
|
for _, series := range aggBucket.Series {
|
||||||
|
|||||||
@@ -200,7 +200,7 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result.Warnings = stmt.Warnings
|
result.Warnings = append(result.Warnings, stmt.Warnings...)
|
||||||
result.WarningsDocURL = stmt.WarningsDocURL
|
result.WarningsDocURL = stmt.WarningsDocURL
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,17 +12,23 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||||
"github.com/bytedance/sonic"
|
"github.com/bytedance/sonic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
diagnosticColumnIndexBase = 1000000
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
aggRe = regexp.MustCompile(`^__result_(\d+)$`)
|
aggRe = regexp.MustCompile(`^__result_(\d+)$`)
|
||||||
// legacyReservedColumnTargetAliases identifies result value from a user
|
// legacyReservedColumnTargetAliases identifies result value from a user
|
||||||
// written clickhouse query. The column alias indcate which value is
|
// written clickhouse query. The column alias indcate which value is
|
||||||
// to be considered as final result (or target)
|
// to be considered as final result (or target)
|
||||||
legacyReservedColumnTargetAliases = []string{"__result", "__value", "result", "res", "value"}
|
legacyReservedColumnTargetAliases = []string{"__result", "__value", "result", "res", "value"}
|
||||||
|
diagnosticColumnAliases = []string{telemetrymetrics.DiagnosticColumnCumulativeHistLeCount, telemetrymetrics.DiagnosticColumnCumulativeHistLeSum}
|
||||||
)
|
)
|
||||||
|
|
||||||
// consume reads every row and shapes it into the payload expected for the
|
// consume reads every row and shapes it into the payload expected for the
|
||||||
@@ -69,6 +75,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
|||||||
key string // deterministic join of label values
|
key string // deterministic join of label values
|
||||||
}
|
}
|
||||||
seriesMap := map[sKey]*qbtypes.TimeSeries{}
|
seriesMap := map[sKey]*qbtypes.TimeSeries{}
|
||||||
|
diagnosticSeriesMap := map[string]*qbtypes.TimeSeries{}
|
||||||
|
|
||||||
stepMs := uint64(step.Duration.Milliseconds())
|
stepMs := uint64(step.Duration.Milliseconds())
|
||||||
|
|
||||||
@@ -113,12 +120,13 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ts int64
|
ts int64
|
||||||
lblVals = make([]string, 0, lblValsCapacity)
|
lblVals = make([]string, 0, lblValsCapacity)
|
||||||
lblObjs = make([]*qbtypes.Label, 0, lblValsCapacity)
|
lblObjs = make([]*qbtypes.Label, 0, lblValsCapacity)
|
||||||
aggValues = map[int]float64{} // all __result_N in this row
|
aggValues = map[int]float64{} // all __result_N in this row
|
||||||
fallbackValue float64 // value when NO __result_N columns exist
|
diagnosticValues = map[string]float64{} // all diagnostic columns in this row
|
||||||
fallbackSeen bool
|
fallbackValue float64 // value when NO __result_N columns exist
|
||||||
|
fallbackSeen bool
|
||||||
)
|
)
|
||||||
|
|
||||||
for idx, ptr := range slots {
|
for idx, ptr := range slots {
|
||||||
@@ -130,7 +138,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
|||||||
|
|
||||||
case *float64, *float32, *int64, *int32, *uint64, *uint32:
|
case *float64, *float32, *int64, *int32, *uint64, *uint32:
|
||||||
val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface())
|
val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface())
|
||||||
if m := aggRe.FindStringSubmatch(name); m != nil {
|
if slices.Contains(diagnosticColumnAliases, name) {
|
||||||
|
diagnosticValues[name] = val
|
||||||
|
} else if m := aggRe.FindStringSubmatch(name); m != nil {
|
||||||
id, _ := strconv.Atoi(m[1])
|
id, _ := strconv.Atoi(m[1])
|
||||||
aggValues[id] = val
|
aggValues[id] = val
|
||||||
} else if numericColsCount == 1 { // classic single-value query
|
} else if numericColsCount == 1 { // classic single-value query
|
||||||
@@ -152,7 +162,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
|||||||
tempVal := reflect.ValueOf(ptr)
|
tempVal := reflect.ValueOf(ptr)
|
||||||
if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() {
|
if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() {
|
||||||
val := numericAsFloat(tempVal.Elem().Elem().Interface())
|
val := numericAsFloat(tempVal.Elem().Elem().Interface())
|
||||||
if m := aggRe.FindStringSubmatch(name); m != nil {
|
if slices.Contains(diagnosticColumnAliases, name) {
|
||||||
|
diagnosticValues[name] = val
|
||||||
|
} else if m := aggRe.FindStringSubmatch(name); m != nil {
|
||||||
id, _ := strconv.Atoi(m[1])
|
id, _ := strconv.Atoi(m[1])
|
||||||
aggValues[id] = val
|
aggValues[id] = val
|
||||||
} else if numericColsCount == 1 { // classic single-value query
|
} else if numericColsCount == 1 { // classic single-value query
|
||||||
@@ -195,6 +207,23 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetch and store diagnostic values in diagnosticSeriesMap
|
||||||
|
for diagnosticColName, val := range diagnosticValues {
|
||||||
|
if math.IsNaN(val) || math.IsInf(val, 0) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
diagSeries, ok := diagnosticSeriesMap[diagnosticColName]
|
||||||
|
if !ok {
|
||||||
|
diagSeries = &qbtypes.TimeSeries{}
|
||||||
|
diagnosticSeriesMap[diagnosticColName] = diagSeries
|
||||||
|
}
|
||||||
|
diagSeries.Values = append(diagSeries.Values, &qbtypes.TimeSeriesValue{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: val,
|
||||||
|
Partial: isPartialValue(ts),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Edge-case: no __result_N columns, but a single numeric column present
|
// Edge-case: no __result_N columns, but a single numeric column present
|
||||||
if len(aggValues) == 0 && fallbackSeen {
|
if len(aggValues) == 0 && fallbackSeen {
|
||||||
aggValues[0] = fallbackValue
|
aggValues[0] = fallbackValue
|
||||||
@@ -231,6 +260,20 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
diagnosticBuckets := make([]*qbtypes.AggregationBucket, 0)
|
||||||
|
// TODO(nikhilmantri0902, srikanthccv): below HACK - this is a temporary index introduced becausing caching grouping and merging happens on index
|
||||||
|
// Should we improve the caching grouping and merging to not depend on index?
|
||||||
|
diagNosticTemporayIndex := diagnosticColumnIndexBase
|
||||||
|
for diagColName, diagSeries := range diagnosticSeriesMap {
|
||||||
|
diagnosticBucket := &qbtypes.AggregationBucket{
|
||||||
|
Index: diagNosticTemporayIndex,
|
||||||
|
Alias: diagColName,
|
||||||
|
}
|
||||||
|
diagnosticBucket.Series = append(diagnosticBucket.Series, diagSeries)
|
||||||
|
diagnosticBuckets = append(diagnosticBuckets, diagnosticBucket)
|
||||||
|
diagNosticTemporayIndex++
|
||||||
|
}
|
||||||
|
|
||||||
maxAgg := -1
|
maxAgg := -1
|
||||||
for k := range seriesMap {
|
for k := range seriesMap {
|
||||||
if k.agg > maxAgg {
|
if k.agg > maxAgg {
|
||||||
@@ -240,6 +283,8 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
|||||||
if maxAgg < 0 {
|
if maxAgg < 0 {
|
||||||
return &qbtypes.TimeSeriesData{
|
return &qbtypes.TimeSeriesData{
|
||||||
QueryName: queryName,
|
QueryName: queryName,
|
||||||
|
// return with diagNostic buckets
|
||||||
|
Aggregations: diagnosticBuckets,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -261,6 +306,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add diagNostic buckets
|
||||||
|
nonEmpty = append(nonEmpty, diagnosticBuckets...)
|
||||||
|
|
||||||
return &qbtypes.TimeSeriesData{
|
return &qbtypes.TimeSeriesData{
|
||||||
QueryName: queryName,
|
QueryName: queryName,
|
||||||
Aggregations: nonEmpty,
|
Aggregations: nonEmpty,
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/SigNoz/govaluate"
|
"github.com/SigNoz/govaluate"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||||
)
|
)
|
||||||
@@ -46,7 +47,7 @@ func getQueryName(spec any) string {
|
|||||||
return getqueryInfo(spec).Name
|
return getqueryInfo(spec).Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
|
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, []string, string, error) {
|
||||||
// Convert results to typed format for processing
|
// Convert results to typed format for processing
|
||||||
typedResults := make(map[string]*qbtypes.Result)
|
typedResults := make(map[string]*qbtypes.Result)
|
||||||
for name, result := range results {
|
for name, result := range results {
|
||||||
@@ -96,7 +97,7 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
|
|||||||
for name, v := range typedResults {
|
for name, v := range typedResults {
|
||||||
retResult[name] = v.Value
|
retResult[name] = v.Value
|
||||||
}
|
}
|
||||||
return retResult, nil
|
return retResult, nil, "", nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -108,11 +109,11 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
|
|||||||
firstQueryName := getQueryName(req.CompositeQuery.Queries[0].Spec)
|
firstQueryName := getQueryName(req.CompositeQuery.Queries[0].Spec)
|
||||||
if firstQueryName != "" && tableResult["table"] != nil {
|
if firstQueryName != "" && tableResult["table"] != nil {
|
||||||
// Return table under first query name
|
// Return table under first query name
|
||||||
return map[string]any{firstQueryName: tableResult["table"]}, nil
|
return map[string]any{firstQueryName: tableResult["table"]}, nil, "", nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return tableResult, nil
|
return tableResult, nil, "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.RequestType == qbtypes.RequestTypeTimeSeries && req.FormatOptions != nil && req.FormatOptions.FillGaps {
|
if req.RequestType == qbtypes.RequestTypeTimeSeries && req.FormatOptions != nil && req.FormatOptions.FillGaps {
|
||||||
@@ -155,7 +156,19 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
|
|||||||
finalResults[name] = result.Value
|
finalResults[name] = result.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
return finalResults, nil
|
// collect postProcessWarnings from typed results
|
||||||
|
var postProcessWarnings []string
|
||||||
|
var postProcessWarningsDocURL string
|
||||||
|
for _, res := range typedResults {
|
||||||
|
if len(res.Warnings) > 0 {
|
||||||
|
postProcessWarnings = append(postProcessWarnings, res.Warnings...)
|
||||||
|
}
|
||||||
|
if res.WarningsDocURL != "" {
|
||||||
|
postProcessWarningsDocURL = res.WarningsDocURL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return finalResults, postProcessWarnings, postProcessWarningsDocURL, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// postProcessBuilderQuery applies postprocessing to a single builder query result
|
// postProcessBuilderQuery applies postprocessing to a single builder query result
|
||||||
@@ -178,6 +191,86 @@ func postProcessBuilderQuery[T any](
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func removeDiagnosticSeriesAndCheckWarnings(result *qbtypes.Result) {
|
||||||
|
tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if tsData == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var nonDiagnosticBuckets []*qbtypes.AggregationBucket
|
||||||
|
var bucketSum, bucketCount *qbtypes.AggregationBucket
|
||||||
|
|
||||||
|
// First pass: identify diagnostic buckets and separate them from non-diagnostic ones
|
||||||
|
for _, b := range tsData.Aggregations {
|
||||||
|
if !slices.Contains(diagnosticColumnAliases, b.Alias) {
|
||||||
|
nonDiagnosticBuckets = append(nonDiagnosticBuckets, b)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// warning columns
|
||||||
|
switch b.Alias {
|
||||||
|
case telemetrymetrics.DiagnosticColumnCumulativeHistLeSum:
|
||||||
|
bucketSum = b
|
||||||
|
case telemetrymetrics.DiagnosticColumnCumulativeHistLeCount:
|
||||||
|
bucketCount = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second pass: calculate warnings based on diagnostic buckets (only once, after identifying both)
|
||||||
|
allZero := false
|
||||||
|
if bucketSum != nil {
|
||||||
|
allZero = true
|
||||||
|
if len(bucketSum.Series) == 0 {
|
||||||
|
allZero = false // dont calculate for no values
|
||||||
|
} else {
|
||||||
|
for _, s := range bucketSum.Series {
|
||||||
|
for _, v := range s.Values {
|
||||||
|
if v.Value > 0 {
|
||||||
|
allZero = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !allZero {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
allSparse := false
|
||||||
|
if bucketCount != nil {
|
||||||
|
allSparse = true
|
||||||
|
if len(bucketCount.Series) == 0 {
|
||||||
|
allSparse = false // dont calculate for no values
|
||||||
|
} else {
|
||||||
|
for _, s := range bucketCount.Series {
|
||||||
|
for _, v := range s.Values {
|
||||||
|
if v.Value >= 2 {
|
||||||
|
allSparse = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !allSparse {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if allZero {
|
||||||
|
result.Warnings = append(result.Warnings, "No change observed for this cumulative metric in the selected range.")
|
||||||
|
}
|
||||||
|
if allSparse {
|
||||||
|
result.Warnings = append(result.Warnings, "Only +Inf bucket present; add finite buckets to compute quantiles.")
|
||||||
|
}
|
||||||
|
|
||||||
|
tsData.Aggregations = nonDiagnosticBuckets
|
||||||
|
result.Value = tsData
|
||||||
|
}
|
||||||
|
|
||||||
// postProcessMetricQuery applies postprocessing to a metric query result
|
// postProcessMetricQuery applies postprocessing to a metric query result
|
||||||
func postProcessMetricQuery(
|
func postProcessMetricQuery(
|
||||||
q *querier,
|
q *querier,
|
||||||
@@ -186,6 +279,8 @@ func postProcessMetricQuery(
|
|||||||
req *qbtypes.QueryRangeRequest,
|
req *qbtypes.QueryRangeRequest,
|
||||||
) *qbtypes.Result {
|
) *qbtypes.Result {
|
||||||
|
|
||||||
|
removeDiagnosticSeriesAndCheckWarnings(result)
|
||||||
|
|
||||||
config := query.Aggregations[0]
|
config := query.Aggregations[0]
|
||||||
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
|
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
|
||||||
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
|
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
|
||||||
|
|||||||
@@ -588,11 +588,19 @@ func (q *querier) run(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
processedResults, err := q.postProcessResults(ctx, results, req)
|
processedResults, processedResultsWarnings, processedResultsDocsURL, err := q.postProcessResults(ctx, results, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Merge warnings collected during post-processing.
|
||||||
|
if len(processedResultsWarnings) > 0 {
|
||||||
|
warnings = append(warnings, processedResultsWarnings...)
|
||||||
|
}
|
||||||
|
if processedResultsDocsURL != "" {
|
||||||
|
warningsDocURL = processedResultsDocsURL
|
||||||
|
}
|
||||||
|
|
||||||
// attach step interval to metadata so client can make informed decisions, ex: width of the bar
|
// attach step interval to metadata so client can make informed decisions, ex: width of the bar
|
||||||
// or go to related logs/traces from a point in line/bar chart with correct time range
|
// or go to related logs/traces from a point in line/bar chart with correct time range
|
||||||
stepIntervals := make(map[string]uint64, len(steps))
|
stepIntervals := make(map[string]uint64, len(steps))
|
||||||
|
|||||||
@@ -2,6 +2,11 @@ package telemetrymetrics
|
|||||||
|
|
||||||
import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||||
|
|
||||||
|
const (
|
||||||
|
DiagnosticColumnCumulativeHistLeSum = "__diagnostic_cumulative_hist_le_sum"
|
||||||
|
DiagnosticColumnCumulativeHistLeCount = "__diagnostic_cumulative_hist_le_count"
|
||||||
|
)
|
||||||
|
|
||||||
var IntrinsicFields = []string{
|
var IntrinsicFields = []string{
|
||||||
"__normalized",
|
"__normalized",
|
||||||
"temporality",
|
"temporality",
|
||||||
|
|||||||
@@ -559,12 +559,8 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
|
|||||||
cteArgs [][]any,
|
cteArgs [][]any,
|
||||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||||
) (*qbtypes.Statement, error) {
|
) (*qbtypes.Statement, error) {
|
||||||
combined := querybuilder.CombineCTEs(cteFragments)
|
var combined string
|
||||||
|
|
||||||
var args []any
|
var args []any
|
||||||
for _, a := range cteArgs {
|
|
||||||
args = append(args, a...)
|
|
||||||
}
|
|
||||||
|
|
||||||
sb := sqlbuilder.NewSelectBuilder()
|
sb := sqlbuilder.NewSelectBuilder()
|
||||||
|
|
||||||
@@ -574,25 +570,59 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if quantile != 0 && query.Aggregations[0].Type != metrictypes.ExpHistogramType {
|
if quantile != 0 && query.Aggregations[0].Type != metrictypes.ExpHistogramType {
|
||||||
|
// Build diagnostics CTE to pre-aggregate arrays and counts per ts/group.
|
||||||
|
diag := sqlbuilder.NewSelectBuilder()
|
||||||
|
diag.Select("ts")
|
||||||
|
for _, g := range query.GroupBy {
|
||||||
|
diag.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||||
|
}
|
||||||
|
diag.SelectMore("groupArray(le) AS les")
|
||||||
|
diag.SelectMore("groupArray(value) AS vals")
|
||||||
|
diag.SelectMore("arraySum(groupArray(value)) AS bucket_sum")
|
||||||
|
diag.SelectMore("countDistinct(le) AS bucket_count")
|
||||||
|
diag.From("__spatial_aggregation_cte")
|
||||||
|
for _, g := range query.GroupBy {
|
||||||
|
diag.GroupBy(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||||
|
}
|
||||||
|
diag.GroupBy("ts")
|
||||||
|
diagQ, diagArgs := diag.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||||
|
cteFragments = append(cteFragments, fmt.Sprintf("__diagnostics_cte AS (%s)", diagQ))
|
||||||
|
cteArgs = append(cteArgs, diagArgs)
|
||||||
|
|
||||||
|
combined = querybuilder.CombineCTEs(cteFragments)
|
||||||
|
for _, a := range cteArgs {
|
||||||
|
args = append(args, a...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Final select from diagnostics cte
|
||||||
sb.Select("ts")
|
sb.Select("ts")
|
||||||
for _, g := range query.GroupBy {
|
for _, g := range query.GroupBy {
|
||||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||||
}
|
}
|
||||||
|
// expose quantile and diagnostics as __result_* so they are parsed, then stripped in post-processing
|
||||||
|
|
||||||
|
// TODO(nikhilmantri0902): putting below as __result_0, __result_1, __result_2,
|
||||||
|
// but is this correct and not a hack to extract the warnings?
|
||||||
|
// what if the aliases are used by client, there will be collision in select statements ...
|
||||||
|
|
||||||
sb.SelectMore(fmt.Sprintf(
|
sb.SelectMore(fmt.Sprintf(
|
||||||
"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
|
"histogramQuantile(arrayMap(x -> toFloat64(x), les), vals, %.3f) AS __result_0",
|
||||||
quantile,
|
quantile,
|
||||||
))
|
))
|
||||||
sb.From("__spatial_aggregation_cte")
|
sb.SelectMore(fmt.Sprintf("bucket_sum AS %s", DiagnosticColumnCumulativeHistLeSum))
|
||||||
for _, g := range query.GroupBy {
|
sb.SelectMore(fmt.Sprintf("bucket_count AS %s", DiagnosticColumnCumulativeHistLeCount))
|
||||||
sb.GroupBy(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
|
||||||
}
|
sb.From("__diagnostics_cte")
|
||||||
sb.GroupBy("ts")
|
|
||||||
if query.Having != nil && query.Having.Expression != "" {
|
if query.Having != nil && query.Having.Expression != "" {
|
||||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||||
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
||||||
sb.Having(rewrittenExpr)
|
sb.Having(rewrittenExpr)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
combined = querybuilder.CombineCTEs(cteFragments)
|
||||||
|
for _, a := range cteArgs {
|
||||||
|
args = append(args, a...)
|
||||||
|
}
|
||||||
sb.Select("*")
|
sb.Select("*")
|
||||||
sb.From("__spatial_aggregation_cte")
|
sb.From("__spatial_aggregation_cte")
|
||||||
if query.Having != nil && query.Having.Expression != "" {
|
if query.Having != nil && query.Having.Expression != "" {
|
||||||
@@ -603,5 +633,8 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
|
|||||||
}
|
}
|
||||||
|
|
||||||
q, a := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
q, a := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||||
return &qbtypes.Statement{Query: combined + q, Args: append(args, a...)}, nil
|
return &qbtypes.Statement{
|
||||||
|
Query: combined + q,
|
||||||
|
Args: append(args, a...),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user