Compare commits

...

6 Commits

Author SHA1 Message Date
Naman Verma
7dc2836208 perf: use a pointer free backing array for result values 2026-06-09 17:21:36 +05:30
Naman Verma
22acc0feb9 perf: switch to simple worker pool 2026-06-09 14:10:26 +05:30
Naman Verma
bb5a062ef3 Merge branch 'main' into nv/5122 2026-06-08 14:56:42 +05:30
Naman Verma
b06525bac2 Merge branch 'main' into nv/5122 2026-06-08 10:52:06 +05:30
Naman Verma
31fda2861a Merge branch 'main' into nv/5122 2026-06-05 11:12:49 +05:30
Naman Verma
d3ffefd15a perf: reuse label maps and index series by variable in formulas 2026-06-01 20:45:06 +05:30

View File

@@ -135,6 +135,9 @@ type seriesLookup struct {
data map[string]map[int64]float64
// seriesKey -> original series for metadata preservation
seriesMetadata map[string]*TimeSeries
// maps a variable to its series keys, letting evaluation iterate a single
// variable's series directly.
variableToSeriesKeys map[string][]string
}
// FormulaEvaluator handles formula evaluation b/w time series from different aggregations
@@ -291,34 +294,35 @@ func (fe *FormulaEvaluator) EvaluateFormula(timeSeriesData map[string]*TimeSerie
// Find all unique label combinations across referenced series
uniqueLabelSets := fe.findUniqueLabelSets(lookup)
// Process each unique label set
var resultSeries []*TimeSeries
var wg sync.WaitGroup
// Work per label-set is cheap enough that spawning a goroutine per item
// costs more in scheduler signaling than it saves in parallelism.
const numWorkers = 4
workCh := make(chan []*Label, len(uniqueLabelSets))
resultChan := make(chan *TimeSeries, len(uniqueLabelSets))
maxSeries := make(chan struct{}, 4)
// For each candidate label set, evaluate the formula expression
// and store the result in the resultChan
for _, labelSet := range uniqueLabelSets {
wg.Add(1)
go func(labels []*Label) {
var wg sync.WaitGroup
wg.Add(numWorkers)
for range numWorkers {
go func() {
defer wg.Done()
maxSeries <- struct{}{}
defer func() { <-maxSeries }()
// main workhorse of the formula evaluation
series := fe.evaluateForLabelSet(labels, lookup)
if series != nil && len(series.Values) > 0 {
resultChan <- series
for labels := range workCh {
series := fe.evaluateForLabelSet(labels, lookup)
if series != nil && len(series.Values) > 0 {
resultChan <- series
}
}
}(labelSet)
}()
}
go func() {
wg.Wait()
close(resultChan)
}()
for _, labelSet := range uniqueLabelSets {
workCh <- labelSet
}
close(workCh)
wg.Wait()
close(resultChan)
resultSeries := make([]*TimeSeries, 0, len(uniqueLabelSets))
for series := range resultChan {
resultSeries = append(resultSeries, series)
}
@@ -340,6 +344,8 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
// when the series is returned to the caller
// It's also used for finding matching series for a variable
seriesMetadata: make(map[string]*TimeSeries),
variableToSeriesKeys: make(map[string][]string),
}
for variable, aggRef := range fe.aggRefs {
@@ -391,6 +397,7 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
if _, exists := lookup.data[seriesKey]; !exists {
lookup.data[seriesKey] = make(map[int64]float64, len(series.Values))
lookup.seriesMetadata[seriesKey] = series
lookup.variableToSeriesKeys[variable] = append(lookup.variableToSeriesKeys[variable], seriesKey)
}
// Store all timestamp-value pairs
@@ -473,35 +480,37 @@ func (fe *FormulaEvaluator) findUniqueLabelSets(lookup *seriesLookup) [][]*Label
// Find unique label sets using proper label comparison
var uniqueSets [][]*Label
var uniqueMaps []map[string]any
for _, labelSet := range allLabelSets {
isUnique := true
for _, uniqueSet := range uniqueSets {
if fe.isSubset(uniqueSet, labelSet) {
for _, uniqueMap := range uniqueMaps {
if isSubset(uniqueMap, labelSet) {
isUnique = false
break
}
}
if isUnique {
uniqueSets = append(uniqueSets, labelSet)
uniqueMaps = append(uniqueMaps, labelsToMap(labelSet))
}
}
return uniqueSets
}
func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool {
labelMap1 := make(map[string]any)
labelMap2 := make(map[string]any)
for _, label := range labels1 {
labelMap1[label.Key.Name] = label.Value
}
for _, label := range labels2 {
labelMap2[label.Key.Name] = label.Value
func labelsToMap(labels []*Label) map[string]any {
m := make(map[string]any, len(labels))
for _, label := range labels {
m[label.Key.Name] = label.Value
}
return m
}
for k, v := range labelMap2 {
if val, ok := labelMap1[k]; !ok || val != v {
// isSubset reports whether every label in subset is present with the same value in
// supersetMap (i.e. subset ⊆ superset).
func isSubset(supersetMap map[string]any, subset []*Label) bool {
for _, label := range subset {
if val, ok := supersetMap[label.Key.Name]; !ok || val != label.Value {
return false
}
}
@@ -517,10 +526,14 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
// for the variable
var allTimestamps = make(map[int64]struct{})
// targetLabels is fixed for this call, so build its lookup once and reuse it
// across every series comparison below.
targetMap := labelsToMap(targetLabels)
for variable := range fe.aggRefs {
// Find series with matching labels for this variable
for seriesKey, series := range lookup.seriesMetadata {
if strings.HasPrefix(seriesKey, variable+"|") && fe.isSubset(targetLabels, series.Labels) {
// only this variable's series.
for _, seriesKey := range lookup.variableToSeriesKeys[variable] {
if isSubset(targetMap, lookup.seriesMetadata[seriesKey].Labels) {
if timestampData, exists := lookup.data[seriesKey]; exists {
variableData[variable] = timestampData
// Collect all timestamps
@@ -546,8 +559,11 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
}
slices.Sort(timestamps)
// Evaluate formula at each timestamp
var resultValues []*TimeSeriesValue
// backing slab-allocates all values in one block; resultValues holds interior
// pointers into it. Fixed length and never appended to, so it never moves.
backing := make([]TimeSeriesValue, len(timestamps))
resultValues := make([]*TimeSeriesValue, 0, len(timestamps))
n := 0
values := fe.valuesPool.Get().(map[string]any)
defer fe.valuesPool.Put(values)
@@ -592,10 +608,12 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
continue
}
resultValues = append(resultValues, &TimeSeriesValue{
backing[n] = TimeSeriesValue{
Timestamp: timestamp,
Value: value,
})
}
resultValues = append(resultValues, &backing[n])
n++
}
if len(resultValues) == 0 {