Compare commits

...

3 Commits

Author SHA1 Message Date
Naman Verma
660e454dbf chore: remove comment 2026-02-06 11:52:52 +05:30
Naman Verma
224619bb9e feat: introduce __multiple__ as a temporality 2026-02-06 11:45:42 +05:30
Naman Verma
b820cf9b81 feat: add support for simultaneous delta and cumulative temporality 2026-02-05 17:06:46 +05:30
7 changed files with 110 additions and 32 deletions

View File

@@ -269,7 +269,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
var metricTemporality map[string]metrictypes.Temporality
if len(metricNames) > 0 {
var err error
metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, metricNames...)
metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, req.Start, req.End, metricNames...)
if err != nil {
q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
// Continue without temporality - statement builder will handle unspecified

View File

@@ -1597,12 +1597,12 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
return values, complete, nil
}
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
if metricName == "" {
return metrictypes.Unknown, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric name cannot be empty")
}
temporalityMap, err := t.FetchTemporalityMulti(ctx, metricName)
temporalityMap, err := t.FetchTemporalityMulti(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricName)
if err != nil {
return metrictypes.Unknown, err
}
@@ -1615,27 +1615,35 @@ func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName st
return temporality, nil
}
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
if len(metricNames) == 0 {
return make(map[string]metrictypes.Temporality), nil
}
result := make(map[string]metrictypes.Temporality)
metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...)
metricsTemporality, err := t.fetchMetricsTemporality(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
if err != nil {
return nil, err
}
// TODO: return error after table migration are run
meterMetricsTemporality, _ := t.fetchMeterSourceMetricsTemporality(ctx, metricNames...)
meterMetricsTemporality, _ := t.fetchMeterSourceMetricsTemporality(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
// For metrics not found in the database, set to Unknown
for _, metricName := range metricNames {
if temporality, exists := metricsTemporality[metricName]; exists {
result[metricName] = temporality
if temporality, exists := metricsTemporality[metricName]; exists && len(temporality) > 0 {
if len(temporality) > 1 {
result[metricName] = metrictypes.Multiple
} else {
result[metricName] = temporality[0]
}
continue
}
if temporality, exists := meterMetricsTemporality[metricName]; exists {
result[metricName] = temporality
if temporality, exists := meterMetricsTemporality[metricName]; exists && len(temporality) > 0 {
if len(temporality) > 1 {
result[metricName] = metrictypes.Multiple
} else {
result[metricName] = temporality[0]
}
continue
}
result[metricName] = metrictypes.Unknown
@@ -1644,8 +1652,8 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
return result, nil
}
func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string][]metrictypes.Temporality, error) {
result := make(map[string][]metrictypes.Temporality)
// Build query to fetch temporality for all metrics
// We use attr_string_value where attr_name = '__temporality__'
@@ -1653,14 +1661,19 @@ func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metric
// and metric_name column contains temporality value, so we use the correct mapping
sb := sqlbuilder.Select(
"metric_name",
"argMax(temporality, last_reported_unix_milli) as temporality",
).From(t.metricsDBName + "." + t.metricsFieldsTblName)
"temporality",
).
Distinct().
From(t.metricsDBName + "." + t.metricsFieldsTblName)
// Filter by metric names (in the temporality column due to data mix-up)
sb.Where(sb.In("metric_name", metricNames))
sb.Where(
sb.In("metric_name", metricNames),
sb.GTE("last_reported_unix_milli", queryTimeRangeStartTs),
sb.LT("last_reported_unix_milli", queryTimeRangeEndTs),
)
// Group by metric name to get one temporality per metric
sb.GroupBy("metric_name")
sb.GroupBy("metric_name", "temporality")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
@@ -1693,25 +1706,30 @@ func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metric
temporality = metrictypes.Unknown
}
result[metricName] = temporality
result[metricName] = append(result[metricName], temporality)
}
return result, nil
}
func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string][]metrictypes.Temporality, error) {
result := make(map[string][]metrictypes.Temporality)
sb := sqlbuilder.Select(
"metric_name",
"argMax(temporality, unix_milli) as temporality",
).From(t.meterDBName + "." + t.meterFieldsTblName)
"temporality",
).
Distinct().
From(t.meterDBName + "." + t.meterFieldsTblName)
// Filter by metric names (in the temporality column due to data mix-up)
sb.Where(sb.In("metric_name", metricNames))
sb.Where(
sb.In("metric_name", metricNames),
sb.GTE("unix_milli", queryTimeRangeStartTs),
sb.LT("unix_milli", queryTimeRangeEndTs),
)
// Group by metric name to get one temporality per metric
sb.GroupBy("metric_name")
sb.GroupBy("metric_name", "temporality")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
@@ -1744,7 +1762,7 @@ func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Cont
temporality = metrictypes.Unknown
}
result[metricName] = temporality
result[metricName] = append(result[metricName], temporality)
}
return result, nil

View File

@@ -42,6 +42,7 @@ func NewMeterQueryStatementBuilder(
}
}
// todo NAMAN: add handling for the scenario where there are multiple temporalities
func (b *meterQueryStatementBuilder) Build(
ctx context.Context,
start uint64,

View File

@@ -21,6 +21,10 @@ const (
RateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
RateWithoutNegativeMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, IF(row_number() OVER rate_window = 1, nan, IF((max(value) - lagInFrame(max(value), 1, 0) OVER rate_window) < 0, max(value) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window), (max(value) - lagInFrame(max(value), 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)))) AS per_series_value`
IncreaseWithoutNegativeMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, IF(row_number() OVER rate_window = 1, nan, IF((max(value) - lagInFrame(max(value), 1, 0) OVER rate_window) < 0, max(value), (max(value) - lagInFrame(max(value), 1, 0) OVER rate_window)))) AS per_series_value`
OthersMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, %s) AS per_series_value`
RateWithInterpolation = `
CASE
WHEN row_number() OVER rate_window = 1 THEN
@@ -377,7 +381,7 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
sb.LTE("unix_milli", end),
)
if query.Aggregations[0].Temporality != metrictypes.Unknown {
if query.Aggregations[0].Temporality != metrictypes.Multiple && query.Aggregations[0].Temporality != metrictypes.Unknown {
sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
}
@@ -407,8 +411,10 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
) (string, []any, error) {
if query.Aggregations[0].Temporality == metrictypes.Delta {
return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
} else if query.Aggregations[0].Temporality != metrictypes.Multiple {
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
@@ -528,6 +534,58 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
}
}
func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggForDeltaTemporality := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Delta, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggForDeltaTemporality = fmt.Sprintf("%s/%d", aggForDeltaTemporality, stepSec)
}
// todo: the interpolation scenario
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(RateWithoutNegativeMultiTemporality, aggForDeltaTemporality, start, start)
sb.SelectMore(rateExpr)
case metrictypes.TimeAggregationIncrease:
increaseExpr := fmt.Sprintf(IncreaseWithoutNegativeMultiTemporality, aggForDeltaTemporality)
sb.SelectMore(increaseExpr)
default:
aggForCumulativeTemporality := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Cumulative, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
expr := fmt.Sprintf(OthersMultiTemporality, aggForDeltaTemporality, aggForCumulativeTemporality)
sb.SelectMore(expr)
}
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LT("unix_milli", end),
)
sb.GroupBy("fingerprint", "ts", "temporality")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
queryWithoutWindow, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
queryWithWindowAndOrder := queryWithoutWindow + " WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint ASC, ts ASC) ORDER BY ts"
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", queryWithWindowAndOrder), args, nil
}
func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
_ context.Context,
_ uint64,

View File

@@ -19,6 +19,7 @@ var (
Cumulative = Temporality{valuer.NewString("cumulative")}
Unspecified = Temporality{valuer.NewString("unspecified")}
Unknown = Temporality{valuer.NewString("")}
Multiple = Temporality{valuer.NewString("__multiple__")}
)
func (t Temporality) Value() (driver.Value, error) {

View File

@@ -27,10 +27,10 @@ type MetadataStore interface {
GetAllValues(ctx context.Context, fieldValueSelector *FieldValueSelector) (*TelemetryFieldValues, bool, error)
// FetchTemporality fetches the temporality for metric
FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error)
FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error)
// FetchTemporalityMulti fetches the temporality for multiple metrics
FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error)
FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error)
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error)

View File

@@ -265,7 +265,7 @@ func (m *MockMetadataStore) SetAllValues(lookupKey string, values *telemetrytype
}
// FetchTemporality fetches the temporality for a metric
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
if temporality, exists := m.TemporalityMap[metricName]; exists {
return temporality, nil
}
@@ -273,7 +273,7 @@ func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName str
}
// FetchTemporalityMulti fetches the temporality for multiple metrics
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
for _, metricName := range metricNames {