mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-05 08:30:26 +01:00
Compare commits
2 Commits
issue_5123
...
nv/5122
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
31fda2861a | ||
|
|
d3ffefd15a |
@@ -86,12 +86,11 @@ func New(
|
||||
|
||||
func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
|
||||
|
||||
// Normalize Start/End to ms. UnmarshalJSON covers HTTP requests; callers
|
||||
// that build the request programmatically skip it, so this is the catch-all
|
||||
// (idempotent for the already-normalized path).
|
||||
if err := req.Normalize(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Coerce the window to epoch milliseconds up front so every downstream
|
||||
// consumer (TimeRange, narrowWindowByTraceID, step interval, etc.) can
|
||||
// safely assume ms regardless of the resolution the caller sent.
|
||||
req.Start = querybuilder.ToMilliSecs(req.Start)
|
||||
req.End = querybuilder.ToMilliSecs(req.End)
|
||||
|
||||
tmplVars := req.Variables
|
||||
if tmplVars == nil {
|
||||
@@ -428,12 +427,10 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.Q
|
||||
|
||||
func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream) {
|
||||
|
||||
// Catch-all normalization for programmatic callers (see QueryRange). End is
|
||||
// 0 here for the open-ended stream, which Normalize leaves untouched.
|
||||
if err := req.Normalize(); err != nil {
|
||||
client.Error <- err
|
||||
return
|
||||
}
|
||||
// Coerce the window to epoch milliseconds up front (End may be 0 for the
|
||||
// open-ended stream, which ToMilliSecs leaves untouched).
|
||||
req.Start = querybuilder.ToMilliSecs(req.Start)
|
||||
req.End = querybuilder.ToMilliSecs(req.End)
|
||||
|
||||
event := &qbtypes.QBEvent{
|
||||
Version: "v5",
|
||||
|
||||
@@ -33,6 +33,28 @@ func ToNanoSecs(epoch uint64) uint64 {
|
||||
return temp * uint64(math.Pow(10, float64(19-count)))
|
||||
}
|
||||
|
||||
// ToMilliSecs takes an epoch whose resolution is inferred from its magnitude
|
||||
// (s/ms/µs/ns) and returns it in milliseconds. A millisecond epoch for the
|
||||
// current era has 13 digits (e.g. ~1.7e12 in 2026), so the value is scaled so
|
||||
// its digit-width matches: smaller values (seconds) are scaled up, larger ones
|
||||
// (micro/nanoseconds) are scaled down. Zero is returned unchanged.
|
||||
func ToMilliSecs(epoch uint64) uint64 {
|
||||
if epoch == 0 {
|
||||
return 0
|
||||
}
|
||||
temp := epoch
|
||||
count := 0
|
||||
for epoch != 0 {
|
||||
epoch /= 10
|
||||
count++
|
||||
}
|
||||
const msDigits = 13
|
||||
if count < msDigits {
|
||||
return temp * uint64(math.Pow(10, float64(msDigits-count)))
|
||||
}
|
||||
return temp / uint64(math.Pow(10, float64(count-msDigits)))
|
||||
}
|
||||
|
||||
// TODO(srikanthccv): should these be rounded to nearest multiple of 60 instead of 5 if step > 60?
|
||||
// That would make graph look nice but "nice" but should be less important than the usefulness.
|
||||
func RecommendedStepInterval(start, end uint64) uint64 {
|
||||
|
||||
@@ -60,3 +60,51 @@ func TestToNanoSecs(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestToMilliSecs(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
epoch uint64
|
||||
expected uint64
|
||||
}{
|
||||
{
|
||||
name: "10-digit Unix timestamp (seconds) - 2023-01-01 00:00:00 UTC",
|
||||
epoch: 1672531200, // seconds
|
||||
expected: 1672531200000, // * 10^3
|
||||
},
|
||||
{
|
||||
name: "13-digit Unix timestamp (milliseconds) - already ms",
|
||||
epoch: 1672531200000,
|
||||
expected: 1672531200000, // unchanged
|
||||
},
|
||||
{
|
||||
name: "16-digit Unix timestamp (microseconds)",
|
||||
epoch: 1672531200000000, // microseconds
|
||||
expected: 1672531200000, // / 10^3
|
||||
},
|
||||
{
|
||||
name: "19-digit Unix timestamp (nanoseconds)",
|
||||
epoch: 1672531200000000000, // nanoseconds
|
||||
expected: 1672531200000, // / 10^6
|
||||
},
|
||||
{
|
||||
name: "Unix epoch start - zero is unchanged",
|
||||
epoch: 0,
|
||||
expected: 0,
|
||||
},
|
||||
{
|
||||
name: "Recent timestamp in seconds - 2024-05-25 12:00:00 UTC",
|
||||
epoch: 1716638400,
|
||||
expected: 1716638400000,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := ToMilliSecs(tt.epoch)
|
||||
if result != tt.expected {
|
||||
t.Errorf("ToMilliSecs(%d) = %d, want %d", tt.epoch, result, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,6 +135,9 @@ type seriesLookup struct {
|
||||
data map[string]map[int64]float64
|
||||
// seriesKey -> original series for metadata preservation
|
||||
seriesMetadata map[string]*TimeSeries
|
||||
// maps a variable to its series keys, letting evaluation iterate a single
|
||||
// variable's series directly.
|
||||
variableToSeriesKeys map[string][]string
|
||||
}
|
||||
|
||||
// FormulaEvaluator handles formula evaluation b/w time series from different aggregations
|
||||
@@ -340,6 +343,8 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
|
||||
// when the series is returned to the caller
|
||||
// It's also used for finding matching series for a variable
|
||||
seriesMetadata: make(map[string]*TimeSeries),
|
||||
|
||||
variableToSeriesKeys: make(map[string][]string),
|
||||
}
|
||||
|
||||
for variable, aggRef := range fe.aggRefs {
|
||||
@@ -391,6 +396,7 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
|
||||
if _, exists := lookup.data[seriesKey]; !exists {
|
||||
lookup.data[seriesKey] = make(map[int64]float64, len(series.Values))
|
||||
lookup.seriesMetadata[seriesKey] = series
|
||||
lookup.variableToSeriesKeys[variable] = append(lookup.variableToSeriesKeys[variable], seriesKey)
|
||||
}
|
||||
|
||||
// Store all timestamp-value pairs
|
||||
@@ -473,35 +479,37 @@ func (fe *FormulaEvaluator) findUniqueLabelSets(lookup *seriesLookup) [][]*Label
|
||||
|
||||
// Find unique label sets using proper label comparison
|
||||
var uniqueSets [][]*Label
|
||||
var uniqueMaps []map[string]any
|
||||
for _, labelSet := range allLabelSets {
|
||||
isUnique := true
|
||||
for _, uniqueSet := range uniqueSets {
|
||||
if fe.isSubset(uniqueSet, labelSet) {
|
||||
for _, uniqueMap := range uniqueMaps {
|
||||
if isSubset(uniqueMap, labelSet) {
|
||||
isUnique = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if isUnique {
|
||||
uniqueSets = append(uniqueSets, labelSet)
|
||||
uniqueMaps = append(uniqueMaps, labelsToMap(labelSet))
|
||||
}
|
||||
}
|
||||
|
||||
return uniqueSets
|
||||
}
|
||||
|
||||
func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool {
|
||||
labelMap1 := make(map[string]any)
|
||||
labelMap2 := make(map[string]any)
|
||||
|
||||
for _, label := range labels1 {
|
||||
labelMap1[label.Key.Name] = label.Value
|
||||
}
|
||||
for _, label := range labels2 {
|
||||
labelMap2[label.Key.Name] = label.Value
|
||||
func labelsToMap(labels []*Label) map[string]any {
|
||||
m := make(map[string]any, len(labels))
|
||||
for _, label := range labels {
|
||||
m[label.Key.Name] = label.Value
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
for k, v := range labelMap2 {
|
||||
if val, ok := labelMap1[k]; !ok || val != v {
|
||||
// isSubset reports whether every label in subset is present with the same value in
|
||||
// supersetMap (i.e. subset ⊆ superset).
|
||||
func isSubset(supersetMap map[string]any, subset []*Label) bool {
|
||||
for _, label := range subset {
|
||||
if val, ok := supersetMap[label.Key.Name]; !ok || val != label.Value {
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -517,10 +525,14 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
|
||||
// for the variable
|
||||
var allTimestamps = make(map[int64]struct{})
|
||||
|
||||
// targetLabels is fixed for this call, so build its lookup once and reuse it
|
||||
// across every series comparison below.
|
||||
targetMap := labelsToMap(targetLabels)
|
||||
|
||||
for variable := range fe.aggRefs {
|
||||
// Find series with matching labels for this variable
|
||||
for seriesKey, series := range lookup.seriesMetadata {
|
||||
if strings.HasPrefix(seriesKey, variable+"|") && fe.isSubset(targetLabels, series.Labels) {
|
||||
// only this variable's series.
|
||||
for _, seriesKey := range lookup.variableToSeriesKeys[variable] {
|
||||
if isSubset(targetMap, lookup.seriesMetadata[seriesKey].Labels) {
|
||||
if timestampData, exists := lookup.data[seriesKey]; exists {
|
||||
variableData[variable] = timestampData
|
||||
// Collect all timestamps
|
||||
|
||||
@@ -12,13 +12,6 @@ import (
|
||||
"github.com/swaggest/jsonschema-go"
|
||||
)
|
||||
|
||||
const (
|
||||
// minEpochMs and maxEpochMs bound a plausible ms timestamp to
|
||||
// 1990-01-01 .. 2100-01-01, used to reject malformed Start/End values.
|
||||
minEpochMs uint64 = 631_152_000_000
|
||||
maxEpochMs uint64 = 4_102_444_800_000
|
||||
)
|
||||
|
||||
type QueryEnvelope struct {
|
||||
// Type is the type of the query.
|
||||
Type QueryType `json:"type"` // "builder_query" | "builder_formula" | "builder_sub_query" | "builder_join" | "promql" | "clickhouse_sql"
|
||||
@@ -556,23 +549,7 @@ func (r *QueryRangeRequest) SkipFillGaps(name string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Normalize coerces Start and End to epoch milliseconds, inferring the source
|
||||
// resolution (s/ms/µs/ns) from each value's magnitude, and rejects non-zero
|
||||
// values outside the plausible 1990-2100 range. Lets downstream consumers
|
||||
// assume ms regardless of what the caller sent.
|
||||
func (r *QueryRangeRequest) Normalize() error {
|
||||
start, err := toMilliSecs(r.Start)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
end, err := toMilliSecs(r.End)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Start, r.End = start, end
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields.
|
||||
func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error {
|
||||
// Define a type alias to avoid infinite recursion
|
||||
type Alias QueryRangeRequest
|
||||
@@ -632,11 +609,6 @@ func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error {
|
||||
// Copy the decoded values back to the original struct
|
||||
*r = QueryRangeRequest(temp)
|
||||
|
||||
// Coerce Start/End to ms (and validate) at decode time for HTTP requests.
|
||||
if err := r.Normalize(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -690,24 +662,3 @@ func (r *QueryRangeRequest) GetQueriesSupportingZeroDefault() map[string]bool {
|
||||
|
||||
return canDefaultZero
|
||||
}
|
||||
|
||||
// toMilliSecs scales an epoch to milliseconds based on its magnitude: seconds are
|
||||
// scaled up, micro/nanoseconds down, milliseconds left as-is. Zero is returned
|
||||
// unchanged. A non-zero result outside 1990-2100 is rejected as malformed.
|
||||
func toMilliSecs(epoch uint64) (uint64, error) {
|
||||
var ms uint64
|
||||
switch {
|
||||
case epoch < 1e12: // seconds
|
||||
ms = epoch * 1_000
|
||||
case epoch < 1e15: // milliseconds
|
||||
ms = epoch
|
||||
case epoch < 1e18: // microseconds
|
||||
ms = epoch / 1_000
|
||||
default: // nanoseconds
|
||||
ms = epoch / 1_000_000
|
||||
}
|
||||
if epoch != 0 && (ms < minEpochMs || ms > maxEpochMs) {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "timestamp %d is outside the supported range (1990-2100)", epoch)
|
||||
}
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
@@ -1903,70 +1903,3 @@ func TestQueryRangeRequest_StepIntervalForQuery(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryRangeRequest_Normalize(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
start uint64
|
||||
end uint64
|
||||
wantStart uint64
|
||||
wantEnd uint64
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "seconds are scaled up to ms",
|
||||
start: 1672531200, // 2023-01-01 in seconds
|
||||
end: 1716638400, // 2024-05-25 in seconds
|
||||
wantStart: 1672531200000, // * 10^3
|
||||
wantEnd: 1716638400000,
|
||||
},
|
||||
{
|
||||
name: "milliseconds pass through unchanged",
|
||||
start: 1672531200000,
|
||||
end: 1716638400000,
|
||||
wantStart: 1672531200000,
|
||||
wantEnd: 1716638400000,
|
||||
},
|
||||
{
|
||||
name: "microseconds are scaled down to ms",
|
||||
start: 1672531200000000, // µs
|
||||
end: 1716638400000000,
|
||||
wantStart: 1672531200000, // / 10^3
|
||||
wantEnd: 1716638400000,
|
||||
},
|
||||
{
|
||||
name: "nanoseconds are scaled down to ms",
|
||||
start: 1672531200000000000, // ns
|
||||
end: 1716638400000000000,
|
||||
wantStart: 1672531200000, // / 10^6
|
||||
wantEnd: 1716638400000,
|
||||
},
|
||||
{
|
||||
name: "zero end (open-ended stream) is left untouched",
|
||||
start: 1672531200000,
|
||||
end: 0,
|
||||
wantStart: 1672531200000,
|
||||
wantEnd: 0,
|
||||
},
|
||||
{
|
||||
name: "out-of-range timestamp is rejected",
|
||||
start: 5_000_000_000_000, // ~year 2128 in ms, beyond the 2100 bound
|
||||
end: 5_000_000_000_000,
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
r := &QueryRangeRequest{Start: tt.start, End: tt.end}
|
||||
err := r.Normalize()
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.wantStart, r.Start)
|
||||
assert.Equal(t, tt.wantEnd, r.End)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user