mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-05 08:30:26 +01:00
Compare commits
7 Commits
issue_5123
...
metricsExp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d64fb7ef3d | ||
|
|
16937b4093 | ||
|
|
ec13802915 | ||
|
|
ca14a46471 | ||
|
|
219f2965fe | ||
|
|
05e84b04cc | ||
|
|
def414c7bb |
@@ -219,19 +219,7 @@ func (m *module) GetStats(ctx context.Context, orgID valuer.UUID, req *metricsex
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filterWhereClause, err := m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Single query to get stats with samples, timeseries counts in required sorting order
|
||||
metricStats, total, err := m.fetchMetricsStatsWithSamples(
|
||||
ctx,
|
||||
req,
|
||||
filterWhereClause,
|
||||
false,
|
||||
req.OrderBy,
|
||||
)
|
||||
metricStats, total, err := m.fetchMetricsStatsWithSamples(ctx, req, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -974,15 +962,23 @@ func (m *module) buildFilterClause(ctx context.Context, filter *qbtypes.Filter,
|
||||
func (m *module) fetchMetricsStatsWithSamples(
|
||||
ctx context.Context,
|
||||
req *metricsexplorertypes.StatsRequest,
|
||||
filterWhereClause *sqlbuilder.WhereClause,
|
||||
normalized bool,
|
||||
orderBy *qbtypes.OrderBy,
|
||||
) ([]metricsexplorertypes.Stat, uint64, error) {
|
||||
ctx = m.withMetricsExplorerContext(ctx, "fetchMetricsStatsWithSamples")
|
||||
|
||||
hasFilter := req.Filter != nil && strings.TrimSpace(req.Filter.Expression) != ""
|
||||
var filterWhereClause *sqlbuilder.WhereClause
|
||||
if hasFilter {
|
||||
var err error
|
||||
filterWhereClause, err = m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
|
||||
samplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
distributedSamplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(distributedSamplesTable)
|
||||
|
||||
// Timeseries counts per metric
|
||||
tsSB := sqlbuilder.NewSelectBuilder()
|
||||
@@ -1005,7 +1001,7 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
"metric_name",
|
||||
fmt.Sprintf("%s AS samples", countExp),
|
||||
)
|
||||
samplesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
samplesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedSamplesTable))
|
||||
samplesSB.Where(samplesSB.Between("unix_milli", req.Start, req.End))
|
||||
samplesSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
|
||||
@@ -1013,6 +1009,8 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
sqlbuilder.CTEQuery("__time_series_counts").As(tsSB),
|
||||
}
|
||||
|
||||
// Narrow samples scan. With filter: fingerprint IN (per-fingerprint label preds can't fold to metric_name).
|
||||
// No filter (fast path): metric_name IN — aligns with samples table's leading sort key, orders of magnitude cheaper.
|
||||
if filterWhereClause != nil {
|
||||
fingerprintSB := sqlbuilder.NewSelectBuilder()
|
||||
fingerprintSB.Select("fingerprint")
|
||||
@@ -1025,6 +1023,15 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
|
||||
ctes = append(ctes, sqlbuilder.CTEQuery("__filtered_fingerprints").As(fingerprintSB))
|
||||
samplesSB.Where("fingerprint IN (SELECT fingerprint FROM __filtered_fingerprints)")
|
||||
} else {
|
||||
metricNamesSB := sqlbuilder.NewSelectBuilder()
|
||||
metricNamesSB.Select("DISTINCT metric_name")
|
||||
metricNamesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTsTable))
|
||||
metricNamesSB.Where(metricNamesSB.Between("unix_milli", start, end))
|
||||
metricNamesSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
metricNamesSB.Where(metricNamesSB.E("__normalized", normalized))
|
||||
|
||||
samplesSB.Where(fmt.Sprintf("metric_name IN (%s)", samplesSB.Var(metricNamesSB)))
|
||||
}
|
||||
samplesSB.GroupBy("metric_name")
|
||||
|
||||
@@ -1041,7 +1048,7 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
finalSB.JoinWithOption(sqlbuilder.FullOuterJoin, "__sample_counts s", "ts.metric_name = s.metric_name")
|
||||
finalSB.Where("(COALESCE(ts.timeseries, 0) > 0 OR COALESCE(s.samples, 0) > 0)")
|
||||
|
||||
orderByColumn, orderDirection, err := getStatsOrderByColumn(orderBy)
|
||||
orderByColumn, orderDirection, err := getStatsOrderByColumn(req.OrderBy)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
@@ -86,12 +86,11 @@ func New(
|
||||
|
||||
func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
|
||||
|
||||
// Normalize Start/End to ms. UnmarshalJSON covers HTTP requests; callers
|
||||
// that build the request programmatically skip it, so this is the catch-all
|
||||
// (idempotent for the already-normalized path).
|
||||
if err := req.Normalize(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Coerce the window to epoch milliseconds up front so every downstream
|
||||
// consumer (TimeRange, narrowWindowByTraceID, step interval, etc.) can
|
||||
// safely assume ms regardless of the resolution the caller sent.
|
||||
req.Start = querybuilder.ToMilliSecs(req.Start)
|
||||
req.End = querybuilder.ToMilliSecs(req.End)
|
||||
|
||||
tmplVars := req.Variables
|
||||
if tmplVars == nil {
|
||||
@@ -428,12 +427,10 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.Q
|
||||
|
||||
func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream) {
|
||||
|
||||
// Catch-all normalization for programmatic callers (see QueryRange). End is
|
||||
// 0 here for the open-ended stream, which Normalize leaves untouched.
|
||||
if err := req.Normalize(); err != nil {
|
||||
client.Error <- err
|
||||
return
|
||||
}
|
||||
// Coerce the window to epoch milliseconds up front (End may be 0 for the
|
||||
// open-ended stream, which ToMilliSecs leaves untouched).
|
||||
req.Start = querybuilder.ToMilliSecs(req.Start)
|
||||
req.End = querybuilder.ToMilliSecs(req.End)
|
||||
|
||||
event := &qbtypes.QBEvent{
|
||||
Version: "v5",
|
||||
|
||||
@@ -33,6 +33,28 @@ func ToNanoSecs(epoch uint64) uint64 {
|
||||
return temp * uint64(math.Pow(10, float64(19-count)))
|
||||
}
|
||||
|
||||
// ToMilliSecs takes an epoch whose resolution is inferred from its magnitude
|
||||
// (s/ms/µs/ns) and returns it in milliseconds. A millisecond epoch for the
|
||||
// current era has 13 digits (e.g. ~1.7e12 in 2026), so the value is scaled so
|
||||
// its digit-width matches: smaller values (seconds) are scaled up, larger ones
|
||||
// (micro/nanoseconds) are scaled down. Zero is returned unchanged.
|
||||
func ToMilliSecs(epoch uint64) uint64 {
|
||||
if epoch == 0 {
|
||||
return 0
|
||||
}
|
||||
temp := epoch
|
||||
count := 0
|
||||
for epoch != 0 {
|
||||
epoch /= 10
|
||||
count++
|
||||
}
|
||||
const msDigits = 13
|
||||
if count < msDigits {
|
||||
return temp * uint64(math.Pow(10, float64(msDigits-count)))
|
||||
}
|
||||
return temp / uint64(math.Pow(10, float64(count-msDigits)))
|
||||
}
|
||||
|
||||
// TODO(srikanthccv): should these be rounded to nearest multiple of 60 instead of 5 if step > 60?
|
||||
// That would make graph look nice but "nice" but should be less important than the usefulness.
|
||||
func RecommendedStepInterval(start, end uint64) uint64 {
|
||||
|
||||
@@ -60,3 +60,51 @@ func TestToNanoSecs(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestToMilliSecs(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
epoch uint64
|
||||
expected uint64
|
||||
}{
|
||||
{
|
||||
name: "10-digit Unix timestamp (seconds) - 2023-01-01 00:00:00 UTC",
|
||||
epoch: 1672531200, // seconds
|
||||
expected: 1672531200000, // * 10^3
|
||||
},
|
||||
{
|
||||
name: "13-digit Unix timestamp (milliseconds) - already ms",
|
||||
epoch: 1672531200000,
|
||||
expected: 1672531200000, // unchanged
|
||||
},
|
||||
{
|
||||
name: "16-digit Unix timestamp (microseconds)",
|
||||
epoch: 1672531200000000, // microseconds
|
||||
expected: 1672531200000, // / 10^3
|
||||
},
|
||||
{
|
||||
name: "19-digit Unix timestamp (nanoseconds)",
|
||||
epoch: 1672531200000000000, // nanoseconds
|
||||
expected: 1672531200000, // / 10^6
|
||||
},
|
||||
{
|
||||
name: "Unix epoch start - zero is unchanged",
|
||||
epoch: 0,
|
||||
expected: 0,
|
||||
},
|
||||
{
|
||||
name: "Recent timestamp in seconds - 2024-05-25 12:00:00 UTC",
|
||||
epoch: 1716638400,
|
||||
expected: 1716638400000,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := ToMilliSecs(tt.epoch)
|
||||
if result != tt.expected {
|
||||
t.Errorf("ToMilliSecs(%d) = %d, want %d", tt.epoch, result, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,13 +12,6 @@ import (
|
||||
"github.com/swaggest/jsonschema-go"
|
||||
)
|
||||
|
||||
const (
|
||||
// minEpochMs and maxEpochMs bound a plausible ms timestamp to
|
||||
// 1990-01-01 .. 2100-01-01, used to reject malformed Start/End values.
|
||||
minEpochMs uint64 = 631_152_000_000
|
||||
maxEpochMs uint64 = 4_102_444_800_000
|
||||
)
|
||||
|
||||
type QueryEnvelope struct {
|
||||
// Type is the type of the query.
|
||||
Type QueryType `json:"type"` // "builder_query" | "builder_formula" | "builder_sub_query" | "builder_join" | "promql" | "clickhouse_sql"
|
||||
@@ -556,23 +549,7 @@ func (r *QueryRangeRequest) SkipFillGaps(name string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Normalize coerces Start and End to epoch milliseconds, inferring the source
|
||||
// resolution (s/ms/µs/ns) from each value's magnitude, and rejects non-zero
|
||||
// values outside the plausible 1990-2100 range. Lets downstream consumers
|
||||
// assume ms regardless of what the caller sent.
|
||||
func (r *QueryRangeRequest) Normalize() error {
|
||||
start, err := toMilliSecs(r.Start)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
end, err := toMilliSecs(r.End)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Start, r.End = start, end
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields.
|
||||
func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error {
|
||||
// Define a type alias to avoid infinite recursion
|
||||
type Alias QueryRangeRequest
|
||||
@@ -632,11 +609,6 @@ func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error {
|
||||
// Copy the decoded values back to the original struct
|
||||
*r = QueryRangeRequest(temp)
|
||||
|
||||
// Coerce Start/End to ms (and validate) at decode time for HTTP requests.
|
||||
if err := r.Normalize(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -690,24 +662,3 @@ func (r *QueryRangeRequest) GetQueriesSupportingZeroDefault() map[string]bool {
|
||||
|
||||
return canDefaultZero
|
||||
}
|
||||
|
||||
// toMilliSecs scales an epoch to milliseconds based on its magnitude: seconds are
|
||||
// scaled up, micro/nanoseconds down, milliseconds left as-is. Zero is returned
|
||||
// unchanged. A non-zero result outside 1990-2100 is rejected as malformed.
|
||||
func toMilliSecs(epoch uint64) (uint64, error) {
|
||||
var ms uint64
|
||||
switch {
|
||||
case epoch < 1e12: // seconds
|
||||
ms = epoch * 1_000
|
||||
case epoch < 1e15: // milliseconds
|
||||
ms = epoch
|
||||
case epoch < 1e18: // microseconds
|
||||
ms = epoch / 1_000
|
||||
default: // nanoseconds
|
||||
ms = epoch / 1_000_000
|
||||
}
|
||||
if epoch != 0 && (ms < minEpochMs || ms > maxEpochMs) {
|
||||
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "timestamp %d is outside the supported range (1990-2100)", epoch)
|
||||
}
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
@@ -1903,70 +1903,3 @@ func TestQueryRangeRequest_StepIntervalForQuery(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryRangeRequest_Normalize(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
start uint64
|
||||
end uint64
|
||||
wantStart uint64
|
||||
wantEnd uint64
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "seconds are scaled up to ms",
|
||||
start: 1672531200, // 2023-01-01 in seconds
|
||||
end: 1716638400, // 2024-05-25 in seconds
|
||||
wantStart: 1672531200000, // * 10^3
|
||||
wantEnd: 1716638400000,
|
||||
},
|
||||
{
|
||||
name: "milliseconds pass through unchanged",
|
||||
start: 1672531200000,
|
||||
end: 1716638400000,
|
||||
wantStart: 1672531200000,
|
||||
wantEnd: 1716638400000,
|
||||
},
|
||||
{
|
||||
name: "microseconds are scaled down to ms",
|
||||
start: 1672531200000000, // µs
|
||||
end: 1716638400000000,
|
||||
wantStart: 1672531200000, // / 10^3
|
||||
wantEnd: 1716638400000,
|
||||
},
|
||||
{
|
||||
name: "nanoseconds are scaled down to ms",
|
||||
start: 1672531200000000000, // ns
|
||||
end: 1716638400000000000,
|
||||
wantStart: 1672531200000, // / 10^6
|
||||
wantEnd: 1716638400000,
|
||||
},
|
||||
{
|
||||
name: "zero end (open-ended stream) is left untouched",
|
||||
start: 1672531200000,
|
||||
end: 0,
|
||||
wantStart: 1672531200000,
|
||||
wantEnd: 0,
|
||||
},
|
||||
{
|
||||
name: "out-of-range timestamp is rejected",
|
||||
start: 5_000_000_000_000, // ~year 2128 in ms, beyond the 2100 bound
|
||||
end: 5_000_000_000_000,
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
r := &QueryRangeRequest{Start: tt.start, End: tt.end}
|
||||
err := r.Normalize()
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.wantStart, r.Start)
|
||||
assert.Equal(t, tt.wantEnd, r.End)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user