|
|
|
|
@@ -135,6 +135,9 @@ type seriesLookup struct {
|
|
|
|
|
data map[string]map[int64]float64
|
|
|
|
|
// seriesKey -> original series for metadata preservation
|
|
|
|
|
seriesMetadata map[string]*TimeSeries
|
|
|
|
|
// maps a variable to its series keys, letting evaluation iterate a single
|
|
|
|
|
// variable's series directly.
|
|
|
|
|
variableToSeriesKeys map[string][]string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// FormulaEvaluator handles formula evaluation b/w time series from different aggregations
|
|
|
|
|
@@ -291,34 +294,35 @@ func (fe *FormulaEvaluator) EvaluateFormula(timeSeriesData map[string]*TimeSerie
|
|
|
|
|
// Find all unique label combinations across referenced series
|
|
|
|
|
uniqueLabelSets := fe.findUniqueLabelSets(lookup)
|
|
|
|
|
|
|
|
|
|
// Process each unique label set
|
|
|
|
|
var resultSeries []*TimeSeries
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
// Work per label-set is cheap enough that spawning a goroutine per item
|
|
|
|
|
// costs more in scheduler signaling than it saves in parallelism.
|
|
|
|
|
const numWorkers = 4
|
|
|
|
|
workCh := make(chan []*Label, len(uniqueLabelSets))
|
|
|
|
|
resultChan := make(chan *TimeSeries, len(uniqueLabelSets))
|
|
|
|
|
maxSeries := make(chan struct{}, 4)
|
|
|
|
|
|
|
|
|
|
// For each candidate label set, evaluate the formula expression
|
|
|
|
|
// and store the result in the resultChan
|
|
|
|
|
for _, labelSet := range uniqueLabelSets {
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func(labels []*Label) {
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
wg.Add(numWorkers)
|
|
|
|
|
for range numWorkers {
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
maxSeries <- struct{}{}
|
|
|
|
|
defer func() { <-maxSeries }()
|
|
|
|
|
|
|
|
|
|
// main workhorse of the formula evaluation
|
|
|
|
|
series := fe.evaluateForLabelSet(labels, lookup)
|
|
|
|
|
if series != nil && len(series.Values) > 0 {
|
|
|
|
|
resultChan <- series
|
|
|
|
|
for labels := range workCh {
|
|
|
|
|
series := fe.evaluateForLabelSet(labels, lookup)
|
|
|
|
|
if series != nil && len(series.Values) > 0 {
|
|
|
|
|
resultChan <- series
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}(labelSet)
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
wg.Wait()
|
|
|
|
|
close(resultChan)
|
|
|
|
|
}()
|
|
|
|
|
for _, labelSet := range uniqueLabelSets {
|
|
|
|
|
workCh <- labelSet
|
|
|
|
|
}
|
|
|
|
|
close(workCh)
|
|
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
close(resultChan)
|
|
|
|
|
|
|
|
|
|
resultSeries := make([]*TimeSeries, 0, len(uniqueLabelSets))
|
|
|
|
|
for series := range resultChan {
|
|
|
|
|
resultSeries = append(resultSeries, series)
|
|
|
|
|
}
|
|
|
|
|
@@ -340,6 +344,8 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
|
|
|
|
|
// when the series is returned to the caller
|
|
|
|
|
// It's also used for finding matching series for a variable
|
|
|
|
|
seriesMetadata: make(map[string]*TimeSeries),
|
|
|
|
|
|
|
|
|
|
variableToSeriesKeys: make(map[string][]string),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for variable, aggRef := range fe.aggRefs {
|
|
|
|
|
@@ -391,6 +397,7 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
|
|
|
|
|
if _, exists := lookup.data[seriesKey]; !exists {
|
|
|
|
|
lookup.data[seriesKey] = make(map[int64]float64, len(series.Values))
|
|
|
|
|
lookup.seriesMetadata[seriesKey] = series
|
|
|
|
|
lookup.variableToSeriesKeys[variable] = append(lookup.variableToSeriesKeys[variable], seriesKey)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Store all timestamp-value pairs
|
|
|
|
|
@@ -473,35 +480,37 @@ func (fe *FormulaEvaluator) findUniqueLabelSets(lookup *seriesLookup) [][]*Label
|
|
|
|
|
|
|
|
|
|
// Find unique label sets using proper label comparison
|
|
|
|
|
var uniqueSets [][]*Label
|
|
|
|
|
var uniqueMaps []map[string]any
|
|
|
|
|
for _, labelSet := range allLabelSets {
|
|
|
|
|
isUnique := true
|
|
|
|
|
for _, uniqueSet := range uniqueSets {
|
|
|
|
|
if fe.isSubset(uniqueSet, labelSet) {
|
|
|
|
|
for _, uniqueMap := range uniqueMaps {
|
|
|
|
|
if isSubset(uniqueMap, labelSet) {
|
|
|
|
|
isUnique = false
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if isUnique {
|
|
|
|
|
uniqueSets = append(uniqueSets, labelSet)
|
|
|
|
|
uniqueMaps = append(uniqueMaps, labelsToMap(labelSet))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return uniqueSets
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool {
|
|
|
|
|
labelMap1 := make(map[string]any)
|
|
|
|
|
labelMap2 := make(map[string]any)
|
|
|
|
|
|
|
|
|
|
for _, label := range labels1 {
|
|
|
|
|
labelMap1[label.Key.Name] = label.Value
|
|
|
|
|
}
|
|
|
|
|
for _, label := range labels2 {
|
|
|
|
|
labelMap2[label.Key.Name] = label.Value
|
|
|
|
|
func labelsToMap(labels []*Label) map[string]any {
|
|
|
|
|
m := make(map[string]any, len(labels))
|
|
|
|
|
for _, label := range labels {
|
|
|
|
|
m[label.Key.Name] = label.Value
|
|
|
|
|
}
|
|
|
|
|
return m
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for k, v := range labelMap2 {
|
|
|
|
|
if val, ok := labelMap1[k]; !ok || val != v {
|
|
|
|
|
// isSubset reports whether every label in subset is present with the same value in
|
|
|
|
|
// supersetMap (i.e. subset ⊆ superset).
|
|
|
|
|
func isSubset(supersetMap map[string]any, subset []*Label) bool {
|
|
|
|
|
for _, label := range subset {
|
|
|
|
|
if val, ok := supersetMap[label.Key.Name]; !ok || val != label.Value {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -517,10 +526,14 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
|
|
|
|
|
// for the variable
|
|
|
|
|
var allTimestamps = make(map[int64]struct{})
|
|
|
|
|
|
|
|
|
|
// targetLabels is fixed for this call, so build its lookup once and reuse it
|
|
|
|
|
// across every series comparison below.
|
|
|
|
|
targetMap := labelsToMap(targetLabels)
|
|
|
|
|
|
|
|
|
|
for variable := range fe.aggRefs {
|
|
|
|
|
// Find series with matching labels for this variable
|
|
|
|
|
for seriesKey, series := range lookup.seriesMetadata {
|
|
|
|
|
if strings.HasPrefix(seriesKey, variable+"|") && fe.isSubset(targetLabels, series.Labels) {
|
|
|
|
|
// only this variable's series.
|
|
|
|
|
for _, seriesKey := range lookup.variableToSeriesKeys[variable] {
|
|
|
|
|
if isSubset(targetMap, lookup.seriesMetadata[seriesKey].Labels) {
|
|
|
|
|
if timestampData, exists := lookup.data[seriesKey]; exists {
|
|
|
|
|
variableData[variable] = timestampData
|
|
|
|
|
// Collect all timestamps
|
|
|
|
|
@@ -546,8 +559,11 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
|
|
|
|
|
}
|
|
|
|
|
slices.Sort(timestamps)
|
|
|
|
|
|
|
|
|
|
// Evaluate formula at each timestamp
|
|
|
|
|
var resultValues []*TimeSeriesValue
|
|
|
|
|
// backing slab-allocates all values in one block; resultValues holds interior
|
|
|
|
|
// pointers into it. Fixed length and never appended to, so it never moves.
|
|
|
|
|
backing := make([]TimeSeriesValue, len(timestamps))
|
|
|
|
|
resultValues := make([]*TimeSeriesValue, 0, len(timestamps))
|
|
|
|
|
n := 0
|
|
|
|
|
values := fe.valuesPool.Get().(map[string]any)
|
|
|
|
|
defer fe.valuesPool.Put(values)
|
|
|
|
|
|
|
|
|
|
@@ -592,10 +608,12 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
resultValues = append(resultValues, &TimeSeriesValue{
|
|
|
|
|
backing[n] = TimeSeriesValue{
|
|
|
|
|
Timestamp: timestamp,
|
|
|
|
|
Value: value,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
resultValues = append(resultValues, &backing[n])
|
|
|
|
|
n++
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(resultValues) == 0 {
|
|
|
|
|
|