mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-01 18:10:21 +01:00
Compare commits
3 Commits
refactor/c
...
tvats-fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d1cece4c43 | ||
|
|
495b2c1b25 | ||
|
|
af9d6eac80 |
@@ -179,6 +179,28 @@ func AdjustedMetricTimeRange(start, end, step uint64, mq qbtypes.QueryBuilderQue
|
||||
return start, end
|
||||
}
|
||||
|
||||
// TimeIntervalExpr returns a ClickHouse SQL expression for bucketing a DateTime column
|
||||
// into step-second intervals, aligned to an optional offset (in seconds).
|
||||
//
|
||||
// col is the DateTime column expression (e.g. "timestamp").
|
||||
// intervalExpr is the step interval literal (e.g. "INTERVAL 60 SECOND" or "toIntervalSecond(60)").
|
||||
// shiftSec is the optional shift in seconds; when 0 the expression is simply
|
||||
//
|
||||
// toStartOfInterval(<col>, <intervalExpr>)
|
||||
//
|
||||
// With a shift the bucket boundaries are aligned to the shifted time range:
|
||||
//
|
||||
// toStartOfInterval(<col> + toIntervalSecond(<shift>), <intervalExpr>) - toIntervalSecond(<shift>)
|
||||
func TimeIntervalExpr(col, intervalExpr string, shiftSec int64) string {
|
||||
if shiftSec == 0 {
|
||||
return fmt.Sprintf("toStartOfInterval(%s, %s)", col, intervalExpr)
|
||||
}
|
||||
return fmt.Sprintf(
|
||||
"toStartOfInterval(%s + toIntervalSecond(%d), %s) - toIntervalSecond(%d)",
|
||||
col, shiftSec, intervalExpr, shiftSec,
|
||||
)
|
||||
}
|
||||
|
||||
func AssignReservedVars(vars map[string]any, start, end uint64) {
|
||||
start = ToNanoSecs(start)
|
||||
end = ToNanoSecs(end)
|
||||
|
||||
@@ -358,9 +358,8 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
|
||||
cteArgs = append(cteArgs, args)
|
||||
}
|
||||
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts",
|
||||
int64(query.StepInterval.Seconds()),
|
||||
sb.SelectMore(fmt.Sprintf("%s AS ts",
|
||||
querybuilder.TimeIntervalExpr("fromUnixTimestamp64Nano(timestamp)", fmt.Sprintf("INTERVAL %d SECOND", int64(query.StepInterval.Seconds())), query.ShiftBy),
|
||||
))
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
@@ -211,6 +211,31 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
|
||||
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
|
||||
name: "Time series with time shift",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
ShiftBy: 86400,
|
||||
Aggregations: []qbtypes.LogAggregation{
|
||||
{
|
||||
Expression: "count()",
|
||||
},
|
||||
},
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "service.name = 'cartservice'",
|
||||
},
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp) + toIntervalSecond(86400), INTERVAL 30 SECOND) - toIntervalSecond(86400) AS ts, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts",
|
||||
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1705397400), uint64(1705485600), "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -117,9 +117,8 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
sb.SelectMore(fmt.Sprintf("%s AS ts",
|
||||
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
|
||||
))
|
||||
for _, g := range query.GroupBy {
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
|
||||
@@ -202,9 +201,8 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
sb.Select("fingerprint")
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
sb.SelectMore(fmt.Sprintf("%s AS ts",
|
||||
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
|
||||
))
|
||||
|
||||
for _, g := range query.GroupBy {
|
||||
@@ -279,9 +277,8 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
|
||||
baseSb := sqlbuilder.NewSelectBuilder()
|
||||
baseSb.Select("fingerprint")
|
||||
baseSb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
baseSb.SelectMore(fmt.Sprintf("%s AS ts",
|
||||
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
|
||||
))
|
||||
for _, g := range query.GroupBy {
|
||||
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
|
||||
|
||||
@@ -213,9 +213,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
sb.SelectMore(fmt.Sprintf("%s AS ts",
|
||||
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
|
||||
))
|
||||
for _, g := range query.GroupBy {
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
@@ -350,9 +349,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
sb.Select("fingerprint")
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
sb.SelectMore(fmt.Sprintf("%s AS ts",
|
||||
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
|
||||
))
|
||||
for _, g := range query.GroupBy {
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
@@ -396,9 +394,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
|
||||
baseSb := sqlbuilder.NewSelectBuilder()
|
||||
baseSb.Select("fingerprint")
|
||||
baseSb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
baseSb.SelectMore(fmt.Sprintf("%s AS ts",
|
||||
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
|
||||
))
|
||||
for _, g := range query.GroupBy {
|
||||
baseSb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
@@ -461,9 +458,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
|
||||
stepSec := int64(query.StepInterval.Seconds())
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
sb.SelectMore(fmt.Sprintf("%s AS ts",
|
||||
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
|
||||
))
|
||||
for _, g := range query.GroupBy {
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
|
||||
@@ -488,9 +488,8 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
|
||||
cteArgs = append(cteArgs, args)
|
||||
}
|
||||
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(timestamp, INTERVAL %d SECOND) AS ts",
|
||||
int64(query.StepInterval.Seconds()),
|
||||
sb.SelectMore(fmt.Sprintf("%s AS ts",
|
||||
querybuilder.TimeIntervalExpr("timestamp", fmt.Sprintf("INTERVAL %d SECOND", int64(query.StepInterval.Seconds())), query.ShiftBy),
|
||||
))
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
Reference in New Issue
Block a user