Compare commits

...

3 Commits

Author SHA1 Message Date
Tushar Vats
d1cece4c43 fix: modify to not modify existing expressions 2026-03-31 01:17:48 +05:30
Tushar Vats
495b2c1b25 fix: retain same expression 2026-03-30 21:55:51 +05:30
Tushar Vats
af9d6eac80 fix: timestamp shift 2026-03-30 21:09:44 +05:30
6 changed files with 65 additions and 27 deletions

View File

@@ -179,6 +179,28 @@ func AdjustedMetricTimeRange(start, end, step uint64, mq qbtypes.QueryBuilderQue
return start, end
}
// TimeIntervalExpr returns a ClickHouse SQL expression for bucketing a DateTime column
// into step-second intervals, aligned to an optional offset (in seconds).
//
// col is the DateTime column expression (e.g. "timestamp").
// intervalExpr is the step interval literal (e.g. "INTERVAL 60 SECOND" or "toIntervalSecond(60)").
// shiftSec is the optional shift in seconds; when 0 the expression is simply
//
// toStartOfInterval(<col>, <intervalExpr>)
//
// With a shift the bucket boundaries are aligned to the shifted time range:
//
// toStartOfInterval(<col> + toIntervalSecond(<shift>), <intervalExpr>) - toIntervalSecond(<shift>)
func TimeIntervalExpr(col, intervalExpr string, shiftSec int64) string {
if shiftSec == 0 {
return fmt.Sprintf("toStartOfInterval(%s, %s)", col, intervalExpr)
}
return fmt.Sprintf(
"toStartOfInterval(%s + toIntervalSecond(%d), %s) - toIntervalSecond(%d)",
col, shiftSec, intervalExpr, shiftSec,
)
}
func AssignReservedVars(vars map[string]any, start, end uint64) {
start = ToNanoSecs(start)
end = ToNanoSecs(end)

View File

@@ -358,9 +358,8 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
cteArgs = append(cteArgs, args)
}
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts",
int64(query.StepInterval.Seconds()),
sb.SelectMore(fmt.Sprintf("%s AS ts",
querybuilder.TimeIntervalExpr("fromUnixTimestamp64Nano(timestamp)", fmt.Sprintf("INTERVAL %d SECOND", int64(query.StepInterval.Seconds())), query.ShiftBy),
))
var allGroupByArgs []any

View File

@@ -211,6 +211,31 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
},
expectedErr: nil,
},
{
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
name: "Time series with time shift",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
ShiftBy: 86400,
Aggregations: []qbtypes.LogAggregation{
{
Expression: "count()",
},
},
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp) + toIntervalSecond(86400), INTERVAL 30 SECOND) - toIntervalSecond(86400) AS ts, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts",
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1705397400), uint64(1705485600), "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
},
expectedErr: nil,
},
}
ctx := context.Background()

View File

@@ -117,9 +117,8 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
sb := sqlbuilder.NewSelectBuilder()
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
sb.SelectMore(fmt.Sprintf("%s AS ts",
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
@@ -202,9 +201,8 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
sb := sqlbuilder.NewSelectBuilder()
sb.Select("fingerprint")
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
sb.SelectMore(fmt.Sprintf("%s AS ts",
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
))
for _, g := range query.GroupBy {
@@ -279,9 +277,8 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
baseSb := sqlbuilder.NewSelectBuilder()
baseSb.Select("fingerprint")
baseSb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
baseSb.SelectMore(fmt.Sprintf("%s AS ts",
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)

View File

@@ -213,9 +213,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
sb := sqlbuilder.NewSelectBuilder()
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
sb.SelectMore(fmt.Sprintf("%s AS ts",
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
))
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
@@ -350,9 +349,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
sb := sqlbuilder.NewSelectBuilder()
sb.Select("fingerprint")
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
sb.SelectMore(fmt.Sprintf("%s AS ts",
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
))
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
@@ -396,9 +394,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
baseSb := sqlbuilder.NewSelectBuilder()
baseSb.Select("fingerprint")
baseSb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
baseSb.SelectMore(fmt.Sprintf("%s AS ts",
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
))
for _, g := range query.GroupBy {
baseSb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
@@ -461,9 +458,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
sb.SelectMore(fmt.Sprintf("%s AS ts",
querybuilder.TimeIntervalExpr("toDateTime(intDiv(unix_milli, 1000))", fmt.Sprintf("toIntervalSecond(%d)", stepSec), query.ShiftBy),
))
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))

View File

@@ -488,9 +488,8 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
cteArgs = append(cteArgs, args)
}
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(timestamp, INTERVAL %d SECOND) AS ts",
int64(query.StepInterval.Seconds()),
sb.SelectMore(fmt.Sprintf("%s AS ts",
querybuilder.TimeIntervalExpr("timestamp", fmt.Sprintf("INTERVAL %d SECOND", int64(query.StepInterval.Seconds())), query.ShiftBy),
))
var allGroupByArgs []any