|
|
|
|
@@ -15,16 +15,17 @@ import (
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func TestStatementBuilder(t *testing.T) {
|
|
|
|
|
cases := []struct {
|
|
|
|
|
name string
|
|
|
|
|
requestType qbtypes.RequestType
|
|
|
|
|
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
|
|
|
|
|
expected qbtypes.Statement
|
|
|
|
|
expectedErr error
|
|
|
|
|
}{
|
|
|
|
|
type baseQuery struct {
|
|
|
|
|
name string
|
|
|
|
|
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
|
|
|
|
|
orderKey string
|
|
|
|
|
args []any
|
|
|
|
|
cte string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bases := []baseQuery{
|
|
|
|
|
{
|
|
|
|
|
name: "test_cumulative_rate_sum",
|
|
|
|
|
requestType: qbtypes.RequestTypeTimeSeries,
|
|
|
|
|
name: "cumulative_rate_sum",
|
|
|
|
|
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
|
|
|
|
Signal: telemetrytypes.SignalMetrics,
|
|
|
|
|
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
|
|
|
|
@@ -40,24 +41,16 @@ func TestStatementBuilder(t *testing.T) {
|
|
|
|
|
Filter: &qbtypes.Filter{
|
|
|
|
|
Expression: "service.name = 'cartservice'",
|
|
|
|
|
},
|
|
|
|
|
Limit: 10,
|
|
|
|
|
GroupBy: []qbtypes.GroupByKey{
|
|
|
|
|
{
|
|
|
|
|
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
|
|
|
|
Name: "service.name",
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
expected: qbtypes.Statement{
|
|
|
|
|
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
|
|
|
|
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
|
|
|
|
},
|
|
|
|
|
expectedErr: nil,
|
|
|
|
|
orderKey: "service.name",
|
|
|
|
|
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
|
|
|
|
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "test_cumulative_rate_sum_with_mat_column",
|
|
|
|
|
requestType: qbtypes.RequestTypeTimeSeries,
|
|
|
|
|
name: "cumulative_rate_sum_with_mat_column",
|
|
|
|
|
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
|
|
|
|
Signal: telemetrytypes.SignalMetrics,
|
|
|
|
|
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
|
|
|
|
@@ -73,24 +66,16 @@ func TestStatementBuilder(t *testing.T) {
|
|
|
|
|
Filter: &qbtypes.Filter{
|
|
|
|
|
Expression: "materialized.key.name REGEXP 'cartservice' OR service.name = 'cartservice'",
|
|
|
|
|
},
|
|
|
|
|
Limit: 10,
|
|
|
|
|
GroupBy: []qbtypes.GroupByKey{
|
|
|
|
|
{
|
|
|
|
|
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
|
|
|
|
Name: "service.name",
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
expected: qbtypes.Statement{
|
|
|
|
|
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
|
|
|
|
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
|
|
|
|
},
|
|
|
|
|
expectedErr: nil,
|
|
|
|
|
orderKey: "service.name",
|
|
|
|
|
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
|
|
|
|
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "test_delta_rate_sum",
|
|
|
|
|
requestType: qbtypes.RequestTypeTimeSeries,
|
|
|
|
|
name: "delta_rate_sum",
|
|
|
|
|
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
|
|
|
|
Signal: telemetrytypes.SignalMetrics,
|
|
|
|
|
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
|
|
|
|
@@ -106,24 +91,16 @@ func TestStatementBuilder(t *testing.T) {
|
|
|
|
|
Filter: &qbtypes.Filter{
|
|
|
|
|
Expression: "service.name = 'cartservice'",
|
|
|
|
|
},
|
|
|
|
|
Limit: 10,
|
|
|
|
|
GroupBy: []qbtypes.GroupByKey{
|
|
|
|
|
{
|
|
|
|
|
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
|
|
|
|
Name: "service.name",
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
expected: qbtypes.Statement{
|
|
|
|
|
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
|
|
|
|
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
|
|
|
|
|
},
|
|
|
|
|
expectedErr: nil,
|
|
|
|
|
orderKey: "service.name",
|
|
|
|
|
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
|
|
|
|
|
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`)",
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "test_histogram_percentile1",
|
|
|
|
|
requestType: qbtypes.RequestTypeTimeSeries,
|
|
|
|
|
name: "histogram_percentile1",
|
|
|
|
|
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
|
|
|
|
Signal: telemetrytypes.SignalMetrics,
|
|
|
|
|
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
|
|
|
|
@@ -139,24 +116,16 @@ func TestStatementBuilder(t *testing.T) {
|
|
|
|
|
Filter: &qbtypes.Filter{
|
|
|
|
|
Expression: "service.name = 'cartservice'",
|
|
|
|
|
},
|
|
|
|
|
Limit: 10,
|
|
|
|
|
GroupBy: []qbtypes.GroupByKey{
|
|
|
|
|
{
|
|
|
|
|
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
|
|
|
|
Name: "service.name",
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
expected: qbtypes.Statement{
|
|
|
|
|
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
|
|
|
|
|
Args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
|
|
|
|
|
},
|
|
|
|
|
expectedErr: nil,
|
|
|
|
|
orderKey: "service.name",
|
|
|
|
|
args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
|
|
|
|
|
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`)",
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "test_gauge_avg_sum",
|
|
|
|
|
requestType: qbtypes.RequestTypeTimeSeries,
|
|
|
|
|
name: "gauge_avg_sum",
|
|
|
|
|
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
|
|
|
|
Signal: telemetrytypes.SignalMetrics,
|
|
|
|
|
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
|
|
|
|
@@ -172,24 +141,16 @@ func TestStatementBuilder(t *testing.T) {
|
|
|
|
|
Filter: &qbtypes.Filter{
|
|
|
|
|
Expression: "host.name = 'big-data-node-1'",
|
|
|
|
|
},
|
|
|
|
|
Limit: 10,
|
|
|
|
|
GroupBy: []qbtypes.GroupByKey{
|
|
|
|
|
{
|
|
|
|
|
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
|
|
|
|
Name: "host.name",
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "host.name"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
expected: qbtypes.Statement{
|
|
|
|
|
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name`, ts",
|
|
|
|
|
Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
|
|
|
|
|
},
|
|
|
|
|
expectedErr: nil,
|
|
|
|
|
orderKey: "host.name",
|
|
|
|
|
args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
|
|
|
|
|
cte: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`)",
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "test_histogram_percentile2",
|
|
|
|
|
requestType: qbtypes.RequestTypeTimeSeries,
|
|
|
|
|
name: "histogram_percentile2",
|
|
|
|
|
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
|
|
|
|
Signal: telemetrytypes.SignalMetrics,
|
|
|
|
|
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
|
|
|
|
@@ -202,23 +163,69 @@ func TestStatementBuilder(t *testing.T) {
|
|
|
|
|
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Limit: 10,
|
|
|
|
|
GroupBy: []qbtypes.GroupByKey{
|
|
|
|
|
{
|
|
|
|
|
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
|
|
|
|
Name: "service.name",
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
expected: qbtypes.Statement{
|
|
|
|
|
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
|
|
|
|
|
Args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
|
|
|
|
|
},
|
|
|
|
|
expectedErr: nil,
|
|
|
|
|
orderKey: "service.name",
|
|
|
|
|
args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
|
|
|
|
|
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`)",
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type variant struct {
|
|
|
|
|
name string
|
|
|
|
|
limit int
|
|
|
|
|
hasOrder bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
variants := []variant{
|
|
|
|
|
{"with_limits", 10, false},
|
|
|
|
|
{"without_limits", 0, false},
|
|
|
|
|
{"with_order_by", 0, true},
|
|
|
|
|
{"with_order_by_and_limits", 10, true},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// expectedFinalSelects maps "base/variant" to the final SELECT portion after the CTE.
|
|
|
|
|
// The full expected query is: base.cte + expectedFinalSelects[name]
|
|
|
|
|
expectedFinalSelects := map[string]string{
|
|
|
|
|
// cumulative_rate_sum
|
|
|
|
|
"cumulative_rate_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) ORDER BY `service.name`, ts ASC",
|
|
|
|
|
"cumulative_rate_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
|
|
|
|
"cumulative_rate_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
"cumulative_rate_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
|
|
|
|
|
// cumulative_rate_sum_with_mat_column
|
|
|
|
|
"cumulative_rate_sum_with_mat_column/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) ORDER BY `service.name`, ts ASC",
|
|
|
|
|
"cumulative_rate_sum_with_mat_column/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
|
|
|
|
"cumulative_rate_sum_with_mat_column/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
"cumulative_rate_sum_with_mat_column/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
|
|
|
|
|
// delta_rate_sum
|
|
|
|
|
"delta_rate_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) ORDER BY `service.name`, ts ASC",
|
|
|
|
|
"delta_rate_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
|
|
|
|
"delta_rate_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
"delta_rate_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
|
|
|
|
|
// histogram_percentile1
|
|
|
|
|
"histogram_percentile1/with_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name`, ts ASC",
|
|
|
|
|
"histogram_percentile1/without_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
|
|
|
|
"histogram_percentile1/with_order_by": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
"histogram_percentile1/with_order_by_and_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
|
|
|
|
|
// gauge_avg_sum
|
|
|
|
|
"gauge_avg_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY avg(value) LIMIT 10) ORDER BY `host.name`, ts ASC",
|
|
|
|
|
"gauge_avg_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
|
|
|
|
|
"gauge_avg_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name` asc, ts ASC",
|
|
|
|
|
"gauge_avg_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT DISTINCT `host.name` FROM __spatial_aggregation_cte ORDER BY `host.name` asc LIMIT 10) ORDER BY `host.name` asc, ts ASC",
|
|
|
|
|
|
|
|
|
|
// histogram_percentile2
|
|
|
|
|
"histogram_percentile2/with_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name`, ts ASC",
|
|
|
|
|
"histogram_percentile2/without_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
|
|
|
|
"histogram_percentile2/with_order_by": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
"histogram_percentile2/with_order_by_and_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fm := NewFieldMapper()
|
|
|
|
|
cb := NewConditionBuilder(fm)
|
|
|
|
|
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
|
|
|
|
@@ -227,15 +234,13 @@ func TestStatementBuilder(t *testing.T) {
|
|
|
|
|
t.Fatalf("failed to load field keys: %v", err)
|
|
|
|
|
}
|
|
|
|
|
mockMetadataStore.KeysMap = keys
|
|
|
|
|
// NOTE: LoadFieldKeysFromJSON doesn't set Materialized field
|
|
|
|
|
// for keys, so we have to set it manually here for testing
|
|
|
|
|
if _, ok := mockMetadataStore.KeysMap["materialized.key.name"]; ok {
|
|
|
|
|
if len(mockMetadataStore.KeysMap["materialized.key.name"]) > 0 {
|
|
|
|
|
mockMetadataStore.KeysMap["materialized.key.name"][0].Materialized = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
|
|
|
|
|
fl, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("failed to create flagger: %v", err)
|
|
|
|
|
}
|
|
|
|
|
@@ -245,23 +250,30 @@ func TestStatementBuilder(t *testing.T) {
|
|
|
|
|
mockMetadataStore,
|
|
|
|
|
fm,
|
|
|
|
|
cb,
|
|
|
|
|
flagger,
|
|
|
|
|
fl,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for _, c := range cases {
|
|
|
|
|
t.Run(c.name, func(t *testing.T) {
|
|
|
|
|
for _, b := range bases {
|
|
|
|
|
for _, v := range variants {
|
|
|
|
|
name := b.name + "/" + v.name
|
|
|
|
|
t.Run(name, func(t *testing.T) {
|
|
|
|
|
q := b.query
|
|
|
|
|
q.Limit = v.limit
|
|
|
|
|
if v.hasOrder {
|
|
|
|
|
q.Order = []qbtypes.OrderBy{
|
|
|
|
|
{
|
|
|
|
|
Key: qbtypes.OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: b.orderKey}},
|
|
|
|
|
Direction: qbtypes.OrderDirectionAsc,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
|
|
|
|
result, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, qbtypes.RequestTypeTimeSeries, q, nil)
|
|
|
|
|
|
|
|
|
|
if c.expectedErr != nil {
|
|
|
|
|
require.Error(t, err)
|
|
|
|
|
require.Contains(t, err.Error(), c.expectedErr.Error())
|
|
|
|
|
} else {
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.Equal(t, c.expected.Query, q.Query)
|
|
|
|
|
require.Equal(t, c.expected.Args, q.Args)
|
|
|
|
|
require.Equal(t, c.expected.Warnings, q.Warnings)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
require.Equal(t, b.cte+expectedFinalSelects[name], result.Query)
|
|
|
|
|
require.Equal(t, b.args, result.Args)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|