Compare commits

...

7 Commits

Author SHA1 Message Date
Nikhil Mantri
702831e26b Merge branch 'main' into feat/warnings_for_empty_data 2026-01-14 13:57:15 +05:30
nikhilmantri0902
5c6346df11 chore: remove array sort 2026-01-13 17:20:02 +05:30
nikhilmantri0902
860c3e70ac chore: added diagnostic columns logic 2026-01-13 14:10:21 +05:30
nikhilmantri0902
3d8bfd47fa fix: add alias to cached data and also set back alias from cache when present 2026-01-12 17:49:31 +05:30
nikhilmantri0902
f01b00bd5c chore: return warnings separately 2026-01-10 23:15:51 +05:30
nikhilmantri0902
120317321c chore: return warnings separately 2026-01-10 22:36:38 +05:30
nikhilmantri0902
895e92893c chore: query and postprocessmetric query modification 2026-01-10 18:56:14 +05:30
7 changed files with 228 additions and 27 deletions

View File

@@ -490,6 +490,7 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
key string
}
seriesMap := make(map[seriesKey]*qbtypes.TimeSeries, estimatedSeries)
aliasMap := make(map[int]string)
for _, bucket := range buckets {
var tsData *qbtypes.TimeSeriesData
@@ -499,6 +500,10 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
}
for _, aggBucket := range tsData.Aggregations {
if aggBucket.Alias != "" {
aliasMap[aggBucket.Index] = aggBucket.Alias
}
for _, series := range aggBucket.Series {
// Create series key from labels
key := seriesKey{
@@ -571,7 +576,13 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
}
}
var alias string
if aliasMap[index] != "" {
alias = aliasMap[index]
}
result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
Alias: alias,
Index: index,
Series: seriesList,
})
@@ -736,6 +747,7 @@ func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoun
for _, aggBucket := range tsData.Aggregations {
trimmedBucket := &qbtypes.AggregationBucket{
Index: aggBucket.Index,
Alias: aggBucket.Alias,
}
for _, series := range aggBucket.Series {

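The three hunks above thread an alias through the cache merge: record it per aggregation index when reading cached buckets, restore it on the merged bucket, and preserve it when trimming to the flux boundary. A minimal sketch of that round-trip, with qbtypes.AggregationBucket reduced to the two fields involved and a hypothetical alias value:

package main

import "fmt"

// Reduced stand-in for qbtypes.AggregationBucket; the real type also carries Series.
type AggregationBucket struct {
	Index int
	Alias string
}

func main() {
	// Buckets as read from cached payloads; only some carry a non-empty alias.
	cached := []*AggregationBucket{
		{Index: 0, Alias: "p99_latency"}, // hypothetical alias value
		{Index: 1},
	}

	// Pass 1 (as in mergeTimeSeriesValues): remember every non-empty alias by index.
	aliasMap := make(map[int]string)
	for _, b := range cached {
		if b.Alias != "" {
			aliasMap[b.Index] = b.Alias
		}
	}

	// Pass 2: rebuild merged buckets and set the alias back when one was recorded.
	for _, idx := range []int{0, 1} {
		merged := &AggregationBucket{Index: idx}
		if aliasMap[idx] != "" {
			merged.Alias = aliasMap[idx]
		}
		fmt.Printf("index=%d alias=%q\n", merged.Index, merged.Alias)
	}
}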
View File

@@ -200,7 +200,7 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
return nil, err
}
result.Warnings = stmt.Warnings
result.Warnings = append(result.Warnings, stmt.Warnings...)
result.WarningsDocURL = stmt.WarningsDocURL
return result, nil
}
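The one-line change in Execute swaps assignment for append so that warnings already attached to the result are not clobbered by the statement's warnings. A tiny illustration with hypothetical warning strings:

package main

import "fmt"

func main() {
	// Hypothetical warnings already present on the result before this point.
	resultWarnings := []string{"partial data returned from cache"}
	stmtWarnings := []string{"no finite histogram buckets found"}

	// Old behavior (resultWarnings = stmtWarnings) would drop the first entry;
	// append keeps both.
	resultWarnings = append(resultWarnings, stmtWarnings...)
	fmt.Println(resultWarnings)
}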

View File

@@ -12,17 +12,23 @@ import (
"time"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/bytedance/sonic"
)
const (
diagnosticColumnIndexBase = 1000000
)
var (
aggRe = regexp.MustCompile(`^__result_(\d+)$`)
// legacyReservedColumnTargetAliases identifies the result value from a user-
// written clickhouse query. The column alias indicates which value is
// to be considered the final result (or target)
legacyReservedColumnTargetAliases = []string{"__result", "__value", "result", "res", "value"}
diagnosticColumnAliases = []string{telemetrymetrics.DiagnosticColumnCumulativeHistLeCount, telemetrymetrics.DiagnosticColumnCumulativeHistLeSum}
)
// consume reads every row and shapes it into the payload expected for the
@@ -69,6 +75,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
key string // deterministic join of label values
}
seriesMap := map[sKey]*qbtypes.TimeSeries{}
diagnosticSeriesMap := map[string]*qbtypes.TimeSeries{}
stepMs := uint64(step.Duration.Milliseconds())
@@ -113,12 +120,13 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
}
var (
ts int64
lblVals = make([]string, 0, lblValsCapacity)
lblObjs = make([]*qbtypes.Label, 0, lblValsCapacity)
aggValues = map[int]float64{} // all __result_N in this row
fallbackValue float64 // value when NO __result_N columns exist
fallbackSeen bool
ts int64
lblVals = make([]string, 0, lblValsCapacity)
lblObjs = make([]*qbtypes.Label, 0, lblValsCapacity)
aggValues = map[int]float64{} // all __result_N in this row
diagnosticValues = map[string]float64{} // all diagnostic columns in this row
fallbackValue float64 // value when NO __result_N columns exist
fallbackSeen bool
)
for idx, ptr := range slots {
@@ -130,7 +138,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
case *float64, *float32, *int64, *int32, *uint64, *uint32:
val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface())
if m := aggRe.FindStringSubmatch(name); m != nil {
if slices.Contains(diagnosticColumnAliases, name) {
diagnosticValues[name] = val
} else if m := aggRe.FindStringSubmatch(name); m != nil {
id, _ := strconv.Atoi(m[1])
aggValues[id] = val
} else if numericColsCount == 1 { // classic single-value query
@@ -152,7 +162,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
tempVal := reflect.ValueOf(ptr)
if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() {
val := numericAsFloat(tempVal.Elem().Elem().Interface())
if m := aggRe.FindStringSubmatch(name); m != nil {
if slices.Contains(diagnosticColumnAliases, name) {
diagnosticValues[name] = val
} else if m := aggRe.FindStringSubmatch(name); m != nil {
id, _ := strconv.Atoi(m[1])
aggValues[id] = val
} else if numericColsCount == 1 { // classic single-value query
@@ -195,6 +207,23 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
}
}
// fetch and store diagnostic values in diagnosticSeriesMap
for diagnosticColName, val := range diagnosticValues {
if math.IsNaN(val) || math.IsInf(val, 0) {
continue
}
diagSeries, ok := diagnosticSeriesMap[diagnosticColName]
if !ok {
diagSeries = &qbtypes.TimeSeries{}
diagnosticSeriesMap[diagnosticColName] = diagSeries
}
diagSeries.Values = append(diagSeries.Values, &qbtypes.TimeSeriesValue{
Timestamp: ts,
Value: val,
Partial: isPartialValue(ts),
})
}
// Edge-case: no __result_N columns, but a single numeric column present
if len(aggValues) == 0 && fallbackSeen {
aggValues[0] = fallbackValue
@@ -231,6 +260,20 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
return nil, err
}
diagnosticBuckets := make([]*qbtypes.AggregationBucket, 0)
// TODO(nikhilmantri0902, srikanthccv): HACK - this is a temporary index, introduced because cache grouping and merging happen on the index
// Should we improve the cache grouping and merging so they do not depend on the index?
diagnosticTemporaryIndex := diagnosticColumnIndexBase
for diagColName, diagSeries := range diagnosticSeriesMap {
diagnosticBucket := &qbtypes.AggregationBucket{
Index: diagnosticTemporaryIndex,
Alias: diagColName,
}
diagnosticBucket.Series = append(diagnosticBucket.Series, diagSeries)
diagnosticBuckets = append(diagnosticBuckets, diagnosticBucket)
diagnosticTemporaryIndex++
}
maxAgg := -1
for k := range seriesMap {
if k.agg > maxAgg {
@@ -240,6 +283,8 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
if maxAgg < 0 {
return &qbtypes.TimeSeriesData{
QueryName: queryName,
// return with diagnostic buckets
Aggregations: diagnosticBuckets,
}, nil
}
@@ -261,6 +306,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
}
}
// add diagnostic buckets
nonEmpty = append(nonEmpty, diagnosticBuckets...)
return &qbtypes.TimeSeriesData{
QueryName: queryName,
Aggregations: nonEmpty,

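A self-contained sketch of the column routing the reader now performs per row: columns are matched against the diagnostic aliases first and diverted into their own map, while anything matching __result_N stays on the normal aggregation path. The column names and regex mirror the diff (the alias strings come from the telemetrymetrics constants added below); the row values themselves are hypothetical:

package main

import (
	"fmt"
	"regexp"
	"slices"
	"strconv"
)

var aggRe = regexp.MustCompile(`^__result_(\d+)$`)

// Mirrors diagnosticColumnAliases; the strings match the constants added in telemetrymetrics.
var diagnosticColumnAliases = []string{
	"__diagnostic_cumulative_hist_le_sum",
	"__diagnostic_cumulative_hist_le_count",
}

func main() {
	// A hypothetical row: one quantile result plus the two diagnostic columns.
	row := map[string]float64{
		"__result_0":                            0.95,
		"__diagnostic_cumulative_hist_le_sum":   0,
		"__diagnostic_cumulative_hist_le_count": 1,
	}

	aggValues := map[int]float64{}
	diagnosticValues := map[string]float64{}
	for name, val := range row {
		if slices.Contains(diagnosticColumnAliases, name) {
			diagnosticValues[name] = val // routed to diagnosticSeriesMap, never user-visible
		} else if m := aggRe.FindStringSubmatch(name); m != nil {
			id, _ := strconv.Atoi(m[1])
			aggValues[id] = val
		}
	}
	fmt.Println(aggValues, diagnosticValues)
}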
View File

@@ -9,6 +9,7 @@ import (
"strings"
"github.com/SigNoz/govaluate"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -46,7 +47,7 @@ func getQueryName(spec any) string {
return getqueryInfo(spec).Name
}
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, []string, string, error) {
// Convert results to typed format for processing
typedResults := make(map[string]*qbtypes.Result)
for name, result := range results {
@@ -96,7 +97,7 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
for name, v := range typedResults {
retResult[name] = v.Value
}
return retResult, nil
return retResult, nil, "", nil
}
}
@@ -108,11 +109,11 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
firstQueryName := getQueryName(req.CompositeQuery.Queries[0].Spec)
if firstQueryName != "" && tableResult["table"] != nil {
// Return table under first query name
return map[string]any{firstQueryName: tableResult["table"]}, nil
return map[string]any{firstQueryName: tableResult["table"]}, nil, "", nil
}
}
return tableResult, nil
return tableResult, nil, "", nil
}
if req.RequestType == qbtypes.RequestTypeTimeSeries && req.FormatOptions != nil && req.FormatOptions.FillGaps {
@@ -155,7 +156,19 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
finalResults[name] = result.Value
}
return finalResults, nil
// collect postProcessWarnings from typed results
var postProcessWarnings []string
var postProcessWarningsDocURL string
for _, res := range typedResults {
if len(res.Warnings) > 0 {
postProcessWarnings = append(postProcessWarnings, res.Warnings...)
}
if res.WarningsDocURL != "" {
postProcessWarningsDocURL = res.WarningsDocURL
}
}
return finalResults, postProcessWarnings, postProcessWarningsDocURL, nil
}
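postProcessResults now returns the collected warnings and a single doc URL alongside the results. Note the URL is last-non-empty-wins over a map iteration, so when several results carry different URLs the winner is effectively arbitrary; a minimal illustration with hypothetical URLs:

package main

import "fmt"

func main() {
	// Hypothetical per-result doc URLs, in some iteration order.
	urls := []string{"", "https://signoz.io/docs/warnings/a", "https://signoz.io/docs/warnings/b"}

	var postProcessWarningsDocURL string
	for _, u := range urls {
		if u != "" {
			postProcessWarningsDocURL = u // last non-empty value wins
		}
	}
	fmt.Println(postProcessWarningsDocURL)
}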
// postProcessBuilderQuery applies postprocessing to a single builder query result
@@ -178,6 +191,86 @@ func postProcessBuilderQuery[T any](
return result
}
func removeDiagnosticSeriesAndCheckWarnings(result *qbtypes.Result) {
tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
return
}
if tsData == nil {
return
}
var nonDiagnosticBuckets []*qbtypes.AggregationBucket
var bucketSum, bucketCount *qbtypes.AggregationBucket
// First pass: identify diagnostic buckets and separate them from non-diagnostic ones
for _, b := range tsData.Aggregations {
if !slices.Contains(diagnosticColumnAliases, b.Alias) {
nonDiagnosticBuckets = append(nonDiagnosticBuckets, b)
continue
}
// warning columns
switch b.Alias {
case telemetrymetrics.DiagnosticColumnCumulativeHistLeSum:
bucketSum = b
case telemetrymetrics.DiagnosticColumnCumulativeHistLeCount:
bucketCount = b
}
}
// Second pass: calculate warnings based on diagnostic buckets (only once, after identifying both)
allZero := false
if bucketSum != nil {
allZero = true
if len(bucketSum.Series) == 0 {
allZero = false // don't warn when there are no values
} else {
for _, s := range bucketSum.Series {
for _, v := range s.Values {
if v.Value > 0 {
allZero = false
break
}
}
if !allZero {
break
}
}
}
}
allSparse := false
if bucketCount != nil {
allSparse = true
if len(bucketCount.Series) == 0 {
allSparse = false // don't warn when there are no values
} else {
for _, s := range bucketCount.Series {
for _, v := range s.Values {
if v.Value >= 2 {
allSparse = false
break
}
}
if !allSparse {
break
}
}
}
}
if allZero {
result.Warnings = append(result.Warnings, "No change observed for this cumulative metric in the selected range.")
}
if allSparse {
result.Warnings = append(result.Warnings, "Only +Inf bucket present; add finite buckets to compute quantiles.")
}
tsData.Aggregations = nonDiagnosticBuckets
result.Value = tsData
}
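Reduced to flat slices, the two heuristics above are: warn "no change observed" when sum-diagnostic values exist but none is positive, and warn "only +Inf bucket" when no row ever saw two or more distinct le values. A sketch under that simplification:

package main

import "fmt"

// allZero mirrors the bucketSum check: true when values exist but none is
// positive, i.e. no cumulative bucket counter moved in the selected range.
func allZero(sums []float64) bool {
	if len(sums) == 0 {
		return false // no values: don't warn
	}
	for _, v := range sums {
		if v > 0 {
			return false
		}
	}
	return true
}

// allSparse mirrors the bucketCount check: true when values exist but every
// row saw fewer than two distinct le values (typically just the +Inf bucket).
func allSparse(counts []float64) bool {
	if len(counts) == 0 {
		return false
	}
	for _, v := range counts {
		if v >= 2 {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allZero([]float64{0, 0, 0}))   // true: warn about no change
	fmt.Println(allSparse([]float64{1, 1, 1})) // true: warn about missing finite buckets
}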
// postProcessMetricQuery applies postprocessing to a metric query result
func postProcessMetricQuery(
q *querier,
@@ -186,6 +279,8 @@ func postProcessMetricQuery(
req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
removeDiagnosticSeriesAndCheckWarnings(result)
config := query.Aggregations[0]
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)

View File

@@ -588,11 +588,19 @@ func (q *querier) run(
}
}
processedResults, err := q.postProcessResults(ctx, results, req)
processedResults, processedResultsWarnings, processedResultsDocsURL, err := q.postProcessResults(ctx, results, req)
if err != nil {
return nil, err
}
// Merge warnings collected during post-processing.
if len(processedResultsWarnings) > 0 {
warnings = append(warnings, processedResultsWarnings...)
}
if processedResultsDocsURL != "" {
warningsDocURL = processedResultsDocsURL
}
// attach step interval to metadata so the client can make informed decisions, e.g. the width of a bar,
// or going to related logs/traces from a point in a line/bar chart with the correct time range
stepIntervals := make(map[string]uint64, len(steps))

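Under this merge, statement-level warnings collected earlier in run() come first and post-processing warnings are appended after them, while a non-empty post-processing doc URL overrides whatever was set before. A compact illustration with hypothetical values:

package main

import "fmt"

func main() {
	// Hypothetical warnings gathered earlier in run() from statement execution.
	warnings := []string{"query scanned a large number of series"}
	warningsDocURL := ""

	// Hypothetical values returned by postProcessResults.
	processedResultsWarnings := []string{"No change observed for this cumulative metric in the selected range."}
	processedResultsDocsURL := "https://signoz.io/docs/warnings" // hypothetical URL

	if len(processedResultsWarnings) > 0 {
		warnings = append(warnings, processedResultsWarnings...)
	}
	if processedResultsDocsURL != "" {
		warningsDocURL = processedResultsDocsURL
	}
	fmt.Println(warnings, warningsDocURL)
}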
View File

@@ -2,6 +2,11 @@ package telemetrymetrics
import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
const (
DiagnosticColumnCumulativeHistLeSum = "__diagnostic_cumulative_hist_le_sum"
DiagnosticColumnCumulativeHistLeCount = "__diagnostic_cumulative_hist_le_count"
)
var IntrinsicFields = []string{
"__normalized",
"temporality",

View File

@@ -559,12 +559,8 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
cteArgs [][]any,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) (*qbtypes.Statement, error) {
combined := querybuilder.CombineCTEs(cteFragments)
var combined string
var args []any
for _, a := range cteArgs {
args = append(args, a...)
}
sb := sqlbuilder.NewSelectBuilder()
@@ -574,25 +570,59 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
}
if quantile != 0 && query.Aggregations[0].Type != metrictypes.ExpHistogramType {
// Build diagnostics CTE to pre-aggregate arrays and counts per ts/group.
diag := sqlbuilder.NewSelectBuilder()
diag.Select("ts")
for _, g := range query.GroupBy {
diag.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
diag.SelectMore("groupArray(le) AS les")
diag.SelectMore("groupArray(value) AS vals")
diag.SelectMore("arraySum(groupArray(value)) AS bucket_sum")
diag.SelectMore("countDistinct(le) AS bucket_count")
diag.From("__spatial_aggregation_cte")
for _, g := range query.GroupBy {
diag.GroupBy(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
diag.GroupBy("ts")
diagQ, diagArgs := diag.BuildWithFlavor(sqlbuilder.ClickHouse)
cteFragments = append(cteFragments, fmt.Sprintf("__diagnostics_cte AS (%s)", diagQ))
cteArgs = append(cteArgs, diagArgs)
combined = querybuilder.CombineCTEs(cteFragments)
for _, a := range cteArgs {
args = append(args, a...)
}
// Final select from diagnostics cte
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
// expose quantile and diagnostics as __result_* so they are parsed, then stripped in post-processing
// TODO(nikhilmantri0902): exposing the values below as __result_0, __result_1, __result_2,
// but is this correct, or is it a hack to extract the warnings?
// what if the aliases are already used by the client? there would be a collision in the select statements ...
sb.SelectMore(fmt.Sprintf(
"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
"histogramQuantile(arrayMap(x -> toFloat64(x), les), vals, %.3f) AS __result_0",
quantile,
))
sb.From("__spatial_aggregation_cte")
for _, g := range query.GroupBy {
sb.GroupBy(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
sb.GroupBy("ts")
sb.SelectMore(fmt.Sprintf("bucket_sum AS %s", DiagnosticColumnCumulativeHistLeSum))
sb.SelectMore(fmt.Sprintf("bucket_count AS %s", DiagnosticColumnCumulativeHistLeCount))
sb.From("__diagnostics_cte")
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
sb.Having(rewrittenExpr)
}
} else {
combined = querybuilder.CombineCTEs(cteFragments)
for _, a := range cteArgs {
args = append(args, a...)
}
sb.Select("*")
sb.From("__spatial_aggregation_cte")
if query.Having != nil && query.Having.Expression != "" {
@@ -603,5 +633,8 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
}
q, a := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return &qbtypes.Statement{Query: combined + q, Args: append(args, a...)}, nil
return &qbtypes.Statement{
Query: combined + q,
Args: append(args, a...),
}, nil
}
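For orientation, the statement this branch now assembles has roughly the following shape, shown here for a hypothetical single group-by column service_name and quantile 0.95. This is a hand-written approximation, not captured builder output; column backtick-quoting is omitted and the CTEs combined by querybuilder.CombineCTEs are elided as "...":

package main

import "fmt"

// approxFinalSelect approximates the SQL produced by BuildFinalSelect for a
// cumulative-histogram quantile query (assumptions noted above).
const approxFinalSelect = `
WITH ..., __diagnostics_cte AS (
  SELECT ts, service_name,
         groupArray(le) AS les,
         groupArray(value) AS vals,
         arraySum(groupArray(value)) AS bucket_sum,
         countDistinct(le) AS bucket_count
  FROM __spatial_aggregation_cte
  GROUP BY service_name, ts
)
SELECT ts, service_name,
       histogramQuantile(arrayMap(x -> toFloat64(x), les), vals, 0.950) AS __result_0,
       bucket_sum AS __diagnostic_cumulative_hist_le_sum,
       bucket_count AS __diagnostic_cumulative_hist_le_count
FROM __diagnostics_cte`

func main() { fmt.Println(approxFinalSelect) }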