Mirror of https://github.com/SigNoz/signoz.git (synced 2026-02-07 02:12:11 +00:00)

Compare commits: migrate-fi...feat/warni (7 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 702831e26b | |
| | 5c6346df11 | |
| | 860c3e70ac | |
| | 3d8bfd47fa | |
| | f01b00bd5c | |
| | 120317321c | |
| | 895e92893c | |
Bucket cache (`mergeTimeSeriesValues`, `trimResultToFluxBoundary`):

@@ -490,6 +490,7 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
 		key string
 	}
 	seriesMap := make(map[seriesKey]*qbtypes.TimeSeries, estimatedSeries)
+	aliasMap := make(map[int]string)

 	for _, bucket := range buckets {
 		var tsData *qbtypes.TimeSeriesData
@@ -499,6 +500,10 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
 		}

 		for _, aggBucket := range tsData.Aggregations {
+			if aggBucket.Alias != "" {
+				aliasMap[aggBucket.Index] = aggBucket.Alias
+			}
+
 			for _, series := range aggBucket.Series {
 				// Create series key from labels
 				key := seriesKey{
@@ -571,7 +576,13 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
 			}
 		}

+		var alias string
+		if aliasMap[index] != "" {
+			alias = aliasMap[index]
+		}
+
 		result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
+			Alias:  alias,
 			Index:  index,
 			Series: seriesList,
 		})
@@ -736,6 +747,7 @@ func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoun
 		for _, aggBucket := range tsData.Aggregations {
 			trimmedBucket := &qbtypes.AggregationBucket{
 				Index: aggBucket.Index,
+				Alias: aggBucket.Alias,
 			}

 			for _, series := range aggBucket.Series {
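The four hunks above exist because the cache merge keys buckets only by aggregation index, so an alias carried by one cached bucket must be remembered and re-applied when merged buckets are rebuilt. A minimal, self-contained sketch of that collect-then-restore pattern; `AggregationBucket` here is a simplified stand-in for the qbtypes struct, and the sample data is invented:

```go
package main

import "fmt"

// AggregationBucket is a hypothetical reduction of the qbtypes struct;
// the real one carries time series rather than plain strings.
type AggregationBucket struct {
	Index  int
	Alias  string
	Series []string
}

func main() {
	// Buckets from different cache windows: some carry the alias, some don't.
	cached := []*AggregationBucket{
		{Index: 0, Alias: "p99_latency", Series: []string{"s1"}},
		{Index: 0, Alias: "", Series: []string{"s2"}},
	}

	// First pass: remember any non-empty alias seen per aggregation index.
	aliasMap := make(map[int]string)
	seriesByIndex := make(map[int][]string)
	for _, b := range cached {
		if b.Alias != "" {
			aliasMap[b.Index] = b.Alias
		}
		seriesByIndex[b.Index] = append(seriesByIndex[b.Index], b.Series...)
	}

	// Second pass: rebuild merged buckets, restoring the alias if one was seen.
	for index, series := range seriesByIndex {
		merged := &AggregationBucket{Index: index, Alias: aliasMap[index], Series: series}
		fmt.Printf("index=%d alias=%q series=%v\n", merged.Index, merged.Alias, merged.Series)
	}
}
```

Reading a missing key from a Go map yields the zero value, so `aliasMap[index]` alone already returns `""` for unknown indexes; the explicit `!= ""` guard in the diff just makes the intent obvious.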
Builder query execution (`builderQuery[T].Execute`): accumulate statement warnings instead of overwriting any already present:

@@ -200,7 +200,7 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
 		return nil, err
 	}

-	result.Warnings = stmt.Warnings
+	result.Warnings = append(result.Warnings, stmt.Warnings...)
 	result.WarningsDocURL = stmt.WarningsDocURL
 	return result, nil
 }
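The one-line fix matters because the result may already carry warnings from earlier in the pipeline; plain assignment would silently drop them. A tiny illustration (the sample warning strings are invented):

```go
package main

import "fmt"

func main() {
	// Warnings already attached to the result earlier in the pipeline.
	warnings := []string{"query truncated to row limit"}
	stmtWarnings := []string{"Only +Inf bucket present; add finite buckets to compute quantiles."}

	// Plain assignment would drop the earlier warning:
	overwritten := stmtWarnings
	fmt.Println(len(overwritten)) // 1

	// append keeps both sources, which is what the fix switches to:
	warnings = append(warnings, stmtWarnings...)
	fmt.Println(len(warnings)) // 2
	fmt.Println(warnings)
}
```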
ClickHouse time-series reader (`readAsTimeSeries`):

@@ -12,17 +12,23 @@ import (
 	"time"

 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"github.com/SigNoz/signoz/pkg/telemetrymetrics"
 	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
 	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
 	"github.com/bytedance/sonic"
 )

+const (
+	diagnosticColumnIndexBase = 1000000
+)
+
 var (
 	aggRe = regexp.MustCompile(`^__result_(\d+)$`)
 	// legacyReservedColumnTargetAliases identifies result value from a user
 	// written clickhouse query. The column alias indcate which value is
 	// to be considered as final result (or target)
 	legacyReservedColumnTargetAliases = []string{"__result", "__value", "result", "res", "value"}
+	diagnosticColumnAliases           = []string{telemetrymetrics.DiagnosticColumnCumulativeHistLeCount, telemetrymetrics.DiagnosticColumnCumulativeHistLeSum}
 )

 // consume reads every row and shapes it into the payload expected for the
@@ -69,6 +75,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
 		key string // deterministic join of label values
 	}
 	seriesMap := map[sKey]*qbtypes.TimeSeries{}
+	diagnosticSeriesMap := map[string]*qbtypes.TimeSeries{}

 	stepMs := uint64(step.Duration.Milliseconds())

@@ -113,12 +120,13 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
 		}

 		var (
-			ts            int64
-			lblVals       = make([]string, 0, lblValsCapacity)
-			lblObjs       = make([]*qbtypes.Label, 0, lblValsCapacity)
-			aggValues     = map[int]float64{} // all __result_N in this row
-			fallbackValue float64            // value when NO __result_N columns exist
-			fallbackSeen  bool
+			ts               int64
+			lblVals          = make([]string, 0, lblValsCapacity)
+			lblObjs          = make([]*qbtypes.Label, 0, lblValsCapacity)
+			aggValues        = map[int]float64{}    // all __result_N in this row
+			diagnosticValues = map[string]float64{} // all diagnostic columns in this row
+			fallbackValue    float64                // value when NO __result_N columns exist
+			fallbackSeen     bool
 		)

 		for idx, ptr := range slots {
@@ -130,7 +138,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt

 			case *float64, *float32, *int64, *int32, *uint64, *uint32:
 				val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface())
-				if m := aggRe.FindStringSubmatch(name); m != nil {
+				if slices.Contains(diagnosticColumnAliases, name) {
+					diagnosticValues[name] = val
+				} else if m := aggRe.FindStringSubmatch(name); m != nil {
 					id, _ := strconv.Atoi(m[1])
 					aggValues[id] = val
 				} else if numericColsCount == 1 { // classic single-value query
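A compact, runnable sketch of the column-dispatch order this hunk (and its nullable twin below) introduces: diagnostic aliases are routed into their own map before the `__result_N` regex is tried, so they never reach the single-numeric-column fallback path. The sample column values are invented; the alias strings match the constants the diff adds:

```go
package main

import (
	"fmt"
	"regexp"
	"slices"
	"strconv"
)

var aggRe = regexp.MustCompile(`^__result_(\d+)$`)

// Assumed alias values, copied from the constants introduced in the diff.
var diagnosticColumnAliases = []string{
	"__diagnostic_cumulative_hist_le_sum",
	"__diagnostic_cumulative_hist_le_count",
}

func main() {
	aggValues := map[int]float64{}
	diagnosticValues := map[string]float64{}

	// Invented row: one real aggregation column plus the two diagnostic columns.
	cols := map[string]float64{
		"__result_0":                            0.95,
		"__diagnostic_cumulative_hist_le_sum":   0,
		"__diagnostic_cumulative_hist_le_count": 1,
	}

	for name, val := range cols {
		// Diagnostic columns go to their own map; everything else falls
		// through to the __result_N parsing exactly as before.
		if slices.Contains(diagnosticColumnAliases, name) {
			diagnosticValues[name] = val
		} else if m := aggRe.FindStringSubmatch(name); m != nil {
			id, _ := strconv.Atoi(m[1])
			aggValues[id] = val
		}
	}
	fmt.Println(aggValues, diagnosticValues)
}
```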
@@ -152,7 +162,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
 				tempVal := reflect.ValueOf(ptr)
 				if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() {
 					val := numericAsFloat(tempVal.Elem().Elem().Interface())
-					if m := aggRe.FindStringSubmatch(name); m != nil {
+					if slices.Contains(diagnosticColumnAliases, name) {
+						diagnosticValues[name] = val
+					} else if m := aggRe.FindStringSubmatch(name); m != nil {
 						id, _ := strconv.Atoi(m[1])
 						aggValues[id] = val
 					} else if numericColsCount == 1 { // classic single-value query
@@ -195,6 +207,23 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
 			}
 		}

+		// fetch and store diagnostic values in diagnosticSeriesMap
+		for diagnosticColName, val := range diagnosticValues {
+			if math.IsNaN(val) || math.IsInf(val, 0) {
+				continue
+			}
+			diagSeries, ok := diagnosticSeriesMap[diagnosticColName]
+			if !ok {
+				diagSeries = &qbtypes.TimeSeries{}
+				diagnosticSeriesMap[diagnosticColName] = diagSeries
+			}
+			diagSeries.Values = append(diagSeries.Values, &qbtypes.TimeSeriesValue{
+				Timestamp: ts,
+				Value:     val,
+				Partial:   isPartialValue(ts),
+			})
+		}
+
 		// Edge-case: no __result_N columns, but a single numeric column present
 		if len(aggValues) == 0 && fallbackSeen {
 			aggValues[0] = fallbackValue
@@ -231,6 +260,20 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
 		return nil, err
 	}

+	diagnosticBuckets := make([]*qbtypes.AggregationBucket, 0)
+	// TODO(nikhilmantri0902, srikanthccv): below HACK - this is a temporary index introduced becausing caching grouping and merging happens on index
+	// Should we improve the caching grouping and merging to not depend on index?
+	diagNosticTemporayIndex := diagnosticColumnIndexBase
+	for diagColName, diagSeries := range diagnosticSeriesMap {
+		diagnosticBucket := &qbtypes.AggregationBucket{
+			Index: diagNosticTemporayIndex,
+			Alias: diagColName,
+		}
+		diagnosticBucket.Series = append(diagnosticBucket.Series, diagSeries)
+		diagnosticBuckets = append(diagnosticBuckets, diagnosticBucket)
+		diagNosticTemporayIndex++
+	}
+
 	maxAgg := -1
 	for k := range seriesMap {
 		if k.agg > maxAgg {
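On the 1,000,000 base: real aggregation buckets take their indices from `__result_N` (small, dense integers), and, per the TODO above, cache grouping and merging key on that index, so diagnostic buckets are parked far above any realistic `N` to avoid collisions. A sketch of the resulting numbering; the alias strings are copied from the diff, everything else is invented:

```go
package main

import "fmt"

// Assumed constant from the diff: starting diagnostic buckets at one million
// keeps the index-keyed cache merge from conflating them with __result_N buckets.
const diagnosticColumnIndexBase = 1000000

func main() {
	realIndexes := []int{0, 1, 2} // from __result_0, __result_1, __result_2
	diagnosticAliases := []string{
		"__diagnostic_cumulative_hist_le_sum",
		"__diagnostic_cumulative_hist_le_count",
	}

	indexes := map[int]string{}
	for _, idx := range realIndexes {
		indexes[idx] = fmt.Sprintf("__result_%d", idx)
	}
	next := diagnosticColumnIndexBase
	for _, alias := range diagnosticAliases {
		indexes[next] = alias // never collides with the small __result_N range
		next++
	}
	fmt.Println(indexes)
}
```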
@@ -240,6 +283,8 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
 	if maxAgg < 0 {
 		return &qbtypes.TimeSeriesData{
 			QueryName: queryName,
+			// return with diagNostic buckets
+			Aggregations: diagnosticBuckets,
 		}, nil
 	}

@@ -261,6 +306,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
 		}
 	}

+	// add diagNostic buckets
+	nonEmpty = append(nonEmpty, diagnosticBuckets...)
+
 	return &qbtypes.TimeSeriesData{
 		QueryName:    queryName,
 		Aggregations: nonEmpty,
Querier post-processing (`postProcessResults` now also returns warnings and a docs URL):

@@ -9,6 +9,7 @@ import (
 	"strings"

 	"github.com/SigNoz/govaluate"
+	"github.com/SigNoz/signoz/pkg/telemetrymetrics"
 	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
 	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
 )
@@ -46,7 +47,7 @@ func getQueryName(spec any) string {
 	return getqueryInfo(spec).Name
 }

-func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
+func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, []string, string, error) {
 	// Convert results to typed format for processing
 	typedResults := make(map[string]*qbtypes.Result)
 	for name, result := range results {
@@ -96,7 +97,7 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
 		for name, v := range typedResults {
 			retResult[name] = v.Value
 		}
-		return retResult, nil
+		return retResult, nil, "", nil
 	}
 }

@@ -108,11 +109,11 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
 		firstQueryName := getQueryName(req.CompositeQuery.Queries[0].Spec)
 		if firstQueryName != "" && tableResult["table"] != nil {
 			// Return table under first query name
-			return map[string]any{firstQueryName: tableResult["table"]}, nil
+			return map[string]any{firstQueryName: tableResult["table"]}, nil, "", nil
 		}
 	}

-	return tableResult, nil
+	return tableResult, nil, "", nil
 }

 if req.RequestType == qbtypes.RequestTypeTimeSeries && req.FormatOptions != nil && req.FormatOptions.FillGaps {
@@ -155,7 +156,19 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
 		finalResults[name] = result.Value
 	}

-	return finalResults, nil
+	// collect postProcessWarnings from typed results
+	var postProcessWarnings []string
+	var postProcessWarningsDocURL string
+	for _, res := range typedResults {
+		if len(res.Warnings) > 0 {
+			postProcessWarnings = append(postProcessWarnings, res.Warnings...)
+		}
+		if res.WarningsDocURL != "" {
+			postProcessWarningsDocURL = res.WarningsDocURL
+		}
+	}
+
+	return finalResults, postProcessWarnings, postProcessWarningsDocURL, nil
 }

 // postProcessBuilderQuery applies postprocessing to a single builder query result
@@ -178,6 +191,86 @@ func postProcessBuilderQuery[T any](
 	return result
 }

+func removeDiagnosticSeriesAndCheckWarnings(result *qbtypes.Result) {
+	tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
+	if !ok {
+		return
+	}
+
+	if tsData == nil {
+		return
+	}
+
+	var nonDiagnosticBuckets []*qbtypes.AggregationBucket
+	var bucketSum, bucketCount *qbtypes.AggregationBucket
+
+	// First pass: identify diagnostic buckets and separate them from non-diagnostic ones
+	for _, b := range tsData.Aggregations {
+		if !slices.Contains(diagnosticColumnAliases, b.Alias) {
+			nonDiagnosticBuckets = append(nonDiagnosticBuckets, b)
+			continue
+		}
+		// warning columns
+		switch b.Alias {
+		case telemetrymetrics.DiagnosticColumnCumulativeHistLeSum:
+			bucketSum = b
+		case telemetrymetrics.DiagnosticColumnCumulativeHistLeCount:
+			bucketCount = b
+		}
+	}
+
+	// Second pass: calculate warnings based on diagnostic buckets (only once, after identifying both)
+	allZero := false
+	if bucketSum != nil {
+		allZero = true
+		if len(bucketSum.Series) == 0 {
+			allZero = false // dont calculate for no values
+		} else {
+			for _, s := range bucketSum.Series {
+				for _, v := range s.Values {
+					if v.Value > 0 {
+						allZero = false
+						break
+					}
+				}
+				if !allZero {
+					break
+				}
+			}
+		}
+	}
+
+	allSparse := false
+	if bucketCount != nil {
+		allSparse = true
+		if len(bucketCount.Series) == 0 {
+			allSparse = false // dont calculate for no values
+		} else {
+			for _, s := range bucketCount.Series {
+				for _, v := range s.Values {
+					if v.Value >= 2 {
+						allSparse = false
+						break
+					}
+				}
+				if !allSparse {
+					break
+				}
+			}
+		}
+	}
+
+	if allZero {
+		result.Warnings = append(result.Warnings, "No change observed for this cumulative metric in the selected range.")
+	}
+	if allSparse {
+		result.Warnings = append(result.Warnings, "Only +Inf bucket present; add finite buckets to compute quantiles.")
+	}
+
+	tsData.Aggregations = nonDiagnosticBuckets
+	result.Value = tsData
+}
+
 // postProcessMetricQuery applies postprocessing to a metric query result
 func postProcessMetricQuery(
 	q *querier,
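A condensed restatement of the two checks this function performs, with simplified types: a cumulative histogram whose `bucket_sum` never exceeds zero produced no increments in the window (hence "no change observed"), and a `bucket_count` that never reaches 2 means only one `le` bound (typically +Inf) exists, so `histogramQuantile` has nothing to interpolate. The sample series are invented:

```go
package main

import "fmt"

type point struct{ value float64 }

// allZero reports whether every observed bucket_sum value is zero,
// the condition behind the "no change observed" warning. No data: no warning.
func allZero(series [][]point) bool {
	if len(series) == 0 {
		return false
	}
	for _, s := range series {
		for _, v := range s {
			if v.value > 0 {
				return false
			}
		}
	}
	return true
}

// allSparse reports whether every bucket_count stays below 2, i.e. only a
// single le bound (typically +Inf) was ever seen per point.
func allSparse(series [][]point) bool {
	if len(series) == 0 {
		return false
	}
	for _, s := range series {
		for _, v := range s {
			if v.value >= 2 {
				return false
			}
		}
	}
	return true
}

func main() {
	sums := [][]point{{{0}, {0}}}   // bucket_sum series: all zeros
	counts := [][]point{{{1}, {1}}} // bucket_count series: never reaches 2
	fmt.Println(allZero(sums), allSparse(counts)) // true true: both warnings fire
}
```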
@@ -186,6 +279,8 @@ func postProcessMetricQuery(
 	req *qbtypes.QueryRangeRequest,
 ) *qbtypes.Result {

+	removeDiagnosticSeriesAndCheckWarnings(result)
+
 	config := query.Aggregations[0]
 	spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
 	timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
Querier `run`: pick up the new return values and merge them into the response-level warnings:

@@ -588,11 +588,19 @@ func (q *querier) run(
 		}
 	}

-	processedResults, err := q.postProcessResults(ctx, results, req)
+	processedResults, processedResultsWarnings, processedResultsDocsURL, err := q.postProcessResults(ctx, results, req)
 	if err != nil {
 		return nil, err
 	}

+	// Merge warnings collected during post-processing.
+	if len(processedResultsWarnings) > 0 {
+		warnings = append(warnings, processedResultsWarnings...)
+	}
+	if processedResultsDocsURL != "" {
+		warningsDocURL = processedResultsDocsURL
+	}
+
 	// attach step interval to metadata so client can make informed decisions, ex: width of the bar
 	// or go to related logs/traces from a point in line/bar chart with correct time range
 	stepIntervals := make(map[string]uint64, len(steps))
`telemetrymetrics`: the diagnostic column aliases used above:

@@ -2,6 +2,11 @@ package telemetrymetrics

 import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"

+const (
+	DiagnosticColumnCumulativeHistLeSum   = "__diagnostic_cumulative_hist_le_sum"
+	DiagnosticColumnCumulativeHistLeCount = "__diagnostic_cumulative_hist_le_count"
+)
+
 var IntrinsicFields = []string{
 	"__normalized",
 	"temporality",
Metric statement builder (`BuildFinalSelect`): defer CTE combination until after the diagnostics CTE may have been appended:

@@ -559,12 +559,8 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
 	cteArgs [][]any,
 	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
 ) (*qbtypes.Statement, error) {
-	combined := querybuilder.CombineCTEs(cteFragments)
-
+	var combined string
 	var args []any
-	for _, a := range cteArgs {
-		args = append(args, a...)
-	}

 	sb := sqlbuilder.NewSelectBuilder()

@@ -574,25 +570,59 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
 	}

 	if quantile != 0 && query.Aggregations[0].Type != metrictypes.ExpHistogramType {
+		// Build diagnostics CTE to pre-aggregate arrays and counts per ts/group.
+		diag := sqlbuilder.NewSelectBuilder()
+		diag.Select("ts")
+		for _, g := range query.GroupBy {
+			diag.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+		}
+		diag.SelectMore("groupArray(le) AS les")
+		diag.SelectMore("groupArray(value) AS vals")
+		diag.SelectMore("arraySum(groupArray(value)) AS bucket_sum")
+		diag.SelectMore("countDistinct(le) AS bucket_count")
+		diag.From("__spatial_aggregation_cte")
+		for _, g := range query.GroupBy {
+			diag.GroupBy(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+		}
+		diag.GroupBy("ts")
+		diagQ, diagArgs := diag.BuildWithFlavor(sqlbuilder.ClickHouse)
+		cteFragments = append(cteFragments, fmt.Sprintf("__diagnostics_cte AS (%s)", diagQ))
+		cteArgs = append(cteArgs, diagArgs)
+
+		combined = querybuilder.CombineCTEs(cteFragments)
+		for _, a := range cteArgs {
+			args = append(args, a...)
+		}
+
+		// Final select from diagnostics cte
 		sb.Select("ts")
 		for _, g := range query.GroupBy {
 			sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
 		}
+		// expose quantile and diagnostics as __result_* so they are parsed, then stripped in post-processing
+
+		// TODO(nikhilmantri0902): putting below as __result_0, __result_1, __result_2,
+		// but is this correct and not a hack to extract the warnings?
+		// what if the aliases are used by client, there will be collision in select statements ...
+
 		sb.SelectMore(fmt.Sprintf(
-			"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
+			"histogramQuantile(arrayMap(x -> toFloat64(x), les), vals, %.3f) AS __result_0",
 			quantile,
 		))
-		sb.From("__spatial_aggregation_cte")
-		for _, g := range query.GroupBy {
-			sb.GroupBy(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
-		}
-		sb.GroupBy("ts")
+		sb.SelectMore(fmt.Sprintf("bucket_sum AS %s", DiagnosticColumnCumulativeHistLeSum))
+		sb.SelectMore(fmt.Sprintf("bucket_count AS %s", DiagnosticColumnCumulativeHistLeCount))
+
+		sb.From("__diagnostics_cte")
 		if query.Having != nil && query.Having.Expression != "" {
 			rewriter := querybuilder.NewHavingExpressionRewriter()
 			rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
 			sb.Having(rewrittenExpr)
 		}
 	} else {
+		combined = querybuilder.CombineCTEs(cteFragments)
+		for _, a := range cteArgs {
+			args = append(args, a...)
+		}
 		sb.Select("*")
 		sb.From("__spatial_aggregation_cte")
 		if query.Having != nil && query.Having.Expression != "" {
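For a feel of what the diagnostics CTE emits, here is a standalone sketch using the same builder calls as the hunk above. The import path for `sqlbuilder` is assumed to be `github.com/huandu/go-sqlbuilder` (the `NewSelectBuilder`/`SelectMore`/`BuildWithFlavor` methods and the `ClickHouse` flavor match that library); group-by keys beyond `ts` are elided:

```go
package main

import (
	"fmt"

	sqlbuilder "github.com/huandu/go-sqlbuilder"
)

func main() {
	// Same shape as the diagnostics CTE in the diff: per-ts arrays of le/value
	// plus the bucket_sum and bucket_count columns the warnings are derived from.
	diag := sqlbuilder.NewSelectBuilder()
	diag.Select("ts")
	diag.SelectMore("groupArray(le) AS les")
	diag.SelectMore("groupArray(value) AS vals")
	diag.SelectMore("arraySum(groupArray(value)) AS bucket_sum")
	diag.SelectMore("countDistinct(le) AS bucket_count")
	diag.From("__spatial_aggregation_cte")
	diag.GroupBy("ts")

	sql, args := diag.BuildWithFlavor(sqlbuilder.ClickHouse)
	// The builder wraps this fragment as: WITH __diagnostics_cte AS (<sql>) ...
	fmt.Printf("__diagnostics_cte AS (%s)\n", sql)
	fmt.Println("args:", args)
}
```

Pre-aggregating `les`/`vals` in the CTE is what lets the final select both compute `histogramQuantile` and expose `bucket_sum`/`bucket_count` without repeating the `groupArray` calls.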
@@ -603,5 +633,8 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
 	}

 	q, a := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
-	return &qbtypes.Statement{Query: combined + q, Args: append(args, a...)}, nil
+	return &qbtypes.Statement{
+		Query: combined + q,
+		Args:  append(args, a...),
+	}, nil
 }