Files
signoz/pkg/querier/postprocess.go
Piyush Singariya 847bc71f4e fix: postprocess json logs message key (#11189)
* fix: backend changes for message key postprocessing

* fix: message postprocessing

* chore: update in e2e tests

* fix: table view

* chore: separate frontend from backend changes

* fix: integration tests
2026-05-20 11:46:27 +00:00

1083 lines
31 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package querier
import (
"context"
"fmt"
"log/slog"
"math"
"slices"
"sort"
"strings"
"github.com/SigNoz/govaluate"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// queryInfo holds the properties that post-processing needs from a query
// spec regardless of its concrete type. It is populated by getqueryInfo.
type queryInfo struct {
// Name is the spec's Name field.
Name string
// Disabled is the spec's Disabled field.
Disabled bool
// Step is the spec's step interval. getqueryInfo leaves it at the zero
// value for formula and ClickHouse queries.
Step qbtypes.Step
}
// getqueryInfo copies the name, disabled flag and (where the spec carries
// one) the step interval out of a query spec. Spec types that are not listed
// below yield the zero queryInfo.
func getqueryInfo(spec any) queryInfo {
	var info queryInfo

	switch typed := spec.(type) {
	case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
		info.Name, info.Disabled, info.Step = typed.Name, typed.Disabled, typed.StepInterval
	case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
		info.Name, info.Disabled, info.Step = typed.Name, typed.Disabled, typed.StepInterval
	case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
		info.Name, info.Disabled, info.Step = typed.Name, typed.Disabled, typed.StepInterval
	case qbtypes.QueryBuilderTraceOperator:
		info.Name, info.Disabled, info.Step = typed.Name, typed.Disabled, typed.StepInterval
	case qbtypes.QueryBuilderFormula:
		// Formulas have no step of their own.
		info.Name, info.Disabled = typed.Name, typed.Disabled
	case qbtypes.PromQuery:
		info.Name, info.Disabled, info.Step = typed.Name, typed.Disabled, typed.Step
	case qbtypes.ClickHouseQuery:
		info.Name, info.Disabled = typed.Name, typed.Disabled
	}

	return info
}
// getQueryName returns just the name of a query spec; it is a shorthand for
// getqueryInfo(spec).Name and yields "" for unrecognized spec types.
func getQueryName(spec any) string {
	info := getqueryInfo(spec)
	return info.Name
}
// postProcessResults runs the post-processing pipeline over raw query results
// and returns them keyed by query name.
//
// The stages, in order, are:
//  1. per-query post-processing for builder, metric and trace-operator
//     queries (log builder queries additionally go through postProcessLogBody);
//  2. formula evaluation;
//  3. removal of disabled queries;
//  4. for scalar requests with FormatTableResultForUI set, merging all results
//     into a single table returned under the first query's name;
//  5. for time-series requests with FillGaps set, zero-filling each query.
//
// An error is returned only when req.StepIntervalForQuery fails during the
// fill-gaps stage.
func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
	// Convert results to typed format for processing.
	typedResults := make(map[string]*qbtypes.Result, len(results))
	for name, result := range results {
		typedResults[name] = &qbtypes.Result{
			Value: result,
		}
	}

	for _, query := range req.CompositeQuery.Queries {
		switch spec := query.Spec.(type) {
		case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
			if result, ok := typedResults[spec.Name]; ok {
				result = postProcessBuilderQuery(q, result, spec, req)
				typedResults[spec.Name] = result
			}
		case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
			if result, ok := typedResults[spec.Name]; ok {
				result = postProcessBuilderQuery(q, result, spec, req)
				result = q.postProcessLogBody(ctx, orgID, result, req)
				typedResults[spec.Name] = result
			}
		case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
			if result, ok := typedResults[spec.Name]; ok {
				result = postProcessMetricQuery(q, result, spec, req)
				typedResults[spec.Name] = result
			}
		case qbtypes.QueryBuilderTraceOperator:
			if result, ok := typedResults[spec.Name]; ok {
				result = postProcessTraceOperator(q, result, spec, req)
				typedResults[spec.Name] = result
			}
		}
	}

	// Apply formula calculations.
	typedResults = q.applyFormulas(ctx, typedResults, req)

	// Filter out disabled queries.
	typedResults = q.filterDisabledQueries(typedResults, req)

	// Apply table formatting for UI if requested.
	if req.FormatOptions != nil && req.FormatOptions.FormatTableResultForUI && req.RequestType == qbtypes.RequestTypeScalar {
		// Merging is only needed for non-ClickHouse queries: a lone
		// ClickHouse SQL query is returned as-is.
		if len(req.CompositeQuery.Queries) == 1 {
			if req.CompositeQuery.Queries[0].Type == qbtypes.QueryTypeClickHouseSQL {
				retResult := map[string]any{}
				for name, v := range typedResults {
					retResult[name] = v.Value
				}
				return retResult, nil
			}
		}

		// Format results as a table - this merges all queries into a single table.
		tableResult := q.formatScalarResultsAsTable(typedResults, req)

		// Return the table under the first query's name so it gets included in results.
		if len(req.CompositeQuery.Queries) > 0 {
			firstQueryName := getQueryName(req.CompositeQuery.Queries[0].Spec)
			if firstQueryName != "" && tableResult["table"] != nil {
				return map[string]any{firstQueryName: tableResult["table"]}, nil
			}
		}
		return tableResult, nil
	}

	if req.RequestType == qbtypes.RequestTypeTimeSeries && req.FormatOptions != nil && req.FormatOptions.FillGaps {
		for name := range typedResults {
			if req.SkipFillGaps(name) {
				continue
			}
			stepInterval, err := req.StepIntervalForQuery(name)
			if err != nil {
				return nil, err
			}
			funcs := []qbtypes.Function{{Name: qbtypes.FunctionNameFillZero}}
			funcs = q.prepareFillZeroArgsWithStep(funcs, req, stepInterval)

			result := typedResults[name]
			tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
			switch {
			case ok && tsData != nil:
				// Existing time series data; use it in place.
			case ok || result.Value == nil:
				// The value is missing or a nil *TimeSeriesData. Create an
				// empty time series and store it on the result so that the
				// placeholder built below actually reaches applyFunctions
				// (building it on a detached value would discard it, and a
				// nil pointer would panic on the Aggregations access).
				tsData = &qbtypes.TimeSeriesData{}
				result.Value = tsData
			default:
				// Some other value type: applyFunctions would not touch it.
				continue
			}

			if len(tsData.Aggregations) == 0 {
				// No data at all: create one empty series per aggregation so
				// fillZero has something to populate.
				numAgg := req.NumAggregationForQuery(name)
				tsData.Aggregations = make([]*qbtypes.AggregationBucket, numAgg)
				for idx := range numAgg {
					tsData.Aggregations[idx] = &qbtypes.AggregationBucket{
						Index: int(idx),
						Series: []*qbtypes.TimeSeries{
							{
								Labels: make([]*qbtypes.Label, 0),
								Values: make([]*qbtypes.TimeSeriesValue, 0),
							},
						},
					}
				}
			}
			typedResults[name] = q.applyFunctions(result, funcs)
		}
	}

	// Convert back to map[string]any.
	finalResults := make(map[string]any, len(typedResults))
	for name, result := range typedResults {
		finalResults[name] = result.Value
	}
	return finalResults, nil
}
// postProcessBuilderQuery post-processes the result of one builder query:
// it first applies the query's series limit/order and then, when the query
// declares functions, applies those functions using the query's step interval
// (in milliseconds) for any fillZero defaults.
func postProcessBuilderQuery[T any](
	q *querier,
	result *qbtypes.Result,
	query qbtypes.QueryBuilderQuery[T],
	req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
	limited := q.applySeriesLimit(result, query.Limit, query.Order)
	if len(query.Functions) == 0 {
		return limited
	}

	stepMs := query.StepInterval.Milliseconds()
	fns := q.prepareFillZeroArgsWithStep(query.Functions, req, stepMs)
	return q.applyFunctions(limited, fns)
}
// postProcessMetricQuery post-processes the result of one metric query.
//
// Order-by keys that spell out the query's own aggregation expression
// ("space(metric)", "time(metric)" or "space(time(metric))") are rewritten in
// place to qbtypes.DefaultOrderByKey before the series limit is applied. The
// query's functions are then applied and, for scalar requests, the series are
// reduced with the first aggregation's ReduceTo operator.
//
// A query without aggregations skips the order-key rewrite and the reduce
// step instead of panicking on Aggregations[0].
func postProcessMetricQuery(
	q *querier,
	result *qbtypes.Result,
	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
	req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
	hasAggregation := len(query.Aggregations) > 0

	if hasAggregation {
		config := query.Aggregations[0]
		spaceAgg := config.SpaceAggregation.StringValue()
		timeAgg := config.TimeAggregation.StringValue()

		spaceAggOrderBy := fmt.Sprintf("%s(%s)", spaceAgg, config.MetricName)
		timeAggOrderBy := fmt.Sprintf("%s(%s)", timeAgg, config.MetricName)
		timeSpaceAggOrderBy := fmt.Sprintf("%s(%s(%s))", spaceAgg, timeAgg, config.MetricName)

		for idx := range query.Order {
			if query.Order[idx].Key.Name == spaceAggOrderBy ||
				query.Order[idx].Key.Name == timeAggOrderBy ||
				query.Order[idx].Key.Name == timeSpaceAggOrderBy {
				query.Order[idx].Key.Name = qbtypes.DefaultOrderByKey
			}
		}
	}

	result = q.applySeriesLimit(result, query.Limit, query.Order)

	if len(query.Functions) > 0 {
		step := query.StepInterval.Milliseconds()
		functions := q.prepareFillZeroArgsWithStep(query.Functions, req, step)
		result = q.applyFunctions(result, functions)
	}

	// Apply reduce-to for scalar request type.
	if req.RequestType == qbtypes.RequestTypeScalar && hasAggregation {
		if reduceTo := query.Aggregations[0].ReduceTo; reduceTo != qbtypes.ReduceToUnknown {
			result = q.applyMetricReduceTo(result, reduceTo)
		}
	}

	return result
}
// postProcessTraceOperator post-processes the result of a trace operator
// query: the series limit/order is applied first, followed by the operator's
// functions (if it declares any) with its step interval in milliseconds used
// for fillZero defaults.
func postProcessTraceOperator(
	q *querier,
	result *qbtypes.Result,
	query qbtypes.QueryBuilderTraceOperator,
	req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
	limited := q.applySeriesLimit(result, query.Limit, query.Order)
	if len(query.Functions) == 0 {
		return limited
	}

	stepMs := query.StepInterval.Milliseconds()
	fns := q.prepareFillZeroArgsWithStep(query.Functions, req, stepMs)
	return q.applyFunctions(limited, fns)
}
// applyMetricReduceTo reduces every series of a time-series result with
// reduceOp and replaces result.Value with the scalar form of the reduced data.
//
// Results whose value is not a *qbtypes.TimeSeriesData are returned untouched.
// A nil *qbtypes.TimeSeriesData is converted to an empty scalar result with an
// empty query name rather than being dereferenced.
func (q *querier) applyMetricReduceTo(result *qbtypes.Result, reduceOp qbtypes.ReduceTo) *qbtypes.Result {
	tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
	if !ok {
		return result
	}

	var queryName string
	if tsData != nil {
		queryName = tsData.QueryName
		for _, agg := range tsData.Aggregations {
			for i, series := range agg.Series {
				// Use the FunctionReduceTo helper; series are replaced in place.
				agg.Series[i] = qbtypes.FunctionReduceTo(series, reduceOp)
			}
		}
	}

	// convertTimeSeriesDataToScalar handles a nil tsData itself.
	result.Value = convertTimeSeriesDataToScalar(tsData, queryName)
	return result
}
// applySeriesLimit runs qbtypes.ApplySeriesLimit over the series of every
// aggregation bucket in a time-series result, replacing each bucket's series
// in place. Results that do not hold a non-nil *qbtypes.TimeSeriesData are
// returned unchanged.
func (q *querier) applySeriesLimit(result *qbtypes.Result, limit int, orderBy []qbtypes.OrderBy) *qbtypes.Result {
	tsData, isTimeSeries := result.Value.(*qbtypes.TimeSeriesData)
	if !isTimeSeries || tsData == nil {
		return result
	}

	for _, bucket := range tsData.Aggregations {
		bucket.Series = qbtypes.ApplySeriesLimit(bucket.Series, orderBy, limit)
	}
	return result
}
// applyFunctions runs qbtypes.ApplyFunctions with the given function list
// over each series of each aggregation bucket, storing the returned series
// back in place. Results that do not hold a non-nil *qbtypes.TimeSeriesData
// are returned unchanged.
func (q *querier) applyFunctions(result *qbtypes.Result, functions []qbtypes.Function) *qbtypes.Result {
	tsData, isTimeSeries := result.Value.(*qbtypes.TimeSeriesData)
	if !isTimeSeries || tsData == nil {
		return result
	}

	for _, bucket := range tsData.Aggregations {
		for idx := range bucket.Series {
			bucket.Series[idx] = qbtypes.ApplyFunctions(functions, bucket.Series[idx])
		}
	}
	return result
}
// applyFormulas evaluates every formula query of the composite query and
// stores each successful result in results under the formula's name. The
// (mutated) results map is returned.
//
// Order-by keys that name the formula itself or repeat its expression are
// rewritten to qbtypes.DefaultOrderByKey first. A formula that fails to
// evaluate (the process* helpers log and return nil) contributes no entry, so
// later stages never see a nil *qbtypes.Result.
func (q *querier) applyFormulas(ctx context.Context, results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]*qbtypes.Result {
	// Collect formula queries.
	formulaQueries := make(map[string]qbtypes.QueryBuilderFormula)
	for _, query := range req.CompositeQuery.Queries {
		if query.Type != qbtypes.QueryTypeFormula {
			continue
		}
		if formula, ok := query.Spec.(qbtypes.QueryBuilderFormula); ok {
			formulaQueries[formula.Name] = formula
		}
	}

	// Process each formula.
	for name, formula := range formulaQueries {
		for idx := range formula.Order {
			if formula.Order[idx].Key.Name == formula.Name || formula.Order[idx].Key.Name == formula.Expression {
				formula.Order[idx].Key.Name = qbtypes.DefaultOrderByKey
			}
		}

		var result *qbtypes.Result
		switch req.RequestType {
		case qbtypes.RequestTypeTimeSeries:
			result = q.processTimeSeriesFormula(ctx, results, formula, req)
			if result != nil {
				result = q.applySeriesLimit(result, formula.Limit, formula.Order)
			}
		case qbtypes.RequestTypeScalar:
			// processScalarFormula applies ordering and limit itself, because
			// that has to happen before converting back to scalar format.
			result = q.processScalarFormula(ctx, results, formula, req)
		}

		// Storing a nil result would make every later `result.Value` access
		// panic, so failed formulas are simply left out.
		if result != nil {
			results[name] = result
		}
	}

	return results
}
// processTimeSeriesFormula evaluates a formula against the time-series
// results gathered so far and wraps the resulting series in a single
// aggregation bucket (index 0) named after the formula.
//
// Results that are not *qbtypes.TimeSeriesData are ignored as inputs. When
// the evaluator cannot be created or evaluation fails, the error is logged
// and nil is returned. Formula functions, if any, are applied with the step
// computed by calculateFormulaStep.
func (q *querier) processTimeSeriesFormula(
	ctx context.Context,
	results map[string]*qbtypes.Result,
	formula qbtypes.QueryBuilderFormula,
	req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
	// Gather the time-series inputs keyed by query name.
	inputs := make(map[string]*qbtypes.TimeSeriesData)
	for queryName, res := range results {
		tsData, ok := res.Value.(*qbtypes.TimeSeriesData)
		if !ok {
			continue
		}
		inputs[queryName] = tsData
	}

	evaluator, err := qbtypes.NewFormulaEvaluator(formula.Expression, req.GetQueriesSupportingZeroDefault())
	if err != nil {
		q.logger.ErrorContext(ctx, "failed to create formula evaluator", errors.Attr(err), slog.String("formula", formula.Name))
		return nil
	}

	formulaSeries, err := evaluator.EvaluateFormula(inputs)
	if err != nil {
		q.logger.ErrorContext(ctx, "failed to evaluate formula", errors.Attr(err), slog.String("formula", formula.Name))
		return nil
	}

	out := &qbtypes.Result{
		Value: &qbtypes.TimeSeriesData{
			QueryName: formula.Name,
			Aggregations: []*qbtypes.AggregationBucket{
				{Index: 0, Series: formulaSeries},
			},
		},
	}
	if len(formula.Functions) == 0 {
		return out
	}

	// Formulas have no step of their own; use the GCD of the steps of the
	// queries referenced by the expression.
	step := q.calculateFormulaStep(formula.Expression, req)
	fns := q.prepareFillZeroArgsWithStep(formula.Functions, req, step)
	return q.applyFunctions(out, fns)
}
// processScalarFormula evaluates a formula against scalar results.
//
// Every *qbtypes.ScalarData in results is converted to time-series form (one
// bucket per aggregation column, one single-point series per distinct set of
// group labels, timestamp 0), run through the formula evaluator, ordered and
// limited with qbtypes.ApplySeriesLimit, and converted back to a
// *qbtypes.ScalarData named after the formula. Results of other value types
// are ignored as inputs.
//
// It returns nil (after logging) when the evaluator cannot be created or the
// evaluation fails.
func (q *querier) processScalarFormula(
ctx context.Context,
results map[string]*qbtypes.Result,
formula qbtypes.QueryBuilderFormula,
req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
// convert scalar data to time series format with zero timestamp
// so we can run it through formula evaluator
timeSeriesData := make(map[string]*qbtypes.TimeSeriesData)
for queryName, result := range results {
if scalarData, ok := result.Value.(*qbtypes.ScalarData); ok {
// the scalar results would have just one point so negligible cost
tsData := &qbtypes.TimeSeriesData{
QueryName: scalarData.QueryName,
Aggregations: make([]*qbtypes.AggregationBucket, 0),
}
aggColumns := make(map[int]int) // aggregation index -> column index
for colIdx, col := range scalarData.Columns {
if col.Type == qbtypes.ColumnTypeAggregation {
aggColumns[int(col.AggregationIndex)] = colIdx
}
}
// labeledRowData accumulates the aggregation values of all rows that
// share one set of group labels.
type labeledRowData struct {
labels []*qbtypes.Label
values map[int]float64 // aggregation index -> value
}
rowsByLabels := make(map[string]*labeledRowData)
for _, row := range scalarData.Data {
// Build the label set of this row from its group columns.
labels := make([]*qbtypes.Label, 0)
for i, col := range scalarData.Columns {
if col.Type == qbtypes.ColumnTypeGroup && i < len(row) {
l := &qbtypes.Label{
Key: col.TelemetryFieldKey,
Value: getPointerValue(row[i]),
}
labels = append(labels, l)
}
}
labelKey := qbtypes.GetUniqueSeriesKey(labels)
rowData, exists := rowsByLabels[labelKey]
if !exists {
rowData = &labeledRowData{
labels: labels,
values: make(map[int]float64),
}
rowsByLabels[labelKey] = rowData
}
// Record each aggregation value that converts to a float; a later row
// with the same labels overwrites an earlier value.
for aggIdx, colIdx := range aggColumns {
if colIdx < len(row) {
if val, ok := toFloat64(row[colIdx]); ok {
rowData.values[aggIdx] = val
} else {
q.logger.WarnContext(ctx, "skipped adding unrecognized value")
}
}
}
}
// Sort label keys and aggregation indices so the generated buckets and
// series do not depend on map iteration order.
labelKeys := make([]string, 0, len(rowsByLabels))
for key := range rowsByLabels {
labelKeys = append(labelKeys, key)
}
slices.Sort(labelKeys)
aggIndices := make([]int, 0, len(aggColumns))
for aggIdx := range aggColumns {
aggIndices = append(aggIndices, aggIdx)
}
slices.Sort(aggIndices)
for _, aggIdx := range aggIndices {
colIdx := aggColumns[aggIdx]
bucket := &qbtypes.AggregationBucket{
Index: aggIdx,
Alias: scalarData.Columns[colIdx].Name,
Meta: scalarData.Columns[colIdx].Meta,
Series: make([]*qbtypes.TimeSeries, 0),
}
for _, labelKey := range labelKeys {
rowData := rowsByLabels[labelKey]
// Label sets without a value for this aggregation get no series.
if val, exists := rowData.values[aggIdx]; exists {
series := &qbtypes.TimeSeries{
Labels: rowData.labels,
Values: []*qbtypes.TimeSeriesValue{{
Timestamp: 0,
Value: val,
}},
}
bucket.Series = append(bucket.Series, series)
}
}
tsData.Aggregations = append(tsData.Aggregations, bucket)
}
timeSeriesData[queryName] = tsData
}
}
canDefaultZero := req.GetQueriesSupportingZeroDefault()
evaluator, err := qbtypes.NewFormulaEvaluator(formula.Expression, canDefaultZero)
if err != nil {
q.logger.ErrorContext(ctx, "failed to create formula evaluator", errors.Attr(err), slog.String("formula", formula.Name))
return nil
}
formulaSeries, err := evaluator.EvaluateFormula(timeSeriesData)
if err != nil {
q.logger.ErrorContext(ctx, "failed to evaluate formula", errors.Attr(err), slog.String("formula", formula.Name))
return nil
}
// Apply ordering (and limit) before converting to scalar format.
formulaSeries = qbtypes.ApplySeriesLimit(formulaSeries, formula.Order, formula.Limit)
// Convert back to scalar format
scalarResult := &qbtypes.ScalarData{
QueryName: formula.Name,
Columns: make([]*qbtypes.ColumnDescriptor, 0),
Data: make([][]any, 0),
}
// Group columns are taken from the labels of the first series only.
if len(formulaSeries) > 0 && len(formulaSeries[0].Labels) > 0 {
for _, label := range formulaSeries[0].Labels {
scalarResult.Columns = append(scalarResult.Columns, &qbtypes.ColumnDescriptor{
TelemetryFieldKey: label.Key,
QueryName: formula.Name,
Type: qbtypes.ColumnTypeGroup,
})
}
}
// The formula value always goes in a single trailing "__result" column.
scalarResult.Columns = append(scalarResult.Columns, &qbtypes.ColumnDescriptor{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "__result"},
QueryName: formula.Name,
AggregationIndex: 0,
Type: qbtypes.ColumnTypeAggregation,
})
for _, series := range formulaSeries {
row := make([]any, len(scalarResult.Columns))
// Labels are placed positionally; the last cell is reserved for the value.
for i, label := range series.Labels {
if i < len(row)-1 {
row[i] = label.Value
}
}
if len(series.Values) > 0 {
row[len(row)-1] = series.Values[0].Value
} else {
row[len(row)-1] = "n/a"
}
scalarResult.Data = append(scalarResult.Data, row)
}
return &qbtypes.Result{
Value: scalarResult,
}
}
// filterDisabledQueries builds a new result map that contains only the
// results of queries in the request that are not disabled. Results whose name
// does not match any query in the request are dropped as well.
func (q *querier) filterDisabledQueries(results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]*qbtypes.Result {
	enabled := make(map[string]*qbtypes.Result)

	for _, query := range req.CompositeQuery.Queries {
		info := getqueryInfo(query.Spec)
		if info.Disabled {
			continue
		}
		result, present := results[info.Name]
		if !present {
			continue
		}
		enabled[info.Name] = result
	}

	return enabled
}
// formatScalarResultsAsTable combines all scalar results into one table for
// the UI and returns it under the "table" key.
//
// Time-series values are converted to scalar form first; values of any other
// type are ignored. When exactly one scalar result remains and it already
// carries aggregation columns from several queries, its rows are only
// deduplicated; otherwise all results are merged. The default sort is applied
// only when the request specifies no ordering.
func (q *querier) formatScalarResultsAsTable(results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]any {
	if len(results) == 0 {
		return map[string]any{"table": &qbtypes.ScalarData{}}
	}

	applyDefaultSort := !req.HasOrderSpecified()

	// Normalize every result to ScalarData.
	scalarResults := make(map[string]*qbtypes.ScalarData)
	for name, result := range results {
		switch value := result.Value.(type) {
		case *qbtypes.ScalarData:
			scalarResults[name] = value
		case *qbtypes.TimeSeriesData:
			scalarResults[name] = convertTimeSeriesDataToScalar(value, name)
		}
	}

	// A single result that already spans multiple queries only needs its
	// duplicate rows collapsed.
	if len(scalarResults) == 1 {
		for _, only := range scalarResults {
			if hasMultipleQueries(only) {
				return map[string]any{"table": deduplicateRows(only, applyDefaultSort)}
			}
		}
	}

	return map[string]any{"table": mergeScalarData(scalarResults, applyDefaultSort)}
}
// convertTimeSeriesDataToScalar flattens time-series data into a scalar
// table: one group column per distinct label key, one aggregation column per
// aggregation bucket, and one row per series of the first bucket.
//
// Label sets may differ between series, so group columns are the union of all
// label keys seen in the first bucket (in first-seen order) and each label is
// written to its column by key. Each aggregation cell holds the last value of
// the series at the same position in that bucket, or "n/a" when the bucket
// has no such series or the series has no values. Nil or aggregation-less
// input yields a ScalarData carrying only the query name.
func convertTimeSeriesDataToScalar(tsData *qbtypes.TimeSeriesData, queryName string) *qbtypes.ScalarData {
	if tsData == nil || len(tsData.Aggregations) == 0 {
		return &qbtypes.ScalarData{QueryName: queryName}
	}

	firstBucketSeries := tsData.Aggregations[0].Series

	// Group columns: label key name -> column position, in first-seen order.
	columnOfLabel := make(map[string]int)
	columns := make([]*qbtypes.ColumnDescriptor, 0, len(tsData.Aggregations))
	for _, series := range firstBucketSeries {
		for _, label := range series.Labels {
			if _, seen := columnOfLabel[label.Key.Name]; seen {
				continue
			}
			columnOfLabel[label.Key.Name] = len(columns)
			columns = append(columns, &qbtypes.ColumnDescriptor{
				TelemetryFieldKey: label.Key,
				QueryName:         queryName,
				Type:              qbtypes.ColumnTypeGroup,
			})
		}
	}
	groupColCount := len(columns)

	// Aggregation columns follow the group columns, one per bucket.
	for _, bucket := range tsData.Aggregations {
		colName := bucket.Alias
		if colName == "" {
			colName = fmt.Sprintf("__result_%d", bucket.Index)
		}
		columns = append(columns, &qbtypes.ColumnDescriptor{
			TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: colName},
			QueryName:         queryName,
			AggregationIndex:  int64(bucket.Index),
			Meta:              bucket.Meta,
			Type:              qbtypes.ColumnTypeAggregation,
		})
	}

	// Rows: one per series of the first bucket.
	data := make([][]any, 0, len(firstBucketSeries))
	for seriesIdx, series := range firstBucketSeries {
		row := make([]any, len(columns))

		for _, label := range series.Labels {
			row[columnOfLabel[label.Key.Name]] = label.Value
		}

		for bucketPos, bucket := range tsData.Aggregations {
			cell := groupColCount + bucketPos
			if seriesIdx >= len(bucket.Series) || len(bucket.Series[seriesIdx].Values) == 0 {
				row[cell] = "n/a"
				continue
			}
			values := bucket.Series[seriesIdx].Values
			row[cell] = values[len(values)-1].Value
		}

		data = append(data, row)
	}

	return &qbtypes.ScalarData{
		QueryName: queryName,
		Columns:   columns,
		Data:      data,
	}
}
// hasMultipleQueries reports whether the aggregation columns of sd come from
// at least two distinct, non-empty query names.
func hasMultipleQueries(sd *qbtypes.ScalarData) bool {
	seen := make(map[string]struct{})
	for _, col := range sd.Columns {
		if col.Type != qbtypes.ColumnTypeAggregation || col.QueryName == "" {
			continue
		}
		seen[col.QueryName] = struct{}{}
	}
	return len(seen) >= 2
}
// deduplicateRows collapses rows that share the same group-column values
// into one row, keeping rows in order of first appearance. When a duplicate
// is folded into an earlier row, any cell that is "n/a" in the kept row is
// replaced by the duplicate's value if that value is not "n/a".
//
// The input rows are not modified; the returned ScalarData reuses sd.Columns
// and carries only Columns and Data. When applyDefaultSort is true the rows
// are additionally sorted by the first aggregation column, descending.
func deduplicateRows(sd *qbtypes.ScalarData, applyDefaultSort bool) *qbtypes.ScalarData {
	// Positions of the group columns, used to build the row key.
	groupIndices := []int{}
	for i, col := range sd.Columns {
		if col.Type == qbtypes.ColumnTypeGroup {
			groupIndices = append(groupIndices, i)
		}
	}

	// positionOf maps a row key to the row's index in data, so first-seen
	// order falls out of the append order.
	positionOf := make(map[string]int)
	data := make([][]any, 0, len(sd.Data))
	for _, row := range sd.Data {
		key := buildRowKey(row, groupIndices)

		pos, found := positionOf[key]
		if !found {
			positionOf[key] = len(data)
			data = append(data, append(make([]any, 0, len(row)), row...))
			continue
		}

		kept := data[pos]
		for i, val := range row {
			if kept[i] == "n/a" && val != "n/a" {
				kept[i] = val
			}
		}
	}

	if applyDefaultSort {
		sortByFirstAggregation(data, sd.Columns)
	}

	return &qbtypes.ScalarData{
		Columns: sd.Columns,
		Data:    data,
	}
}
// mergeScalarData merges several scalar results into one table.
//
// Columns are the union of all group columns (by name, first descriptor seen
// wins) followed by the aggregation columns of every query. Queries are
// visited in sorted name order for both parts, so the column layout is the
// same on every call instead of depending on map iteration order.
//
// Rows are keyed by their group-column values: rows from different queries
// with the same key share one merged row. Group cells a query does not
// provide and aggregation cells no query filled are "n/a". Rows keep
// first-seen order unless applyDefaultSort is true, in which case they are
// sorted by the first aggregation column, descending.
func mergeScalarData(results map[string]*qbtypes.ScalarData, applyDefaultSort bool) *qbtypes.ScalarData {
	// Fix the query visiting order up front.
	queryNames := make([]string, 0, len(results))
	for name := range results {
		queryNames = append(queryNames, name)
	}
	sort.Strings(queryNames)

	// Collect unique group columns.
	groupCols := []string{}
	groupColMap := make(map[string]*qbtypes.ColumnDescriptor)
	for _, queryName := range queryNames {
		for _, col := range results[queryName].Columns {
			if col.Type != qbtypes.ColumnTypeGroup {
				continue
			}
			if _, exists := groupColMap[col.Name]; exists {
				continue
			}
			groupColMap[col.Name] = col
			groupCols = append(groupCols, col.Name)
		}
	}

	// Build final columns: group columns first, then each query's
	// aggregation columns.
	columns := make([]*qbtypes.ColumnDescriptor, 0, len(groupCols))
	for _, name := range groupCols {
		columns = append(columns, groupColMap[name])
	}
	for _, queryName := range queryNames {
		for _, col := range results[queryName].Columns {
			if col.Type == qbtypes.ColumnTypeAggregation {
				columns = append(columns, col)
			}
		}
	}

	// Merge rows, preserving first-seen order.
	rowMap := make(map[string][]any)
	var keyOrder []string
	for _, queryName := range queryNames {
		sd := results[queryName]

		// Group column name -> index within this query's rows.
		groupMap := make(map[string]int)
		for i, col := range sd.Columns {
			if col.Type == qbtypes.ColumnTypeGroup {
				groupMap[col.Name] = i
			}
		}

		for _, row := range sd.Data {
			key := buildKeyFromGroupCols(row, groupMap, groupCols)

			mergedRow, exists := rowMap[key]
			if !exists {
				mergedRow = make([]any, len(columns))
				// Set group values.
				for i, colName := range groupCols {
					if idx, ok := groupMap[colName]; ok && idx < len(row) {
						mergedRow[i] = row[idx]
					} else {
						mergedRow[i] = "n/a"
					}
				}
				// Initialize all aggregations to n/a.
				for i := len(groupCols); i < len(columns); i++ {
					mergedRow[i] = "n/a"
				}
				rowMap[key] = mergedRow
				keyOrder = append(keyOrder, key)
			}

			// Set aggregation values for this query.
			for offset, col := range columns[len(groupCols):] {
				if col.QueryName != queryName {
					continue
				}
				// Find the value in the original row by aggregation index.
				for i, origCol := range sd.Columns {
					if origCol.Type == qbtypes.ColumnTypeAggregation &&
						origCol.AggregationIndex == col.AggregationIndex {
						if i < len(row) {
							mergedRow[len(groupCols)+offset] = row[i]
						}
						break
					}
				}
			}
		}
	}

	// Convert to slice, preserving insertion order.
	data := make([][]any, 0, len(rowMap))
	for _, key := range keyOrder {
		data = append(data, rowMap[key])
	}

	// Sort by first aggregation (descending) if no order was specified.
	if applyDefaultSort {
		sortByFirstAggregation(data, columns)
	}

	return &qbtypes.ScalarData{
		Columns: columns,
		Data:    data,
	}
}
// buildRowKey builds a map key from the values of row at the given indices.
// An index past the end of row contributes "n/a".
//
// Each value is rendered with %v and the parts are then joined as quoted
// strings, so values containing spaces cannot make two different rows produce
// the same key (an unquoted join turns both ["a b" "c"] and ["a" "b c"] into
// "[a b c]"). The key is only meaningful when compared with other keys
// produced by this function.
func buildRowKey(row []any, indices []int) string {
	parts := make([]string, len(indices))
	for i, idx := range indices {
		if idx < len(row) {
			parts[i] = fmt.Sprintf("%v", row[idx])
		} else {
			parts[i] = "n/a"
		}
	}
	return fmt.Sprintf("%q", parts)
}
// buildKeyFromGroupCols builds a map key for row from the values of the
// named group columns, in groupCols order. groupMap gives each column's index
// within row; a column that is absent from groupMap or lies past the end of
// row contributes "n/a".
//
// Each value is rendered with %v and the parts are then joined as quoted
// strings, so values containing spaces cannot make two different rows produce
// the same key. The key is only meaningful when compared with other keys
// produced by this function.
func buildKeyFromGroupCols(row []any, groupMap map[string]int, groupCols []string) string {
	parts := make([]string, len(groupCols))
	for i, colName := range groupCols {
		if idx, ok := groupMap[colName]; ok && idx < len(row) {
			parts[i] = fmt.Sprintf("%v", row[idx])
		} else {
			parts[i] = "n/a"
		}
	}
	return fmt.Sprintf("%q", parts)
}
// sortByFirstAggregation stably sorts data in place, in descending order of
// the value found in the first aggregation column (as ordered by
// compareValues). It does nothing when columns contains no aggregation column.
func sortByFirstAggregation(data [][]any, columns []*qbtypes.ColumnDescriptor) {
	aggIdx := slices.IndexFunc(columns, func(col *qbtypes.ColumnDescriptor) bool {
		return col.Type == qbtypes.ColumnTypeAggregation
	})
	if aggIdx < 0 {
		return
	}

	greater := func(i, j int) bool {
		return compareValues(data[i][aggIdx], data[j][aggIdx]) > 0
	}
	sort.SliceStable(data, greater)
}
// compareValues orders two cell values and returns -1, 0 or 1.
//
// "n/a" compares lower than any other value (and equal to itself), which
// places it last in a descending sort. Two values that both convert to
// float64 are compared numerically. Any other pair is treated as equal; no
// string comparison is attempted.
func compareValues(a, b any) int {
	aMissing, bMissing := a == "n/a", b == "n/a"
	switch {
	case aMissing && bMissing:
		return 0
	case aMissing:
		return -1
	case bMissing:
		return 1
	}

	aFloat, aOk := toFloat64(a)
	bFloat, bOk := toFloat64(b)
	if !aOk || !bOk {
		return 0
	}

	switch {
	case aFloat > bFloat:
		return 1
	case aFloat < bFloat:
		return -1
	default:
		return 0
	}
}
// toFloat64 converts v to a float64 by passing it through getPointerValue
// and numericAsFloat. The boolean is false (and the number 0) when that
// conversion yields NaN.
func toFloat64(v any) (float64, bool) {
	if f := numericAsFloat(getPointerValue(v)); !math.IsNaN(f) {
		return f, true
	}
	return 0, false
}
// gcd returns the greatest common divisor of a and b using the Euclidean
// algorithm. gcd(a, 0) is a, so gcd(0, 0) is 0.
func gcd(a, b int64) int64 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}
// prepareFillZeroArgsWithStep supplies default arguments (start, end, step)
// to every fillZero function that was given none.
//
// When no function needs defaults the input slice is returned as-is.
// Otherwise a copy is returned and the input slice is left untouched.
func (q *querier) prepareFillZeroArgsWithStep(functions []qbtypes.Function, req *qbtypes.QueryRangeRequest, step int64) []qbtypes.Function {
	needsDefaults := func(fn qbtypes.Function) bool {
		return fn.Name == qbtypes.FunctionNameFillZero && len(fn.Args) == 0
	}
	if !slices.ContainsFunc(functions, needsDefaults) {
		return functions
	}

	// funcFillZero expects start/end in milliseconds, but req.Start/req.End
	// may arrive in s/ms/μs/ns depending on the caller. Normalize through
	// ToNanoSecs (the same pattern used elsewhere in the codebase, e.g.
	// RecommendedStepInterval) and convert to ms; otherwise an ns payload
	// makes (end-start)/step 10^6× too large and OOMs the process.
	startMs := querybuilder.ToNanoSecs(req.Start) / 1_000_000
	endMs := querybuilder.ToNanoSecs(req.End) / 1_000_000

	withDefaults := make([]qbtypes.Function, len(functions))
	for i, fn := range functions {
		if needsDefaults(fn) {
			fn.Args = []qbtypes.FunctionArg{
				{Value: float64(startMs)},
				{Value: float64(endMs)},
				{Value: float64(step)},
			}
		}
		withDefaults[i] = fn
	}
	return withDefaults
}
// calculateFormulaStep returns, in milliseconds, the GCD of the step
// intervals of the queries referenced by a formula expression. It falls back
// to 60000 when the expression cannot be parsed or when no referenced query
// has a positive step.
func (q *querier) calculateFormulaStep(expression string, req *qbtypes.QueryRangeRequest) int64 {
	const defaultStepMs = 60000

	parsedExpr, err := govaluate.NewEvaluableExpression(expression)
	if err != nil {
		return defaultStepMs
	}

	// Variables look like "A", "A.0" or "A.my_alias"; only the part before
	// the first "." names the query.
	referenced := make(map[string]bool)
	for _, variable := range parsedExpr.Vars() {
		base, _, _ := strings.Cut(variable, ".")
		referenced[base] = true
	}

	// Fold the steps into a running GCD. Every folded step is positive and
	// gcd(0, x) is x, so the accumulator stays 0 only when nothing qualified.
	var stepGCD int64
	for _, query := range req.CompositeQuery.Queries {
		info := getqueryInfo(query.Spec)
		if !referenced[info.Name] || info.Step.Duration <= 0 {
			continue
		}
		stepMs := info.Step.Milliseconds()
		if stepMs <= 0 {
			continue
		}
		stepGCD = gcd(stepGCD, stepMs)
	}

	if stepGCD == 0 {
		return defaultStepMs
	}
	return stepGCD
}
// postProcessLogBody deletes the "message" key from each row's body map when
// its value is an empty string. The maps are edited in place and the same
// result is returned.
//
// It only acts on raw requests with the use_json_body feature enabled for the
// organization, and only on results holding *qbtypes.RawData. Rows whose
// "body" is not a map[string]any are skipped.
func (q *querier) postProcessLogBody(ctx context.Context, orgID valuer.UUID, result *qbtypes.Result, req *qbtypes.QueryRangeRequest) *qbtypes.Result {
	if req.RequestType != qbtypes.RequestTypeRaw {
		return result
	}

	evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
	if !q.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, evalCtx) {
		return result
	}

	rawData, isRaw := result.Value.(*qbtypes.RawData)
	if !isRaw {
		return result
	}

	for _, row := range rawData.Rows {
		body, isMap := row.Data["body"].(map[string]any)
		if !isMap {
			continue
		}
		// A missing key or a non-string value fails the assertion and is
		// left alone.
		if text, isString := body["message"].(string); isString && text == "" {
			delete(body, "message")
		}
	}

	return result
}