mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-24 17:23:19 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
860c5e42cb | ||
|
|
7e6ce8dea2 | ||
|
|
c948b04fa5 | ||
|
|
d6ba98148a | ||
|
|
953b9e8216 | ||
|
|
212f5f4120 |
@@ -26,6 +26,9 @@ const (
|
||||
IncreaseMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, multiIf(row_number() OVER rate_window = 1, nan, (%s - lagInFrame(%s, 1) OVER rate_window) < 0, %s, (%s - lagInFrame(%s, 1) OVER rate_window))) AS per_series_value`
|
||||
|
||||
OthersMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, %s) AS per_series_value`
|
||||
|
||||
RateWithStartTs = "multiIf(row_number() OVER rate_window = 1 AND earliest_start_ts < %d, NAN, row_number() OVER rate_window = 1, sum_all_values / (ts - earliest_start_ts), earliest_start_ts = lagInFrame(latest_start_ts, 1) OVER rate_window, (sum_all_values - lagInFrame(latest_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window), sum_all_values / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value"
|
||||
IncreaseWithStartTs = "multiIf(row_number() OVER rate_window = 1 AND earliest_start_ts < %d, NAN, row_number() OVER rate_window = 1, sum_all_values, earliest_start_ts = lagInFrame(latest_start_ts, 1) OVER rate_window, sum_all_values - lagInFrame(latest_value, 1) OVER rate_window, sum_all_values) AS per_series_value"
|
||||
)
|
||||
|
||||
type MetricQueryStatementBuilder struct {
|
||||
@@ -330,6 +333,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
|
||||
if query.Aggregations[0].Temporality == metrictypes.Delta {
|
||||
return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
} else if query.Aggregations[0].Temporality != metrictypes.Multiple {
|
||||
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationIncrease || query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
|
||||
return b.buildTemporalAggCumulativeOrUnspecifiedWithStartTs(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
@@ -382,6 +388,115 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
|
||||
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecifiedWithStartTs(
|
||||
ctx context.Context,
|
||||
start, end uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
timeSeriesCTE string,
|
||||
timeSeriesCTEArgs []any,
|
||||
) (string, []any, error) {
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if tbl == SamplesV4TableName {
|
||||
return b.buildTemporalAggCumulativeOrUnspecifiedWithStartTsForSamplesTbl(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecifiedWithStartTsForSamplesTbl(
|
||||
_ context.Context,
|
||||
start, end uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
timeSeriesCTE string,
|
||||
timeSeriesCTEArgs []any,
|
||||
) (string, []any, error) {
|
||||
stepSec := int64(query.StepInterval.Seconds())
|
||||
|
||||
moreInfoQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
moreInfoQueryBuilder.Select("fingerprint")
|
||||
moreInfoQueryBuilder.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
))
|
||||
moreInfoQueryBuilder.SelectMore("toDateTime(intDiv(start_timestamp_unix_milli, 1000)) AS start_ts")
|
||||
for _, g := range query.GroupBy {
|
||||
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
|
||||
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("%s AS max_value", aggCol))
|
||||
|
||||
colWithLatestValue, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, metrictypes.TimeAggregationLatest, query.Aggregations[0].TableHints)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("%s AS latest_value", colWithLatestValue))
|
||||
moreInfoQueryBuilder.SelectMore("max(unix_milli) AS latest_timestamp")
|
||||
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
moreInfoQueryBuilder.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
moreInfoQueryBuilder.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
||||
moreInfoQueryBuilder.Where(
|
||||
moreInfoQueryBuilder.In("metric_name", query.Aggregations[0].MetricName),
|
||||
moreInfoQueryBuilder.GTE("unix_milli", start),
|
||||
moreInfoQueryBuilder.LT("unix_milli", end),
|
||||
)
|
||||
moreInfoQueryBuilder.GroupBy("fingerprint", "ts", "start_ts")
|
||||
moreInfoQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
|
||||
moreInfoPerRowQuery, moreInfoPerRowArgs := moreInfoQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
|
||||
|
||||
innerQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
innerQueryBuilder.Select("fingerprint")
|
||||
innerQueryBuilder.SelectMore("ts")
|
||||
for _, g := range query.GroupBy {
|
||||
innerQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
innerQueryBuilder.SelectMore("max(start_ts) AS latest_start_ts")
|
||||
innerQueryBuilder.SelectMore("min(start_ts) AS earliest_start_ts")
|
||||
innerQueryBuilder.SelectMore("sum(max_value) AS sum_all_values")
|
||||
innerQueryBuilder.SelectMore("argMax(latest_value, latest_timestamp) AS latest_value")
|
||||
|
||||
innerQueryBuilder.From(fmt.Sprintf("(%s)", moreInfoPerRowQuery))
|
||||
|
||||
innerQueryBuilder.GroupBy("fingerprint", "ts")
|
||||
innerQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
|
||||
innerQuery, innerQueryArgs := innerQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
|
||||
|
||||
switch query.Aggregations[0].TimeAggregation {
|
||||
case metrictypes.TimeAggregationRate:
|
||||
rateExpr := fmt.Sprintf(RateWithStartTs, start)
|
||||
wrapped := sqlbuilder.NewSelectBuilder()
|
||||
wrapped.Select("ts")
|
||||
wrapped.SelectMore("latest_value")
|
||||
for _, g := range query.GroupBy {
|
||||
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
wrapped.SelectMore(rateExpr)
|
||||
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
||||
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
|
||||
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
|
||||
|
||||
case metrictypes.TimeAggregationIncrease:
|
||||
incExpr := fmt.Sprintf(IncreaseWithStartTs, start)
|
||||
wrapped := sqlbuilder.NewSelectBuilder()
|
||||
wrapped.Select("ts")
|
||||
wrapped.SelectMore("latest_value")
|
||||
for _, g := range query.GroupBy {
|
||||
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
wrapped.SelectMore(incExpr)
|
||||
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
||||
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
|
||||
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
|
||||
default:
|
||||
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerQueryArgs, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
_ context.Context,
|
||||
start, end uint64,
|
||||
@@ -389,6 +504,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
timeSeriesCTE string,
|
||||
timeSeriesCTEArgs []any,
|
||||
) (string, []any, error) {
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
stepSec := int64(query.StepInterval.Seconds())
|
||||
|
||||
baseSb := sqlbuilder.NewSelectBuilder()
|
||||
@@ -400,6 +516,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
for _, g := range query.GroupBy {
|
||||
baseSb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
if tbl != SamplesV4TableName {
|
||||
baseSb.SelectMore("uniqMerge(num_start_timestamps) AS num_start_timestamps")
|
||||
}
|
||||
|
||||
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if err != nil {
|
||||
@@ -407,7 +526,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
}
|
||||
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
|
||||
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
baseSb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
||||
baseSb.Where(
|
||||
@@ -428,6 +546,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
for _, g := range query.GroupBy {
|
||||
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
if tbl != SamplesV4TableName {
|
||||
wrapped.SelectMore("num_start_timestamps > 1 AS has_restarts")
|
||||
}
|
||||
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", RateTmpl))
|
||||
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
||||
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
|
||||
@@ -439,6 +560,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
for _, g := range query.GroupBy {
|
||||
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
if tbl != SamplesV4TableName {
|
||||
wrapped.SelectMore("num_start_timestamps > 1 AS has_restarts")
|
||||
}
|
||||
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", IncreaseTmpl))
|
||||
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
||||
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
|
||||
@@ -515,11 +639,12 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
|
||||
_ context.Context,
|
||||
_ uint64,
|
||||
_ uint64,
|
||||
start uint64,
|
||||
end uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
_ map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, []any, error) {
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if query.Aggregations[0].SpaceAggregation.IsZero() {
|
||||
return "", nil, errors.Newf(
|
||||
errors.TypeInvalidInput,
|
||||
@@ -534,6 +659,9 @@ func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue()))
|
||||
if tbl != SamplesV4TableName {
|
||||
sb.SelectMore("max(has_restarts) AS has_restarts")
|
||||
}
|
||||
sb.From("__temporal_aggregation_cte")
|
||||
sb.Where(sb.EQ("isNaN(per_series_value)", 0))
|
||||
if query.Aggregations[0].ValueFilter != nil {
|
||||
|
||||
Reference in New Issue
Block a user