Compare commits

..

1 Commits

Author SHA1 Message Date
nityanandagohain
301d496092 chore: add normalize for QueryRangeRequest 2026-06-05 12:31:19 +05:30
6 changed files with 146 additions and 109 deletions

View File

@@ -86,11 +86,12 @@ func New(
func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
// Coerce the window to epoch milliseconds up front so every downstream
// consumer (TimeRange, narrowWindowByTraceID, step interval, etc.) can
// safely assume ms regardless of the resolution the caller sent.
req.Start = querybuilder.ToMilliSecs(req.Start)
req.End = querybuilder.ToMilliSecs(req.End)
// Normalize Start/End to ms. UnmarshalJSON covers HTTP requests; callers
// that build the request programmatically skip it, so this is the catch-all
// (idempotent for the already-normalized path).
if err := req.Normalize(); err != nil {
return nil, err
}
tmplVars := req.Variables
if tmplVars == nil {
@@ -427,10 +428,12 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.Q
func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream) {
// Coerce the window to epoch milliseconds up front (End may be 0 for the
// open-ended stream, which ToMilliSecs leaves untouched).
req.Start = querybuilder.ToMilliSecs(req.Start)
req.End = querybuilder.ToMilliSecs(req.End)
// Catch-all normalization for programmatic callers (see QueryRange). End is
// 0 here for the open-ended stream, which Normalize leaves untouched.
if err := req.Normalize(); err != nil {
client.Error <- err
return
}
event := &qbtypes.QBEvent{
Version: "v5",

View File

@@ -33,28 +33,6 @@ func ToNanoSecs(epoch uint64) uint64 {
return temp * uint64(math.Pow(10, float64(19-count)))
}
// ToMilliSecs takes an epoch whose resolution is inferred from its magnitude
// (s/ms/µs/ns) and returns it in milliseconds. A millisecond epoch for the
// current era has 13 digits (e.g. ~1.7e12 in 2026), so the value is scaled so
// its digit-width matches: smaller values (seconds) are scaled up, larger ones
// (micro/nanoseconds) are scaled down. Zero is returned unchanged.
func ToMilliSecs(epoch uint64) uint64 {
if epoch == 0 {
return 0
}
temp := epoch
count := 0
for epoch != 0 {
epoch /= 10
count++
}
const msDigits = 13
if count < msDigits {
return temp * uint64(math.Pow(10, float64(msDigits-count)))
}
return temp / uint64(math.Pow(10, float64(count-msDigits)))
}
// TODO(srikanthccv): should these be rounded to nearest multiple of 60 instead of 5 if step > 60?
// That would make graph look nice but "nice" but should be less important than the usefulness.
func RecommendedStepInterval(start, end uint64) uint64 {

View File

@@ -60,51 +60,3 @@ func TestToNanoSecs(t *testing.T) {
})
}
}
func TestToMilliSecs(t *testing.T) {
tests := []struct {
name string
epoch uint64
expected uint64
}{
{
name: "10-digit Unix timestamp (seconds) - 2023-01-01 00:00:00 UTC",
epoch: 1672531200, // seconds
expected: 1672531200000, // * 10^3
},
{
name: "13-digit Unix timestamp (milliseconds) - already ms",
epoch: 1672531200000,
expected: 1672531200000, // unchanged
},
{
name: "16-digit Unix timestamp (microseconds)",
epoch: 1672531200000000, // microseconds
expected: 1672531200000, // / 10^3
},
{
name: "19-digit Unix timestamp (nanoseconds)",
epoch: 1672531200000000000, // nanoseconds
expected: 1672531200000, // / 10^6
},
{
name: "Unix epoch start - zero is unchanged",
epoch: 0,
expected: 0,
},
{
name: "Recent timestamp in seconds - 2024-05-25 12:00:00 UTC",
epoch: 1716638400,
expected: 1716638400000,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := ToMilliSecs(tt.epoch)
if result != tt.expected {
t.Errorf("ToMilliSecs(%d) = %d, want %d", tt.epoch, result, tt.expected)
}
})
}
}

View File

@@ -135,9 +135,6 @@ type seriesLookup struct {
data map[string]map[int64]float64
// seriesKey -> original series for metadata preservation
seriesMetadata map[string]*TimeSeries
// maps a variable to its series keys, letting evaluation iterate a single
// variable's series directly.
variableToSeriesKeys map[string][]string
}
// FormulaEvaluator handles formula evaluation b/w time series from different aggregations
@@ -343,8 +340,6 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
// when the series is returned to the caller
// It's also used for finding matching series for a variable
seriesMetadata: make(map[string]*TimeSeries),
variableToSeriesKeys: make(map[string][]string),
}
for variable, aggRef := range fe.aggRefs {
@@ -396,7 +391,6 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
if _, exists := lookup.data[seriesKey]; !exists {
lookup.data[seriesKey] = make(map[int64]float64, len(series.Values))
lookup.seriesMetadata[seriesKey] = series
lookup.variableToSeriesKeys[variable] = append(lookup.variableToSeriesKeys[variable], seriesKey)
}
// Store all timestamp-value pairs
@@ -479,37 +473,35 @@ func (fe *FormulaEvaluator) findUniqueLabelSets(lookup *seriesLookup) [][]*Label
// Find unique label sets using proper label comparison
var uniqueSets [][]*Label
var uniqueMaps []map[string]any
for _, labelSet := range allLabelSets {
isUnique := true
for _, uniqueMap := range uniqueMaps {
if isSubset(uniqueMap, labelSet) {
for _, uniqueSet := range uniqueSets {
if fe.isSubset(uniqueSet, labelSet) {
isUnique = false
break
}
}
if isUnique {
uniqueSets = append(uniqueSets, labelSet)
uniqueMaps = append(uniqueMaps, labelsToMap(labelSet))
}
}
return uniqueSets
}
func labelsToMap(labels []*Label) map[string]any {
m := make(map[string]any, len(labels))
for _, label := range labels {
m[label.Key.Name] = label.Value
}
return m
}
func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool {
labelMap1 := make(map[string]any)
labelMap2 := make(map[string]any)
// isSubset reports whether every label in subset is present with the same value in
// supersetMap (i.e. subset ⊆ superset).
func isSubset(supersetMap map[string]any, subset []*Label) bool {
for _, label := range subset {
if val, ok := supersetMap[label.Key.Name]; !ok || val != label.Value {
for _, label := range labels1 {
labelMap1[label.Key.Name] = label.Value
}
for _, label := range labels2 {
labelMap2[label.Key.Name] = label.Value
}
for k, v := range labelMap2 {
if val, ok := labelMap1[k]; !ok || val != v {
return false
}
}
@@ -525,14 +517,10 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
// for the variable
var allTimestamps = make(map[int64]struct{})
// targetLabels is fixed for this call, so build its lookup once and reuse it
// across every series comparison below.
targetMap := labelsToMap(targetLabels)
for variable := range fe.aggRefs {
// only this variable's series.
for _, seriesKey := range lookup.variableToSeriesKeys[variable] {
if isSubset(targetMap, lookup.seriesMetadata[seriesKey].Labels) {
// Find series with matching labels for this variable
for seriesKey, series := range lookup.seriesMetadata {
if strings.HasPrefix(seriesKey, variable+"|") && fe.isSubset(targetLabels, series.Labels) {
if timestampData, exists := lookup.data[seriesKey]; exists {
variableData[variable] = timestampData
// Collect all timestamps

View File

@@ -12,6 +12,13 @@ import (
"github.com/swaggest/jsonschema-go"
)
const (
// minEpochMs and maxEpochMs bound a plausible ms timestamp to
// 1990-01-01 .. 2100-01-01, used to reject malformed Start/End values.
minEpochMs uint64 = 631_152_000_000
maxEpochMs uint64 = 4_102_444_800_000
)
type QueryEnvelope struct {
// Type is the type of the query.
Type QueryType `json:"type"` // "builder_query" | "builder_formula" | "builder_sub_query" | "builder_join" | "promql" | "clickhouse_sql"
@@ -549,7 +556,23 @@ func (r *QueryRangeRequest) SkipFillGaps(name string) bool {
return false
}
// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields.
// Normalize coerces Start and End to epoch milliseconds, inferring the source
// resolution (s/ms/µs/ns) from each value's magnitude, and rejects non-zero
// values outside the plausible 1990-2100 range. Lets downstream consumers
// assume ms regardless of what the caller sent.
func (r *QueryRangeRequest) Normalize() error {
start, err := toMilliSecs(r.Start)
if err != nil {
return err
}
end, err := toMilliSecs(r.End)
if err != nil {
return err
}
r.Start, r.End = start, end
return nil
}
func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error {
// Define a type alias to avoid infinite recursion
type Alias QueryRangeRequest
@@ -609,6 +632,11 @@ func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error {
// Copy the decoded values back to the original struct
*r = QueryRangeRequest(temp)
// Coerce Start/End to ms (and validate) at decode time for HTTP requests.
if err := r.Normalize(); err != nil {
return err
}
return nil
}
@@ -662,3 +690,24 @@ func (r *QueryRangeRequest) GetQueriesSupportingZeroDefault() map[string]bool {
return canDefaultZero
}
// toMilliSecs scales an epoch to milliseconds based on its magnitude: seconds are
// scaled up, micro/nanoseconds down, milliseconds left as-is. Zero is returned
// unchanged. A non-zero result outside 1990-2100 is rejected as malformed.
func toMilliSecs(epoch uint64) (uint64, error) {
var ms uint64
switch {
case epoch < 1e12: // seconds
ms = epoch * 1_000
case epoch < 1e15: // milliseconds
ms = epoch
case epoch < 1e18: // microseconds
ms = epoch / 1_000
default: // nanoseconds
ms = epoch / 1_000_000
}
if epoch != 0 && (ms < minEpochMs || ms > maxEpochMs) {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "timestamp %d is outside the supported range (1990-2100)", epoch)
}
return ms, nil
}

View File

@@ -1903,3 +1903,70 @@ func TestQueryRangeRequest_StepIntervalForQuery(t *testing.T) {
})
}
}
func TestQueryRangeRequest_Normalize(t *testing.T) {
tests := []struct {
name string
start uint64
end uint64
wantStart uint64
wantEnd uint64
wantErr bool
}{
{
name: "seconds are scaled up to ms",
start: 1672531200, // 2023-01-01 in seconds
end: 1716638400, // 2024-05-25 in seconds
wantStart: 1672531200000, // * 10^3
wantEnd: 1716638400000,
},
{
name: "milliseconds pass through unchanged",
start: 1672531200000,
end: 1716638400000,
wantStart: 1672531200000,
wantEnd: 1716638400000,
},
{
name: "microseconds are scaled down to ms",
start: 1672531200000000, // µs
end: 1716638400000000,
wantStart: 1672531200000, // / 10^3
wantEnd: 1716638400000,
},
{
name: "nanoseconds are scaled down to ms",
start: 1672531200000000000, // ns
end: 1716638400000000000,
wantStart: 1672531200000, // / 10^6
wantEnd: 1716638400000,
},
{
name: "zero end (open-ended stream) is left untouched",
start: 1672531200000,
end: 0,
wantStart: 1672531200000,
wantEnd: 0,
},
{
name: "out-of-range timestamp is rejected",
start: 5_000_000_000_000, // ~year 2128 in ms, beyond the 2100 bound
end: 5_000_000_000_000,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &QueryRangeRequest{Start: tt.start, End: tt.end}
err := r.Normalize()
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, tt.wantStart, r.Start)
assert.Equal(t, tt.wantEnd, r.End)
})
}
}