mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-07 10:22:12 +00:00
Compare commits
3 Commits
test/uplot
...
nv/6175
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
660e454dbf | ||
|
|
224619bb9e | ||
|
|
b820cf9b81 |
@@ -269,7 +269,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
var metricTemporality map[string]metrictypes.Temporality
|
||||
if len(metricNames) > 0 {
|
||||
var err error
|
||||
metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, metricNames...)
|
||||
metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, req.Start, req.End, metricNames...)
|
||||
if err != nil {
|
||||
q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
|
||||
// Continue without temporality - statement builder will handle unspecified
|
||||
|
||||
@@ -1597,12 +1597,12 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
|
||||
return values, complete, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
|
||||
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
|
||||
if metricName == "" {
|
||||
return metrictypes.Unknown, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric name cannot be empty")
|
||||
}
|
||||
|
||||
temporalityMap, err := t.FetchTemporalityMulti(ctx, metricName)
|
||||
temporalityMap, err := t.FetchTemporalityMulti(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricName)
|
||||
if err != nil {
|
||||
return metrictypes.Unknown, err
|
||||
}
|
||||
@@ -1615,27 +1615,35 @@ func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName st
|
||||
return temporality, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
|
||||
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
|
||||
if len(metricNames) == 0 {
|
||||
return make(map[string]metrictypes.Temporality), nil
|
||||
}
|
||||
|
||||
result := make(map[string]metrictypes.Temporality)
|
||||
metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...)
|
||||
metricsTemporality, err := t.fetchMetricsTemporality(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: return error after table migration are run
|
||||
meterMetricsTemporality, _ := t.fetchMeterSourceMetricsTemporality(ctx, metricNames...)
|
||||
meterMetricsTemporality, _ := t.fetchMeterSourceMetricsTemporality(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
|
||||
|
||||
// For metrics not found in the database, set to Unknown
|
||||
for _, metricName := range metricNames {
|
||||
if temporality, exists := metricsTemporality[metricName]; exists {
|
||||
result[metricName] = temporality
|
||||
if temporality, exists := metricsTemporality[metricName]; exists && len(temporality) > 0 {
|
||||
if len(temporality) > 1 {
|
||||
result[metricName] = metrictypes.Multiple
|
||||
} else {
|
||||
result[metricName] = temporality[0]
|
||||
}
|
||||
continue
|
||||
}
|
||||
if temporality, exists := meterMetricsTemporality[metricName]; exists {
|
||||
result[metricName] = temporality
|
||||
if temporality, exists := meterMetricsTemporality[metricName]; exists && len(temporality) > 0 {
|
||||
if len(temporality) > 1 {
|
||||
result[metricName] = metrictypes.Multiple
|
||||
} else {
|
||||
result[metricName] = temporality[0]
|
||||
}
|
||||
continue
|
||||
}
|
||||
result[metricName] = metrictypes.Unknown
|
||||
@@ -1644,8 +1652,8 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
|
||||
result := make(map[string]metrictypes.Temporality)
|
||||
func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string][]metrictypes.Temporality, error) {
|
||||
result := make(map[string][]metrictypes.Temporality)
|
||||
|
||||
// Build query to fetch temporality for all metrics
|
||||
// We use attr_string_value where attr_name = '__temporality__'
|
||||
@@ -1653,14 +1661,19 @@ func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metric
|
||||
// and metric_name column contains temporality value, so we use the correct mapping
|
||||
sb := sqlbuilder.Select(
|
||||
"metric_name",
|
||||
"argMax(temporality, last_reported_unix_milli) as temporality",
|
||||
).From(t.metricsDBName + "." + t.metricsFieldsTblName)
|
||||
"temporality",
|
||||
).
|
||||
Distinct().
|
||||
From(t.metricsDBName + "." + t.metricsFieldsTblName)
|
||||
|
||||
// Filter by metric names (in the temporality column due to data mix-up)
|
||||
sb.Where(sb.In("metric_name", metricNames))
|
||||
sb.Where(
|
||||
sb.In("metric_name", metricNames),
|
||||
sb.GTE("last_reported_unix_milli", queryTimeRangeStartTs),
|
||||
sb.LT("last_reported_unix_milli", queryTimeRangeEndTs),
|
||||
)
|
||||
|
||||
// Group by metric name to get one temporality per metric
|
||||
sb.GroupBy("metric_name")
|
||||
sb.GroupBy("metric_name", "temporality")
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
@@ -1693,25 +1706,30 @@ func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metric
|
||||
temporality = metrictypes.Unknown
|
||||
}
|
||||
|
||||
result[metricName] = temporality
|
||||
result[metricName] = append(result[metricName], temporality)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
|
||||
result := make(map[string]metrictypes.Temporality)
|
||||
func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string][]metrictypes.Temporality, error) {
|
||||
result := make(map[string][]metrictypes.Temporality)
|
||||
|
||||
sb := sqlbuilder.Select(
|
||||
"metric_name",
|
||||
"argMax(temporality, unix_milli) as temporality",
|
||||
).From(t.meterDBName + "." + t.meterFieldsTblName)
|
||||
"temporality",
|
||||
).
|
||||
Distinct().
|
||||
From(t.meterDBName + "." + t.meterFieldsTblName)
|
||||
|
||||
// Filter by metric names (in the temporality column due to data mix-up)
|
||||
sb.Where(sb.In("metric_name", metricNames))
|
||||
sb.Where(
|
||||
sb.In("metric_name", metricNames),
|
||||
sb.GTE("unix_milli", queryTimeRangeStartTs),
|
||||
sb.LT("unix_milli", queryTimeRangeEndTs),
|
||||
)
|
||||
|
||||
// Group by metric name to get one temporality per metric
|
||||
sb.GroupBy("metric_name")
|
||||
sb.GroupBy("metric_name", "temporality")
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
@@ -1744,7 +1762,7 @@ func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Cont
|
||||
temporality = metrictypes.Unknown
|
||||
}
|
||||
|
||||
result[metricName] = temporality
|
||||
result[metricName] = append(result[metricName], temporality)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
||||
@@ -42,6 +42,7 @@ func NewMeterQueryStatementBuilder(
|
||||
}
|
||||
}
|
||||
|
||||
// todo NAMAN: add handling for the scenario where there are multiple temporalities
|
||||
func (b *meterQueryStatementBuilder) Build(
|
||||
ctx context.Context,
|
||||
start uint64,
|
||||
|
||||
@@ -21,6 +21,10 @@ const (
|
||||
RateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
|
||||
IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
|
||||
|
||||
RateWithoutNegativeMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, IF(row_number() OVER rate_window = 1, nan, IF((max(value) - lagInFrame(max(value), 1, 0) OVER rate_window) < 0, max(value) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window), (max(value) - lagInFrame(max(value), 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)))) AS per_series_value`
|
||||
IncreaseWithoutNegativeMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, IF(row_number() OVER rate_window = 1, nan, IF((max(value) - lagInFrame(max(value), 1, 0) OVER rate_window) < 0, max(value), (max(value) - lagInFrame(max(value), 1, 0) OVER rate_window)))) AS per_series_value`
|
||||
OthersMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, %s) AS per_series_value`
|
||||
|
||||
RateWithInterpolation = `
|
||||
CASE
|
||||
WHEN row_number() OVER rate_window = 1 THEN
|
||||
@@ -377,7 +381,7 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
|
||||
sb.LTE("unix_milli", end),
|
||||
)
|
||||
|
||||
if query.Aggregations[0].Temporality != metrictypes.Unknown {
|
||||
if query.Aggregations[0].Temporality != metrictypes.Multiple && query.Aggregations[0].Temporality != metrictypes.Unknown {
|
||||
sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
|
||||
}
|
||||
|
||||
@@ -407,8 +411,10 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
|
||||
) (string, []any, error) {
|
||||
if query.Aggregations[0].Temporality == metrictypes.Delta {
|
||||
return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
} else if query.Aggregations[0].Temporality != metrictypes.Multiple {
|
||||
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
|
||||
@@ -528,6 +534,58 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
}
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
|
||||
_ context.Context,
|
||||
start, end uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
timeSeriesCTE string,
|
||||
timeSeriesCTEArgs []any,
|
||||
) (string, []any, error) {
|
||||
stepSec := int64(query.StepInterval.Seconds())
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
|
||||
stepSec,
|
||||
))
|
||||
for _, g := range query.GroupBy {
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
|
||||
aggForDeltaTemporality := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Delta, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
|
||||
aggForDeltaTemporality = fmt.Sprintf("%s/%d", aggForDeltaTemporality, stepSec)
|
||||
}
|
||||
|
||||
// todo: the interpolation scenario
|
||||
switch query.Aggregations[0].TimeAggregation {
|
||||
case metrictypes.TimeAggregationRate:
|
||||
rateExpr := fmt.Sprintf(RateWithoutNegativeMultiTemporality, aggForDeltaTemporality, start, start)
|
||||
sb.SelectMore(rateExpr)
|
||||
case metrictypes.TimeAggregationIncrease:
|
||||
increaseExpr := fmt.Sprintf(IncreaseWithoutNegativeMultiTemporality, aggForDeltaTemporality)
|
||||
sb.SelectMore(increaseExpr)
|
||||
default:
|
||||
aggForCumulativeTemporality := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Cumulative, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
expr := fmt.Sprintf(OthersMultiTemporality, aggForDeltaTemporality, aggForCumulativeTemporality)
|
||||
sb.SelectMore(expr)
|
||||
}
|
||||
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
||||
sb.Where(
|
||||
sb.In("metric_name", query.Aggregations[0].MetricName),
|
||||
sb.GTE("unix_milli", start),
|
||||
sb.LT("unix_milli", end),
|
||||
)
|
||||
sb.GroupBy("fingerprint", "ts", "temporality")
|
||||
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
queryWithoutWindow, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
|
||||
queryWithWindowAndOrder := queryWithoutWindow + " WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint ASC, ts ASC) ORDER BY ts"
|
||||
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", queryWithWindowAndOrder), args, nil
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
|
||||
_ context.Context,
|
||||
_ uint64,
|
||||
|
||||
@@ -19,6 +19,7 @@ var (
|
||||
Cumulative = Temporality{valuer.NewString("cumulative")}
|
||||
Unspecified = Temporality{valuer.NewString("unspecified")}
|
||||
Unknown = Temporality{valuer.NewString("")}
|
||||
Multiple = Temporality{valuer.NewString("__multiple__")}
|
||||
)
|
||||
|
||||
func (t Temporality) Value() (driver.Value, error) {
|
||||
|
||||
@@ -27,10 +27,10 @@ type MetadataStore interface {
|
||||
GetAllValues(ctx context.Context, fieldValueSelector *FieldValueSelector) (*TelemetryFieldValues, bool, error)
|
||||
|
||||
// FetchTemporality fetches the temporality for metric
|
||||
FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error)
|
||||
FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error)
|
||||
|
||||
// FetchTemporalityMulti fetches the temporality for multiple metrics
|
||||
FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error)
|
||||
FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error)
|
||||
|
||||
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
|
||||
ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error)
|
||||
|
||||
@@ -265,7 +265,7 @@ func (m *MockMetadataStore) SetAllValues(lookupKey string, values *telemetrytype
|
||||
}
|
||||
|
||||
// FetchTemporality fetches the temporality for a metric
|
||||
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
|
||||
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
|
||||
if temporality, exists := m.TemporalityMap[metricName]; exists {
|
||||
return temporality, nil
|
||||
}
|
||||
@@ -273,7 +273,7 @@ func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName str
|
||||
}
|
||||
|
||||
// FetchTemporalityMulti fetches the temporality for multiple metrics
|
||||
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
|
||||
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
|
||||
result := make(map[string]metrictypes.Temporality)
|
||||
|
||||
for _, metricName := range metricNames {
|
||||
|
||||
Reference in New Issue
Block a user