mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-09 18:40:26 +01:00
Compare commits
6 Commits
ns/flamegr
...
nv/5122
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7dc2836208 | ||
|
|
22acc0feb9 | ||
|
|
bb5a062ef3 | ||
|
|
b06525bac2 | ||
|
|
31fda2861a | ||
|
|
d3ffefd15a |
@@ -135,6 +135,9 @@ type seriesLookup struct {
|
||||
data map[string]map[int64]float64
|
||||
// seriesKey -> original series for metadata preservation
|
||||
seriesMetadata map[string]*TimeSeries
|
||||
// maps a variable to its series keys, letting evaluation iterate a single
|
||||
// variable's series directly.
|
||||
variableToSeriesKeys map[string][]string
|
||||
}
|
||||
|
||||
// FormulaEvaluator handles formula evaluation b/w time series from different aggregations
|
||||
@@ -291,34 +294,35 @@ func (fe *FormulaEvaluator) EvaluateFormula(timeSeriesData map[string]*TimeSerie
|
||||
// Find all unique label combinations across referenced series
|
||||
uniqueLabelSets := fe.findUniqueLabelSets(lookup)
|
||||
|
||||
// Process each unique label set
|
||||
var resultSeries []*TimeSeries
|
||||
var wg sync.WaitGroup
|
||||
// Work per label-set is cheap enough that spawning a goroutine per item
|
||||
// costs more in scheduler signaling than it saves in parallelism.
|
||||
const numWorkers = 4
|
||||
workCh := make(chan []*Label, len(uniqueLabelSets))
|
||||
resultChan := make(chan *TimeSeries, len(uniqueLabelSets))
|
||||
maxSeries := make(chan struct{}, 4)
|
||||
|
||||
// For each candidate label set, evaluate the formula expression
|
||||
// and store the result in the resultChan
|
||||
for _, labelSet := range uniqueLabelSets {
|
||||
wg.Add(1)
|
||||
go func(labels []*Label) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numWorkers)
|
||||
for range numWorkers {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
maxSeries <- struct{}{}
|
||||
defer func() { <-maxSeries }()
|
||||
|
||||
// main workhorse of the formula evaluation
|
||||
series := fe.evaluateForLabelSet(labels, lookup)
|
||||
if series != nil && len(series.Values) > 0 {
|
||||
resultChan <- series
|
||||
for labels := range workCh {
|
||||
series := fe.evaluateForLabelSet(labels, lookup)
|
||||
if series != nil && len(series.Values) > 0 {
|
||||
resultChan <- series
|
||||
}
|
||||
}
|
||||
}(labelSet)
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(resultChan)
|
||||
}()
|
||||
for _, labelSet := range uniqueLabelSets {
|
||||
workCh <- labelSet
|
||||
}
|
||||
close(workCh)
|
||||
|
||||
wg.Wait()
|
||||
close(resultChan)
|
||||
|
||||
resultSeries := make([]*TimeSeries, 0, len(uniqueLabelSets))
|
||||
for series := range resultChan {
|
||||
resultSeries = append(resultSeries, series)
|
||||
}
|
||||
@@ -340,6 +344,8 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
|
||||
// when the series is returned to the caller
|
||||
// It's also used for finding matching series for a variable
|
||||
seriesMetadata: make(map[string]*TimeSeries),
|
||||
|
||||
variableToSeriesKeys: make(map[string][]string),
|
||||
}
|
||||
|
||||
for variable, aggRef := range fe.aggRefs {
|
||||
@@ -391,6 +397,7 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
|
||||
if _, exists := lookup.data[seriesKey]; !exists {
|
||||
lookup.data[seriesKey] = make(map[int64]float64, len(series.Values))
|
||||
lookup.seriesMetadata[seriesKey] = series
|
||||
lookup.variableToSeriesKeys[variable] = append(lookup.variableToSeriesKeys[variable], seriesKey)
|
||||
}
|
||||
|
||||
// Store all timestamp-value pairs
|
||||
@@ -473,35 +480,37 @@ func (fe *FormulaEvaluator) findUniqueLabelSets(lookup *seriesLookup) [][]*Label
|
||||
|
||||
// Find unique label sets using proper label comparison
|
||||
var uniqueSets [][]*Label
|
||||
var uniqueMaps []map[string]any
|
||||
for _, labelSet := range allLabelSets {
|
||||
isUnique := true
|
||||
for _, uniqueSet := range uniqueSets {
|
||||
if fe.isSubset(uniqueSet, labelSet) {
|
||||
for _, uniqueMap := range uniqueMaps {
|
||||
if isSubset(uniqueMap, labelSet) {
|
||||
isUnique = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if isUnique {
|
||||
uniqueSets = append(uniqueSets, labelSet)
|
||||
uniqueMaps = append(uniqueMaps, labelsToMap(labelSet))
|
||||
}
|
||||
}
|
||||
|
||||
return uniqueSets
|
||||
}
|
||||
|
||||
func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool {
|
||||
labelMap1 := make(map[string]any)
|
||||
labelMap2 := make(map[string]any)
|
||||
|
||||
for _, label := range labels1 {
|
||||
labelMap1[label.Key.Name] = label.Value
|
||||
}
|
||||
for _, label := range labels2 {
|
||||
labelMap2[label.Key.Name] = label.Value
|
||||
func labelsToMap(labels []*Label) map[string]any {
|
||||
m := make(map[string]any, len(labels))
|
||||
for _, label := range labels {
|
||||
m[label.Key.Name] = label.Value
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
for k, v := range labelMap2 {
|
||||
if val, ok := labelMap1[k]; !ok || val != v {
|
||||
// isSubset reports whether every label in subset is present with the same value in
|
||||
// supersetMap (i.e. subset ⊆ superset).
|
||||
func isSubset(supersetMap map[string]any, subset []*Label) bool {
|
||||
for _, label := range subset {
|
||||
if val, ok := supersetMap[label.Key.Name]; !ok || val != label.Value {
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -517,10 +526,14 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
|
||||
// for the variable
|
||||
var allTimestamps = make(map[int64]struct{})
|
||||
|
||||
// targetLabels is fixed for this call, so build its lookup once and reuse it
|
||||
// across every series comparison below.
|
||||
targetMap := labelsToMap(targetLabels)
|
||||
|
||||
for variable := range fe.aggRefs {
|
||||
// Find series with matching labels for this variable
|
||||
for seriesKey, series := range lookup.seriesMetadata {
|
||||
if strings.HasPrefix(seriesKey, variable+"|") && fe.isSubset(targetLabels, series.Labels) {
|
||||
// only this variable's series.
|
||||
for _, seriesKey := range lookup.variableToSeriesKeys[variable] {
|
||||
if isSubset(targetMap, lookup.seriesMetadata[seriesKey].Labels) {
|
||||
if timestampData, exists := lookup.data[seriesKey]; exists {
|
||||
variableData[variable] = timestampData
|
||||
// Collect all timestamps
|
||||
@@ -546,8 +559,11 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
|
||||
}
|
||||
slices.Sort(timestamps)
|
||||
|
||||
// Evaluate formula at each timestamp
|
||||
var resultValues []*TimeSeriesValue
|
||||
// backing slab-allocates all values in one block; resultValues holds interior
|
||||
// pointers into it. Fixed length and never appended to, so it never moves.
|
||||
backing := make([]TimeSeriesValue, len(timestamps))
|
||||
resultValues := make([]*TimeSeriesValue, 0, len(timestamps))
|
||||
n := 0
|
||||
values := fe.valuesPool.Get().(map[string]any)
|
||||
defer fe.valuesPool.Put(values)
|
||||
|
||||
@@ -592,10 +608,12 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
|
||||
continue
|
||||
}
|
||||
|
||||
resultValues = append(resultValues, &TimeSeriesValue{
|
||||
backing[n] = TimeSeriesValue{
|
||||
Timestamp: timestamp,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
resultValues = append(resultValues, &backing[n])
|
||||
n++
|
||||
}
|
||||
|
||||
if len(resultValues) == 0 {
|
||||
|
||||
Reference in New Issue
Block a user