|
|
|
|
@@ -26,9 +26,6 @@ const (
|
|
|
|
|
IncreaseMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, multiIf(row_number() OVER rate_window = 1, nan, (%s - lagInFrame(%s, 1) OVER rate_window) < 0, %s, (%s - lagInFrame(%s, 1) OVER rate_window))) AS per_series_value`
|
|
|
|
|
|
|
|
|
|
OthersMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, %s) AS per_series_value`
|
|
|
|
|
|
|
|
|
|
RateWithStartTs = "multiIf(row_number() OVER rate_window = 1 AND earliest_start_ts < %d, NAN, row_number() OVER rate_window = 1, sum_all_values / (ts - earliest_start_ts), earliest_start_ts = lagInFrame(latest_start_ts, 1) OVER rate_window, (sum_all_values - lagInFrame(latest_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1)), sum_all_values / (ts - lagInFrame(ts, 1))) AS per_series_value"
|
|
|
|
|
IncreaseWithStartTs = "multiIf(row_number() OVER rate_window = 1 AND earliest_start_ts < %d, NAN, row_number() OVER rate_window = 1, sum_all_values, earliest_start_ts = lagInFrame(latest_start_ts, 1) OVER rate_window, sum_all_values - lagInFrame(latest_value, 1) OVER rate_window, sum_all_values) AS per_series_value"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type MetricQueryStatementBuilder struct {
|
|
|
|
|
@@ -333,9 +330,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
|
|
|
|
|
if query.Aggregations[0].Temporality == metrictypes.Delta {
|
|
|
|
|
return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
|
|
|
|
} else if query.Aggregations[0].Temporality != metrictypes.Multiple {
|
|
|
|
|
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationIncrease || query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
|
|
|
|
|
return b.buildTemporalAggCumulativeOrUnspecifiedWithStartTs(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
|
|
|
|
}
|
|
|
|
|
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
|
|
|
|
}
|
|
|
|
|
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
|
|
|
|
@@ -388,101 +382,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
|
|
|
|
|
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecifiedWithStartTs(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
start, end uint64,
|
|
|
|
|
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
|
|
|
|
timeSeriesCTE string,
|
|
|
|
|
timeSeriesCTEArgs []any,
|
|
|
|
|
) (string, []any, error) {
|
|
|
|
|
stepSec := int64(query.StepInterval.Seconds())
|
|
|
|
|
|
|
|
|
|
moreInfoQueryBuilder := sqlbuilder.NewSelectBuilder()
|
|
|
|
|
moreInfoQueryBuilder.Select("fingerprint")
|
|
|
|
|
moreInfoQueryBuilder.SelectMore(fmt.Sprintf(
|
|
|
|
|
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
|
|
|
|
stepSec,
|
|
|
|
|
))
|
|
|
|
|
moreInfoQueryBuilder.SelectMore("toDateTime(intDiv(start_timestamp_unix_milli, 1000)) AS start_ts")
|
|
|
|
|
for _, g := range query.GroupBy {
|
|
|
|
|
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", nil, err
|
|
|
|
|
}
|
|
|
|
|
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("%s AS max_value", aggCol))
|
|
|
|
|
|
|
|
|
|
colWithLatestValue, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, metrictypes.TimeAggregationLatest, query.Aggregations[0].TableHints)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", nil, err
|
|
|
|
|
}
|
|
|
|
|
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("%s AS latest_value", colWithLatestValue))
|
|
|
|
|
moreInfoQueryBuilder.SelectMore("max(unix_milli) AS latest_timestamp")
|
|
|
|
|
|
|
|
|
|
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
|
|
|
|
moreInfoQueryBuilder.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
|
|
|
|
moreInfoQueryBuilder.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
|
|
|
|
moreInfoQueryBuilder.Where(
|
|
|
|
|
moreInfoQueryBuilder.In("metric_name", query.Aggregations[0].MetricName),
|
|
|
|
|
moreInfoQueryBuilder.GTE("unix_milli", start),
|
|
|
|
|
moreInfoQueryBuilder.LT("unix_milli", end),
|
|
|
|
|
)
|
|
|
|
|
moreInfoQueryBuilder.GroupBy("fingerprint", "ts", "start_ts")
|
|
|
|
|
moreInfoQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
|
|
|
|
|
|
|
|
moreInfoPerRowQuery, moreInfoPerRowArgs := moreInfoQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
|
|
|
|
|
|
|
|
|
|
innerQueryBuilder := sqlbuilder.NewSelectBuilder()
|
|
|
|
|
innerQueryBuilder.Select("fingerprint")
|
|
|
|
|
innerQueryBuilder.SelectMore("ts")
|
|
|
|
|
for _, g := range query.GroupBy {
|
|
|
|
|
innerQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
|
|
|
|
}
|
|
|
|
|
innerQueryBuilder.SelectMore("max(start_ts) AS latest_start_ts")
|
|
|
|
|
innerQueryBuilder.SelectMore("min(start_ts) AS earliest_start_ts")
|
|
|
|
|
innerQueryBuilder.SelectMore("sum(max_value) AS sum_all_values")
|
|
|
|
|
innerQueryBuilder.SelectMore("argMax(latest_value, latest_timestamp) AS latest_value")
|
|
|
|
|
|
|
|
|
|
innerQueryBuilder.From(fmt.Sprintf("(%s)", moreInfoPerRowQuery))
|
|
|
|
|
|
|
|
|
|
innerQueryBuilder.GroupBy("fingerprint", "ts")
|
|
|
|
|
innerQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
|
|
|
|
|
|
|
|
innerQuery, innerQueryArgs := innerQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
|
|
|
|
|
|
|
|
|
|
switch query.Aggregations[0].TimeAggregation {
|
|
|
|
|
case metrictypes.TimeAggregationRate:
|
|
|
|
|
rateExpr := fmt.Sprintf(RateWithStartTs, start)
|
|
|
|
|
wrapped := sqlbuilder.NewSelectBuilder()
|
|
|
|
|
wrapped.Select("ts")
|
|
|
|
|
wrapped.SelectMore("latest_value")
|
|
|
|
|
for _, g := range query.GroupBy {
|
|
|
|
|
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
|
|
|
|
}
|
|
|
|
|
wrapped.SelectMore(rateExpr)
|
|
|
|
|
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
|
|
|
|
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
|
|
|
|
|
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
|
|
|
|
|
|
|
|
|
|
case metrictypes.TimeAggregationIncrease:
|
|
|
|
|
incExpr := fmt.Sprintf(IncreaseWithStartTs, start)
|
|
|
|
|
wrapped := sqlbuilder.NewSelectBuilder()
|
|
|
|
|
wrapped.Select("ts")
|
|
|
|
|
wrapped.SelectMore("latest_value")
|
|
|
|
|
for _, g := range query.GroupBy {
|
|
|
|
|
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
|
|
|
|
}
|
|
|
|
|
wrapped.SelectMore(incExpr)
|
|
|
|
|
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
|
|
|
|
|
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
|
|
|
|
|
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
|
|
|
|
|
default:
|
|
|
|
|
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerQueryArgs, nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
|
|
|
|
_ context.Context,
|
|
|
|
|
start, end uint64,
|
|
|
|
|
|