Compare commits

...

3 Commits

Author SHA1 Message Date
Naman Verma
84a7be22f7 chore: generate new openapi spec 2026-02-19 11:24:00 +05:30
Naman Verma
c5923f942f chore: fix typo in struct name 2026-02-19 11:23:38 +05:30
Naman Verma
f91eee7e50 feat: add support for count based aggregation in histogram metrics 2026-02-19 08:45:44 +05:30
6 changed files with 125 additions and 15 deletions

View File

@@ -733,7 +733,9 @@ components:
- p90
- p95
- p99
- histogram_count
type: string
MetrictypesSpaceAggregationParam: {}
MetrictypesTemporality:
enum:
- delta
@@ -1023,6 +1025,8 @@ components:
$ref: '#/components/schemas/Querybuildertypesv5ReduceTo'
spaceAggregation:
$ref: '#/components/schemas/MetrictypesSpaceAggregation'
spaceAggregationParam:
$ref: '#/components/schemas/MetrictypesSpaceAggregationParam'
temporality:
$ref: '#/components/schemas/MetrictypesTemporality'
timeAggregation:

View File

@@ -80,11 +80,12 @@ func (q *builderQuery[T]) Fingerprint() string {
case qbtypes.LogAggregation:
aggParts = append(aggParts, a.Expression)
case qbtypes.MetricAggregation:
aggParts = append(aggParts, fmt.Sprintf("%s:%s:%s:%s",
aggParts = append(aggParts, fmt.Sprintf("%s:%s:%s:%s:%s",
a.MetricName,
a.Temporality.StringValue(),
a.TimeAggregation.StringValue(),
a.SpaceAggregation.StringValue(),
a.SpaceAggregationParam.StringValue(),
))
}
}

View File

@@ -123,7 +123,7 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
origTimeAgg := query.Aggregations[0].TimeAggregation
origGroupBy := slices.Clone(query.GroupBy)
if query.Aggregations[0].SpaceAggregation.IsPercentile() &&
if (query.Aggregations[0].SpaceAggregation.IsPercentile() || query.Aggregations[0].SpaceAggregation == metrictypes.SpaceAggregationHistogramCount) &&
query.Aggregations[0].Type != metrictypes.ExpHistogramType {
// add le in the group by if doesn't exist
leExists := false
@@ -154,7 +154,11 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
}
// make the time aggregation rate and space aggregation sum
query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationRate
if query.Aggregations[0].SpaceAggregation.IsPercentile() {
query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationRate
} else {
query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationIncrease
}
query.Aggregations[0].SpaceAggregation = metrictypes.SpaceAggregationSum
}
@@ -524,7 +528,7 @@ func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
return "", nil, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"invalid space aggregation, should be one of the following: [`sum`, `avg`, `min`, `max`, `count`, `p50`, `p75`, `p90`, `p95`, `p99`]",
"invalid space aggregation, should be one of the following: [`sum`, `avg`, `min`, `max`, `count`, `p50`, `p75`, `p90`, `p95`, `p99`, `histogram_count`]",
)
}
sb := sqlbuilder.NewSelectBuilder()
@@ -577,6 +581,34 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
sb.From("__spatial_aggregation_cte")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
sb.GroupBy("ts")
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
sb.Having(rewrittenExpr)
}
} else if query.Aggregations[0].SpaceAggregation == metrictypes.SpaceAggregationHistogramCount {
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
switch query.Aggregations[0].SpaceAggregationParam.(type) {
case metrictypes.ComparisonSpaceAggregationParam:
aggQuery, err := AggregationQueryForHistogramCount(query.Aggregations[0].SpaceAggregationParam.(metrictypes.ComparisonSpaceAggregationParam))
if err != nil {
return nil, err
}
sb.SelectMore(aggQuery)
default:
return nil, errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "no aggregation param provided for histogram count")
}
sb.From("__spatial_aggregation_cte")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
sb.GroupBy("ts")
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)

View File

@@ -1,6 +1,7 @@
package telemetrymetrics
import (
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/errors"
@@ -308,3 +309,17 @@ func AggregationColumnForSamplesTable(
}
return aggregationColumn, nil
}
func AggregationQueryForHistogramCount(params metrictypes.ComparisonSpaceAggregationParam) (string, error) {
histogramCountLimit := params.Limit
switch params.Operater {
case "<=":
return fmt.Sprintf("argMaxIf(value, toFloat64(le), toFloat64(le) <= %f) + (argMinIf(value, toFloat64(le), toFloat64(le) > %f) - argMaxIf(value, toFloat64(le), toFloat64(le) <= %f)) * (%f - maxIf(toFloat64(le), toFloat64(le) <= %f)) / (minIf(toFloat64(le), toFloat64(le) > %f) - maxIf(toFloat64(le), toFloat64(le) <= %f)) AS value", histogramCountLimit, histogramCountLimit, histogramCountLimit, histogramCountLimit, histogramCountLimit, histogramCountLimit, histogramCountLimit), nil
case ">":
return fmt.Sprintf("argMax(value, toFloat64(le)) - (argMaxIf(value, toFloat64(le), toFloat64(le) < %f) + (argMinIf(value, toFloat64(le), toFloat64(le) >= %f) - argMaxIf(value, toFloat64(le), toFloat64(le) < %f)) * (%f - maxIf(toFloat64(le), toFloat64(le) < %f)) / (minIf(toFloat64(le), toFloat64(le) >= %f) - maxIf(toFloat64(le), toFloat64(le) <= %f))) AS value", histogramCountLimit, histogramCountLimit, histogramCountLimit, histogramCountLimit, histogramCountLimit, histogramCountLimit, histogramCountLimit), nil
default:
return "", errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid space aggregation operator, should be one of the following: [`<=`, `>`]")
}
}

View File

@@ -2,6 +2,7 @@ package metrictypes
import (
"database/sql/driver"
"fmt"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
@@ -189,17 +190,18 @@ type SpaceAggregation struct {
}
var (
SpaceAggregationUnspecified = SpaceAggregation{valuer.NewString("")}
SpaceAggregationSum = SpaceAggregation{valuer.NewString("sum")}
SpaceAggregationAvg = SpaceAggregation{valuer.NewString("avg")}
SpaceAggregationMin = SpaceAggregation{valuer.NewString("min")}
SpaceAggregationMax = SpaceAggregation{valuer.NewString("max")}
SpaceAggregationCount = SpaceAggregation{valuer.NewString("count")}
SpaceAggregationPercentile50 = SpaceAggregation{valuer.NewString("p50")}
SpaceAggregationPercentile75 = SpaceAggregation{valuer.NewString("p75")}
SpaceAggregationPercentile90 = SpaceAggregation{valuer.NewString("p90")}
SpaceAggregationPercentile95 = SpaceAggregation{valuer.NewString("p95")}
SpaceAggregationPercentile99 = SpaceAggregation{valuer.NewString("p99")}
SpaceAggregationUnspecified = SpaceAggregation{valuer.NewString("")}
SpaceAggregationSum = SpaceAggregation{valuer.NewString("sum")}
SpaceAggregationAvg = SpaceAggregation{valuer.NewString("avg")}
SpaceAggregationMin = SpaceAggregation{valuer.NewString("min")}
SpaceAggregationMax = SpaceAggregation{valuer.NewString("max")}
SpaceAggregationCount = SpaceAggregation{valuer.NewString("count")}
SpaceAggregationPercentile50 = SpaceAggregation{valuer.NewString("p50")}
SpaceAggregationPercentile75 = SpaceAggregation{valuer.NewString("p75")}
SpaceAggregationPercentile90 = SpaceAggregation{valuer.NewString("p90")}
SpaceAggregationPercentile95 = SpaceAggregation{valuer.NewString("p95")}
SpaceAggregationPercentile99 = SpaceAggregation{valuer.NewString("p99")}
SpaceAggregationHistogramCount = SpaceAggregation{valuer.NewString("histogram_count")}
)
func (SpaceAggregation) Enum() []any {
@@ -214,6 +216,7 @@ func (SpaceAggregation) Enum() []any {
SpaceAggregationPercentile90,
SpaceAggregationPercentile95,
SpaceAggregationPercentile99,
SpaceAggregationHistogramCount,
}
}
@@ -256,3 +259,22 @@ type MetricTableHints struct {
type MetricValueFilter struct {
Value float64
}
type SpaceAggregationParam interface {
StringValue() string
}
type NoSpaceAggregationParam struct{}
func (_ NoSpaceAggregationParam) StringValue() string {
return "{}"
}
type ComparisonSpaceAggregationParam struct {
Operater string `json:"operator"`
Limit float64 `json:"limit"`
}
func (cso ComparisonSpaceAggregationParam) StringValue() string {
return fmt.Sprintf("{\"operator\": \"%s\", \"limit\": \"%f\"}", cso.Operater, cso.Limit)
}

View File

@@ -446,6 +446,8 @@ type MetricAggregation struct {
TimeAggregation metrictypes.TimeAggregation `json:"timeAggregation"`
// space aggregation to apply to the query
SpaceAggregation metrictypes.SpaceAggregation `json:"spaceAggregation"`
// param for space aggregation if needed
SpaceAggregationParam metrictypes.SpaceAggregationParam `json:"spaceAggregationParam"`
// table hints to use for the query
TableHints *metrictypes.MetricTableHints `json:"-"`
// value filter to apply to the query
@@ -454,6 +456,40 @@ type MetricAggregation struct {
ReduceTo ReduceTo `json:"reduceTo,omitempty"`
}
func (m *MetricAggregation) UnmarshalJSON(data []byte) error {
type Alias MetricAggregation
aux := &struct {
SpaceAggregationParam json.RawMessage `json:"spaceAggregationParam"`
*Alias
}{
Alias: (*Alias)(m),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
// If no param provided
if len(aux.SpaceAggregationParam) == 0 || string(aux.SpaceAggregationParam) == "null" {
m.SpaceAggregationParam = metrictypes.NoSpaceAggregationParam{}
return nil
}
switch m.SpaceAggregation {
case metrictypes.SpaceAggregationHistogramCount:
var p metrictypes.ComparisonSpaceAggregationParam
if err := json.Unmarshal(aux.SpaceAggregationParam, &p); err != nil {
return err
}
m.SpaceAggregationParam = p
default:
m.SpaceAggregationParam = metrictypes.NoSpaceAggregationParam{}
}
return nil
}
// Copy creates a deep copy of MetricAggregation
func (m MetricAggregation) Copy() MetricAggregation {
c := m