Compare commits

...

3 Commits

Author SHA1 Message Date
srikanthccv
6facb2c418 chore: temp commit 2026-06-06 17:29:24 +05:30
Naman Verma
31fda2861a Merge branch 'main' into nv/5122 2026-06-05 11:12:49 +05:30
Naman Verma
d3ffefd15a perf: reuse label maps and index series by variable in formulas 2026-06-01 20:45:06 +05:30
2 changed files with 732 additions and 171 deletions

View File

@@ -3,9 +3,11 @@ package querybuildertypesv5
import (
"fmt"
"math"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"slices"
@@ -129,12 +131,35 @@ type aggregationRef struct {
Alias *string // Alias-based reference (e.g., A.my_alias)
}
// seriesLookup provides lookup for series data.
// seriesEntry holds one series' values and its canonical label signature so that
// matching can be a map lookup instead of rebuilding maps for every comparison.
// buildSeriesLookup creates one entry per distinct signature within a variable.
type seriesEntry struct {
// labels is the label slice of the source series, stored by reference (not copied).
labels []*Label
keys []string // sorted label key names
sig string // canonical value signature over keys
data map[int64]float64 // timestamp -> value
}
// variableLookup indexes a single variable's (one aggregation's) series.
// buildSeriesLookup populates it and evaluateForLabelSet reads it to pick the
// series that matches a target label set.
type variableLookup struct {
// keys is the sorted set of label key names shared by this variable's series
// (taken from the first series). Only meaningful when regular is true. It is
// nil when the first series carries no labels.
keys []string
// regular is true when every series shares the same set of label keys, which
// is the normal case for a group-by. It lets matching be an O(1) signature
// lookup instead of an O(n) subset scan.
regular bool
// entries holds one entry per distinct signature, in bucket order of first
// appearance; used for the irregular fallback.
entries []*seriesEntry
// bySig maps a value signature to the first series (in bucket order) carrying
// it. It both drives regular-case matching and enumerates the variable's
// distinct label sets for findUniqueLabelSets.
bySig map[string]*seriesEntry
}
// seriesLookup provides lookup for series data, grouped by variable.
type seriesLookup struct {
// seriesKey -> timestamp -> value
data map[string]map[int64]float64
// seriesKey -> original series for metadata preservation
seriesMetadata map[string]*TimeSeries
byVariable map[string]*variableLookup
}
// FormulaEvaluator handles formula evaluation b/w time series from different aggregations
@@ -190,6 +215,12 @@ type FormulaEvaluator struct {
timestampPool sync.Pool
valuesPool sync.Pool
// tsSetPool holds map[int64]struct{} scratch maps used to build the union of
// timestamps for a label set.
tsSetPool sync.Pool
// matchedPool holds []map[int64]float64 scratch slices indexed by variable
// position, holding each variable's matched series data for a label set.
matchedPool sync.Pool
}
// NewFormulaEvaluator creates a formula evaluator.
@@ -243,6 +274,14 @@ func NewFormulaEvaluator(expressionStr string, canDefaultZero map[string]bool) (
evaluator.valuesPool.New = func() any {
return make(map[string]any, len(evaluator.variables))
}
evaluator.tsSetPool.New = func() any {
return make(map[int64]struct{}, 512)
}
nVars := len(evaluator.variables)
evaluator.matchedPool.New = func() any {
s := make([]map[int64]float64, 0, nVars)
return &s
}
return evaluator, nil
}
@@ -291,55 +330,62 @@ func (fe *FormulaEvaluator) EvaluateFormula(timeSeriesData map[string]*TimeSerie
// Find all unique label combinations across referenced series
uniqueLabelSets := fe.findUniqueLabelSets(lookup)
// Process each unique label set
var resultSeries []*TimeSeries
var wg sync.WaitGroup
resultChan := make(chan *TimeSeries, len(uniqueLabelSets))
maxSeries := make(chan struct{}, 4)
// For each candidate label set, evaluate the formula expression
// and store the result in the resultChan
for _, labelSet := range uniqueLabelSets {
wg.Add(1)
go func(labels []*Label) {
defer wg.Done()
maxSeries <- struct{}{}
defer func() { <-maxSeries }()
// main workhorse of the formula evaluation
series := fe.evaluateForLabelSet(labels, lookup)
if series != nil && len(series.Values) > 0 {
resultChan <- series
}
}(labelSet)
n := len(uniqueLabelSets)
if n == 0 {
return nil, nil
}
go func() {
wg.Wait()
close(resultChan)
}()
// Each label set is evaluated independently into its own slot, so we can write
// results without any synchronization and compact afterwards. This avoids the
// per-label-set goroutine + semaphore + channel churn that previously
// dominated the profile (runtime.pthread_cond_signal).
results := make([]*TimeSeries, n)
for series := range resultChan {
resultSeries = append(resultSeries, series)
// For small inputs the goroutine machinery costs more than it saves.
const parallelThreshold = 8
if n < parallelThreshold {
for i, labelSet := range uniqueLabelSets {
results[i] = fe.evaluateForLabelSet(labelSet, lookup)
}
} else {
workers := min(runtime.GOMAXPROCS(0), n)
// A shared cursor hands out label-set indices to a fixed pool of workers,
// balancing load (series sizes vary) with a single atomic add per item.
var next atomic.Int64
var wg sync.WaitGroup
wg.Add(workers)
for range workers {
go func() {
defer wg.Done()
for {
i := int(next.Add(1)) - 1
if i >= n {
return
}
results[i] = fe.evaluateForLabelSet(uniqueLabelSets[i], lookup)
}
}()
}
wg.Wait()
}
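// Compact non-empty results in place. resultSeries shares results' backing
// array, which is safe because the write position never overtakes the read
// position, so every element is read before its slot can be overwritten.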
resultSeries := results[:0]
for _, series := range results {
if series != nil && len(series.Values) > 0 {
resultSeries = append(resultSeries, series)
}
}
return resultSeries, nil
}
// buildSeriesLookup creates lookup structure for all referenced aggregations.
// buildSeriesLookup creates the per-variable matching index for all referenced
// aggregations.
func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSeriesData) *seriesLookup {
lookup := &seriesLookup{
// data is a map of series key to timestamp to value
// series key is a unique identifier for a series
// timestamp is the timestamp of the value
// value is the value of the series at the timestamp
data: make(map[string]map[int64]float64),
// seriesMetadata is a map of series key to series metadata
// series metadata is the metadata of the series
// this is used to preserve the original label structure and metadata
// when the series is returned to the caller
// It's also used for finding matching series for a variable
seriesMetadata: make(map[string]*TimeSeries),
byVariable: make(map[string]*variableLookup, len(fe.aggRefs)),
}
for variable, aggRef := range fe.aggRefs {
@@ -360,13 +406,10 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
continue
}
// Find the specific aggregation bucket
// Now, that we have the data for the query, we look for the specific aggregation bucket
// referenced in the formula expression.
// For example, if the formula expression is `B.2`, the above `data` would be the
// time series data for the query B.
// The following code will find the aggregation at the index 2
// so we can build the series key -> timestamp -> value map for the expr evaluation
// Find the specific aggregation bucket referenced in the formula
// expression. For example, if the expression is `B.2`, the above `data`
// is the time series data for query B and we pick the aggregation at
// index 2.
var targetBucket *AggregationBucket
for _, bucket := range data.Aggregations {
if aggRef.Index != nil && bucket.Index == *aggRef.Index {
@@ -383,63 +426,126 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
continue
}
// Process all series in the target bucket
for seriesIdx, series := range targetBucket.Series {
seriesKey := fe.buildSeriesKey(variable, seriesIdx, series.Labels)
vl := &variableLookup{
regular: true,
entries: make([]*seriesEntry, 0, len(targetBucket.Series)),
bySig: make(map[string]*seriesEntry, len(targetBucket.Series)),
}
// Initialize timestamp map
if _, exists := lookup.data[seriesKey]; !exists {
lookup.data[seriesKey] = make(map[int64]float64, len(series.Values))
lookup.seriesMetadata[seriesKey] = series
// Process all series in the target bucket.
for seriesIdx, series := range targetBucket.Series {
keys, sig := labelKeysAndSig(series.Labels)
if seriesIdx == 0 {
vl.keys = keys
} else if vl.regular && !slices.Equal(keys, vl.keys) {
// A series with a different key set means we can no longer rely on
// the signature index for subset matching and must fall back to a
// scan for this variable.
vl.regular = false
}
// Store all timestamp-value pairs
for _, value := range series.Values {
lookup.data[seriesKey][value.Timestamp] = value.Value
entry, seen := vl.bySig[sig]
if !seen {
entry = &seriesEntry{
labels: series.Labels,
keys: keys,
sig: sig,
data: make(map[int64]float64, len(series.Values)),
}
vl.bySig[sig] = entry
vl.entries = append(vl.entries, entry)
}
// Store all timestamp-value pairs. The first series carrying a given
// signature wins, matching the original "first match" behaviour; a
// later duplicate's points are ignored.
if !seen {
for _, value := range series.Values {
entry.data[value.Timestamp] = value.Value
}
}
}
lookup.byVariable[variable] = vl
}
return lookup
}
// buildSeriesKey creates a unique key for a series within a specific aggregation.
func (fe *FormulaEvaluator) buildSeriesKey(variable string, seriesIndex int, labels []*Label) string {
// Create a deterministic key that includes variable and label information
// Why is variable name needed?
// Because we need to maintain if a certain series belongs to a query.
// The variable name here is the name of the query.
// Why is series index needed?
// Since we support multiple aggregations in the same query, we need to
// make use the series index to differentiate between series from different aggregations.
// Perhaps, we can reduce the allocations here and use the hash of the variable and series index
// to create a unique key.
// However, the number of labels and series from query result should be small,
// and not be a bottleneck.
// So, we can keep it simple for now.
var keyParts []string
keyParts = append(keyParts, variable)
keyParts = append(keyParts, strconv.Itoa(seriesIndex))
// Sort labels by key name for consistent ordering
sortedLabels := make([]*Label, len(labels))
copy(sortedLabels, labels)
slices.SortFunc(sortedLabels, func(i, j *Label) int {
if i.Key.Name < j.Key.Name {
return -1
}
if i.Key.Name > j.Key.Name {
return 1
}
return 0
// labelKeysAndSig returns the sorted label key names and a canonical value
// signature for a label set. The signature is built so that two label sets with
// the same keys and values produce identical strings, and so that projecting a
// superset onto these keys (see projectSig) yields the same string.
func labelKeysAndSig(labels []*Label) ([]string, string) {
n := len(labels)
if n == 0 {
return nil, ""
}
sorted := make([]*Label, n)
copy(sorted, labels)
slices.SortFunc(sorted, func(i, j *Label) int {
return strings.Compare(i.Key.Name, j.Key.Name)
})
for _, label := range sortedLabels {
keyParts = append(keyParts, fmt.Sprintf("%s=%v", label.Key.Name, label.Value))
keys := make([]string, n)
var sb strings.Builder
sb.Grow(n * 16)
for i, label := range sorted {
keys[i] = label.Key.Name
appendKVSig(&sb, label.Key.Name, label.Value)
}
return keys, sb.String()
}
return strings.Join(keyParts, "|")
// projectSig computes the signature that labels would have if restricted to
// keys. keys must already be sorted so the result lines up with signatures
// produced for label sets whose key set is exactly keys.
//
// The boolean result is false when labels lacks at least one of the keys, in
// which case labels cannot match a target that requires those keys. An empty
// key list always projects to the empty signature.
func projectSig(labels []*Label, keys []string) (string, bool) {
	if len(keys) == 0 {
		return "", true
	}

	var sig strings.Builder
	sig.Grow(16 * len(keys))
	for i := range keys {
		name := keys[i]
		val, found := findLabelValue(labels, name)
		if !found {
			return "", false
		}
		appendKVSig(&sig, name, val)
	}
	return sig.String(), true
}
// appendKVSig writes one key/value pair into the signature under construction.
// The key and the rendered value are each terminated by a NUL byte; NUL is used
// because it does not occur in practical label names or values.
func appendKVSig(sb *strings.Builder, key string, value any) {
	const nul byte = 0x00

	sb.WriteString(key)
	sb.WriteByte(nul)
	writeLabelValue(sb, value)
	sb.WriteByte(nul)
}
// writeLabelValue appends the canonical textual form of a label value to sb.
// A plain string (the common case for telemetry labels) is appended verbatim;
// every other type, including nil, is rendered with fmt's %v verb, matching how
// series are keyed elsewhere (see GetUniqueSeriesKey).
func writeLabelValue(sb *strings.Builder, value any) {
	switch v := value.(type) {
	case string:
		sb.WriteString(v)
	default:
		fmt.Fprintf(sb, "%v", value)
	}
}
// findLabelValue looks up key in labels and returns the value of the first
// label whose key name matches. The second result is false when no label has
// that key. A linear scan is used because label sets are small.
func findLabelValue(labels []*Label, key string) (any, bool) {
	for idx := 0; idx < len(labels); idx++ {
		if candidate := labels[idx]; candidate.Key.Name == key {
			return candidate.Value, true
		}
	}
	return nil, false
}
// perhaps this could be named better. The job of this function is to find all unique and supersets
@@ -453,55 +559,117 @@ func (fe *FormulaEvaluator) buildSeriesKey(variable string, seriesIndex int, lab
// and `{"service": "frontend"}` would be the series with `{"service": "frontend", "operation": "GET /api"}`
// So, we create a set of labels sets that can be termed as candidates for the final result.
func (fe *FormulaEvaluator) findUniqueLabelSets(lookup *seriesLookup) [][]*Label {
var allLabelSets [][]*Label
// Collect all label sets from series metadata
for _, series := range lookup.seriesMetadata {
allLabelSets = append(allLabelSets, series.Labels)
}
// sort the label sets by the number of labels in descending order
slices.SortFunc(allLabelSets, func(i, j []*Label) int {
if len(i) > len(j) {
return -1
}
if len(i) < len(j) {
return 1
}
return 0
})
// Find unique label sets using proper label comparison
var uniqueSets [][]*Label
for _, labelSet := range allLabelSets {
isUnique := true
for _, uniqueSet := range uniqueSets {
if fe.isSubset(uniqueSet, labelSet) {
isUnique = false
break
// Collect the distinct label sets across all variables, keyed by their full
// canonical signature. bySig already deduplicates within a variable, so this
// merges duplicates across variables (e.g. A and B grouped identically).
distinct := make(map[string]*seriesEntry)
for _, vl := range lookup.byVariable {
for sig, entry := range vl.bySig {
if _, ok := distinct[sig]; !ok {
distinct[sig] = entry
}
}
if isUnique {
uniqueSets = append(uniqueSets, labelSet)
}
if len(distinct) == 0 {
return nil
}
// Group the distinct sets by their key set. A label set can only be a subset
// of another when its keys are a subset of the other's keys, so subset
// relationships only ever cross groups. Within a group all sets share the same
// keys and distinct values, so none is a subset of another.
groups := make(map[string]*keyGroup)
for _, entry := range distinct {
ksig := strings.Join(entry.keys, "\x00")
g := groups[ksig]
if g == nil {
g = &keyGroup{keys: entry.keys}
groups[ksig] = g
}
g.entries = append(g.entries, entry)
}
// In the overwhelmingly common case every series shares one key set, so there
// is exactly one group and nothing can be a subset of anything else.
if len(groups) == 1 {
result := make([][]*Label, 0, len(distinct))
for _, entry := range distinct {
result = append(result, entry.labels)
}
return result
}
// Otherwise, mark as "covered" every signature that is dominated by a set from
// a group with a strictly larger key set: project the larger set's labels onto
// the smaller key set and that projection covers any equal set there. A set is
// kept iff it is not covered (i.e. it is maximal).
groupList := make([]*keyGroup, 0, len(groups))
for _, g := range groups {
groupList = append(groupList, g)
}
covered := make(map[string]bool)
for _, gy := range groupList { // provider of supersets
for _, gx := range groupList { // smaller key set being dominated
if gx == gy || !keysSubset(gx.keys, gy.keys) {
continue
}
for _, y := range gy.entries {
if sig, ok := projectSig(y.labels, gx.keys); ok {
covered[sig] = true
}
}
}
}
return uniqueSets
result := make([][]*Label, 0, len(distinct))
for sig, entry := range distinct {
if !covered[sig] {
result = append(result, entry.labels)
}
}
return result
}
func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool {
labelMap1 := make(map[string]any)
labelMap2 := make(map[string]any)
// keyGroup holds the distinct label sets that share a key set.
// findUniqueLabelSets builds one group per distinct key set so that subset
// checks only need to compare groups whose key sets are nested.
type keyGroup struct {
// keys is the sorted list of label key names common to every entry in the group.
keys []string
// entries are the label sets (one per distinct signature) whose key set is keys.
entries []*seriesEntry
}
for _, label := range labels1 {
labelMap1[label.Key.Name] = label.Value
// keysSubset reports whether the sorted key set a is a subset of the sorted key
// set b.
func keysSubset(a, b []string) bool {
if len(a) > len(b) {
return false
}
for _, label := range labels2 {
labelMap2[label.Key.Name] = label.Value
i := 0
for _, x := range a {
for i < len(b) && b[i] < x {
i++
}
if i >= len(b) || b[i] != x {
return false
}
i++
}
return true
}
for k, v := range labelMap2 {
if val, ok := labelMap1[k]; !ok || val != v {
// labelsToMap indexes labels by key name. When the same key name appears more
// than once, the value of the last occurrence is kept.
func labelsToMap(labels []*Label) map[string]any {
	byName := make(map[string]any, len(labels))
	for i := range labels {
		l := labels[i]
		byName[l.Key.Name] = l.Value
	}
	return byName
}
// isSubset reports whether every label in subset is present with the same value in
// supersetMap (i.e. subset ⊆ superset).
func isSubset(supersetMap map[string]any, subset []*Label) bool {
for _, label := range subset {
if val, ok := supersetMap[label.Key.Name]; !ok || val != label.Value {
return false
}
}
@@ -510,65 +678,113 @@ func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool {
// evaluateForLabelSet performs formula evaluation for a specific label set.
func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *seriesLookup) *TimeSeries {
// Find matching series for each variable
variableData := make(map[string]map[int64]float64)
// not every series would have a value for every timestamp
// so we need to collect all timestamps from the series that have a value
// for the variable
var allTimestamps = make(map[int64]struct{})
nVars := len(fe.variables)
for variable := range fe.aggRefs {
// Find series with matching labels for this variable
for seriesKey, series := range lookup.seriesMetadata {
if strings.HasPrefix(seriesKey, variable+"|") && fe.isSubset(targetLabels, series.Labels) {
if timestampData, exists := lookup.data[seriesKey]; exists {
variableData[variable] = timestampData
// Collect all timestamps
for ts := range timestampData {
allTimestamps[ts] = struct{}{}
}
break // Found matching series for this variable
// matched[i] holds the timestamp->value data for fe.variables[i], or nil if
// no series matched. Indexed by position to avoid a per-call map.
mptr := fe.matchedPool.Get().(*[]map[int64]float64)
matched := *mptr
if cap(matched) < nVars {
matched = make([]map[int64]float64, nVars)
} else {
matched = matched[:nVars]
for i := range matched {
matched[i] = nil
}
}
defer func() {
*mptr = matched[:0]
fe.matchedPool.Put(mptr)
}()
// not every series has a value for every timestamp, so collect the union of
// timestamps across the matched series.
allTimestamps := fe.tsSetPool.Get().(map[int64]struct{})
clear(allTimestamps)
defer fe.tsSetPool.Put(allTimestamps)
// targetMap is only needed for the irregular fallback; built lazily.
var targetMap map[string]any
for i, variable := range fe.variables {
vl := lookup.byVariable[variable]
if vl == nil {
continue
}
var data map[int64]float64
if vl.regular {
// Fast path: the matching series (labels ⊆ target) is exactly the one
// whose signature equals the target projected onto this variable's keys.
if sig, ok := projectSig(targetLabels, vl.keys); ok {
if e := vl.bySig[sig]; e != nil {
data = e.data
}
}
} else {
// Fallback: series have mixed key sets, so scan for the first subset.
if targetMap == nil {
targetMap = labelsToMap(targetLabels)
}
for _, e := range vl.entries {
if isSubset(targetMap, e.labels) {
data = e.data
break
}
}
}
if data != nil {
matched[i] = data
for ts := range data {
allTimestamps[ts] = struct{}{}
}
}
}
// Convert timestamps to sorted slice
// Convert timestamps to a sorted slice.
tsPtr := fe.timestampPool.Get().(*[]int64)
timestamps := (*tsPtr)[:0]
for ts := range allTimestamps {
timestamps = append(timestamps, ts)
}
slices.Sort(timestamps)
defer func() {
*tsPtr = timestamps[:0]
fe.timestampPool.Put(tsPtr)
}()
for ts := range allTimestamps {
timestamps = append(timestamps, ts)
if len(timestamps) == 0 {
return nil
}
slices.Sort(timestamps)
// Evaluate formula at each timestamp
var resultValues []*TimeSeriesValue
values := fe.valuesPool.Get().(map[string]any)
defer fe.valuesPool.Put(values)
for _, timestamp := range timestamps {
// Clear previous values
for k := range values {
delete(values, k)
}
// Allocate the values backing array once (cap = number of timestamps); the
// returned pointers reference into it, so no per-point allocation is needed.
backing := make([]TimeSeriesValue, 0, len(timestamps))
// Collect values for this timestamp
for _, timestamp := range timestamps {
clear(values)
// Collect values for this timestamp. fe.variables may contain a variable
// more than once (e.g. "A * B - A"); values is keyed by name, but each
// data-present occurrence still increments validCount, matching the target
// count of len(fe.variables) below.
validCount := 0
for _, variable := range fe.variables {
if varData, exists := variableData[variable]; exists {
if value, exists := varData[timestamp]; exists {
for i, variable := range fe.variables {
if data := matched[i]; data != nil {
if value, exists := data[timestamp]; exists {
values[variable] = value
validCount++
}
}
}
// Apply default zeros where allowed
// Apply default zeros where allowed. A defaulted variable is only counted
// once even if it appears multiple times, since the second occurrence finds
// the value already set.
for _, variable := range fe.variables {
if _, exists := values[variable]; !exists && fe.canDefaultZero[variable] {
values[variable] = 0.0
@@ -576,12 +792,11 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
}
}
// Skip if we don't have all required variables
if validCount != len(fe.variables) {
// Skip if we don't have all required variables.
if validCount != nVars {
continue
}
// Evaluate expression
result, err := fe.expression.Evaluate(values)
if err != nil {
continue
@@ -592,17 +807,22 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
continue
}
resultValues = append(resultValues, &TimeSeriesValue{
backing = append(backing, TimeSeriesValue{
Timestamp: timestamp,
Value: value,
})
}
if len(resultValues) == 0 {
if len(backing) == 0 {
return nil
}
// Preserve original label structure and metadata
resultValues := make([]*TimeSeriesValue, len(backing))
for i := range backing {
resultValues[i] = &backing[i]
}
// Preserve the original label structure for the result series.
resultLabels := make([]*Label, len(targetLabels))
copy(resultLabels, targetLabels)

View File

@@ -0,0 +1,341 @@
package querybuildertypesv5
import (
"fmt"
"math"
"math/rand"
"slices"
"sort"
"strings"
"testing"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/stretchr/testify/require"
)
// referenceEvaluate is a deliberately simple, self-contained re-statement of the
// formula evaluation semantics, used as the oracle for the differential test.
// The behaviour it encodes:
//   - candidate label sets are reduced to the maximal ones: a set that is a
//     subset of an already kept set (including an identical one) is dropped;
//   - for each kept target, a variable is bound to the FIRST series (in bucket
//     order) whose labels are a subset of the target (empty labels match all);
//   - a variable without a value at a timestamp defaults to zero when
//     canDefaultZero is set for it, otherwise the timestamp is skipped;
//   - results that are not a finite float64 (NaN/Inf) are dropped.
//
// It is intentionally quadratic and shares none of the optimisations under
// test, so that agreement with it is meaningful.
func referenceEvaluate(t *testing.T, expr string, canDefaultZero map[string]bool, tsData map[string]*TimeSeriesData) []*TimeSeries {
	t.Helper()

	fe, err := NewFormulaEvaluator(expr, canDefaultZero)
	require.NoError(t, err)

	type refSeries struct {
		labels []*Label
		data   map[int64]float64
	}

	// labelsWithin reports whether every label of sub is present in super with
	// an equal value (sub ⊆ super).
	labelsWithin := func(sub, super []*Label) bool {
		superValues := make(map[string]any, len(super))
		for _, l := range super {
			superValues[l.Key.Name] = l.Value
		}
		for _, l := range sub {
			got, present := superValues[l.Key.Name]
			if !present || got != l.Value {
				return false
			}
		}
		return true
	}

	// Resolve each formula variable to its aggregation bucket and copy out the
	// series it contains.
	seriesByVar := map[string][]refSeries{}
	var candidates [][]*Label
	for variable, aggRef := range fe.aggRefs {
		queryData, found := tsData[aggRef.QueryName]
		if !found {
			// Fall back to a case-insensitive match on the query name.
			for name, candidate := range tsData {
				if strings.EqualFold(name, aggRef.QueryName) {
					queryData, found = candidate, true
					break
				}
			}
		}
		if !found {
			continue
		}

		var picked *AggregationBucket
		for _, agg := range queryData.Aggregations {
			byIndex := aggRef.Index != nil && agg.Index == *aggRef.Index
			byAlias := aggRef.Alias != nil && agg.Alias == *aggRef.Alias
			if byIndex || byAlias {
				picked = agg
				break
			}
		}
		if picked == nil {
			continue
		}

		for _, s := range picked.Series {
			points := make(map[int64]float64, len(s.Values))
			for _, p := range s.Values {
				points[p.Timestamp] = p.Value
			}
			seriesByVar[variable] = append(seriesByVar[variable], refSeries{labels: s.Labels, data: points})
			candidates = append(candidates, s.Labels)
		}
	}

	// Largest label sets first, so supersets are kept before their subsets are
	// considered.
	sort.SliceStable(candidates, func(i, j int) bool {
		return len(candidates[j]) < len(candidates[i])
	})

	var maximal [][]*Label
	for _, cand := range candidates {
		dominated := slices.ContainsFunc(maximal, func(kept []*Label) bool {
			return labelsWithin(cand, kept)
		})
		if !dominated {
			maximal = append(maximal, cand)
		}
	}

	var out []*TimeSeries
	for _, target := range maximal {
		// Bind every variable to its first matching series and gather the union
		// of timestamps across the bound series.
		matched := map[string]map[int64]float64{}
		seenTs := map[int64]struct{}{}
		for variable := range fe.aggRefs {
			for _, s := range seriesByVar[variable] {
				if !labelsWithin(s.labels, target) {
					continue
				}
				matched[variable] = s.data
				for ts := range s.data {
					seenTs[ts] = struct{}{}
				}
				break
			}
		}

		var timestamps []int64
		for ts := range seenTs {
			timestamps = append(timestamps, ts)
		}
		slices.Sort(timestamps)

		var points []*TimeSeriesValue
		for _, ts := range timestamps {
			env := map[string]any{}
			satisfied := 0
			for _, variable := range fe.variables {
				seriesData, bound := matched[variable]
				if !bound {
					continue
				}
				if v, present := seriesData[ts]; present {
					env[variable] = v
					satisfied++
				}
			}
			for _, variable := range fe.variables {
				if _, set := env[variable]; set || !fe.canDefaultZero[variable] {
					continue
				}
				env[variable] = 0.0
				satisfied++
			}
			if satisfied != len(fe.variables) {
				continue
			}

			raw, err := fe.expression.Evaluate(env)
			if err != nil {
				continue
			}
			val, isFloat := raw.(float64)
			if !isFloat || math.IsNaN(val) || math.IsInf(val, 0) {
				continue
			}
			points = append(points, &TimeSeriesValue{Timestamp: ts, Value: val})
		}
		if len(points) == 0 {
			continue
		}

		labelsCopy := make([]*Label, len(target))
		copy(labelsCopy, target)
		out = append(out, &TimeSeries{Labels: labelsCopy, Values: points})
	}
	return out
}
// canonicalize converts a result set into an order-independent, comparable
// form: label signature -> timestamp -> value. The signature lists the labels
// sorted by key name as "key=value|" segments, with values rendered via %v.
// Series that produce the same signature overwrite one another.
func canonicalize(series []*TimeSeries) map[string]map[int64]float64 {
	canon := make(map[string]map[int64]float64, len(series))
	for _, ts := range series {
		names := make([]string, 0, len(ts.Labels))
		rendered := make(map[string]string, len(ts.Labels))
		for _, lbl := range ts.Labels {
			name := lbl.Key.Name
			names = append(names, name)
			rendered[name] = fmt.Sprintf("%v", lbl.Value)
		}
		sort.Strings(names)

		var sig strings.Builder
		for _, name := range names {
			sig.WriteString(name + "=" + rendered[name] + "|")
		}

		points := make(map[int64]float64, len(ts.Values))
		for _, p := range ts.Values {
			points[p.Timestamp] = p.Value
		}
		canon[sig.String()] = points
	}
	return canon
}
// assertSameResults fails the test unless got and want describe the same set
// of series: the same label signatures, each with the same timestamps and
// values equal within 1e-9. Series order is ignored. caseName prefixes every
// failure message so a failing fuzz iteration can be identified.
func assertSameResults(t *testing.T, caseName string, got, want []*TimeSeries) {
	t.Helper()
	gc := canonicalize(got)
	wc := canonicalize(want)

	// canonicalize keys by label signature, so two series carrying the same
	// label set collapse into a single entry. Without this check a result that
	// emits a label set twice would be compared as if it were emitted once and
	// the duplicate would go unnoticed.
	if len(gc) != len(got) {
		t.Fatalf("%s: got contains duplicate label sets: %d series but %d distinct signatures\ngot keys=%v",
			caseName, len(got), len(gc), keysOf(gc))
	}
	if len(wc) != len(want) {
		t.Fatalf("%s: want contains duplicate label sets: %d series but %d distinct signatures\nwant keys=%v",
			caseName, len(want), len(wc), keysOf(wc))
	}

	if len(gc) != len(wc) {
		t.Fatalf("%s: series count mismatch: got %d, want %d\ngot keys=%v\nwant keys=%v",
			caseName, len(gc), len(wc), keysOf(gc), keysOf(wc))
	}
	for sig, wm := range wc {
		gm, ok := gc[sig]
		if !ok {
			t.Fatalf("%s: missing series %q in got", caseName, sig)
		}
		if len(gm) != len(wm) {
			t.Fatalf("%s: series %q point count mismatch: got %d want %d", caseName, sig, len(gm), len(wm))
		}
		for ts, wv := range wm {
			gv, ok := gm[ts]
			if !ok {
				t.Fatalf("%s: series %q missing ts %d", caseName, sig, ts)
			}
			if math.Abs(gv-wv) > 1e-9 {
				t.Fatalf("%s: series %q ts %d value mismatch: got %v want %v", caseName, sig, ts, gv, wv)
			}
		}
	}
}
// keysOf returns the keys of m in ascending order, giving failure messages a
// stable rendering. The result is empty (not nil) for an empty map.
func keysOf(m map[string]map[int64]float64) []string {
	names := make([]string, len(m))
	idx := 0
	for name := range m {
		names[idx] = name
		idx++
	}
	sort.Strings(names)
	return names
}
// randomTSData generates randomized time series data for one query with a
// single aggregation bucket (index 0, alias "<queryName>_agg"). It covers the
// "regular" case (every series uses the same key set), the "irregular" case
// (each series draws its own key set), empty label sets and overlapping label
// values. Every series has at least one point.
//
// The order of draws from rng is part of the behaviour: the differential test
// is seeded, so reordering draws would change the generated cases.
func randomTSData(rng *rand.Rand, queryName string) *TimeSeriesData {
	var (
		labelNames  = []string{"svc", "op", "env"}
		labelValues = []string{"x", "y", "z"}
		timestamps  = []int64{1, 2, 3, 4, 5}
	)

	// Pick the query-wide key set and whether all series share it.
	sharedKeys := rng.Intn(2) == 0
	commonKeys := randomSubset(rng, labelNames)

	count := rng.Intn(6) // 0..5
	all := make([]*TimeSeries, 0, count)
	for i := 0; i < count; i++ {
		names := commonKeys
		if !sharedKeys {
			names = randomSubset(rng, labelNames)
		}

		lbls := make([]*Label, 0, len(names))
		for _, name := range names {
			lbls = append(lbls, &Label{
				Key:   telemetrytypes.TelemetryFieldKey{Name: name, FieldDataType: telemetrytypes.FieldDataTypeString},
				Value: labelValues[rng.Intn(len(labelValues))],
			})
		}

		// Keep each timestamp with probability 2/3.
		var points []*TimeSeriesValue
		for _, ts := range timestamps {
			if rng.Intn(3) != 0 {
				points = append(points, &TimeSeriesValue{Timestamp: ts, Value: float64(rng.Intn(100) + 1)})
			}
		}
		if len(points) == 0 {
			points = append(points, &TimeSeriesValue{Timestamp: timestamps[0], Value: float64(rng.Intn(100) + 1)})
		}

		all = append(all, &TimeSeries{Labels: lbls, Values: points})
	}

	bucket := &AggregationBucket{Index: 0, Alias: queryName + "_agg", Series: all}
	return &TimeSeriesData{
		QueryName:    queryName,
		Aggregations: []*AggregationBucket{bucket},
	}
}
func randomSubset(rng *rand.Rand, pool []string) []string {
var out []string
for _, k := range pool {
if rng.Intn(2) == 0 {
out = append(out, k)
}
}
return out
}
// TestFormulaEvaluator_DifferentialRandom fuzzes EvaluateFormula against the
// naive reference oracle to guarantee the optimized implementation preserves
// exact semantics across subset matching, empty labels, defaults and irregular
// key sets.
func TestFormulaEvaluator_DifferentialRandom(t *testing.T) {
rng := rand.New(rand.NewSource(0xC0FFEE))
type exprCase struct {
expr string
vars []string
}
twoVar := []exprCase{
{"A + B", []string{"A", "B"}},
{"A / B", []string{"A", "B"}},
{"B / A", []string{"A", "B"}},
{"A * B - A", []string{"A", "B"}},
}
threeVar := []exprCase{
{"A + B + C", []string{"A", "B", "C"}},
{"(A + B) / C", []string{"A", "B", "C"}},
{"A / B + C", []string{"A", "B", "C"}},
}
const iterations = 5000
for it := range iterations {
var ec exprCase
if rng.Intn(2) == 0 {
ec = twoVar[rng.Intn(len(twoVar))]
} else {
ec = threeVar[rng.Intn(len(threeVar))]
}
canDefaultZero := map[string]bool{}
for _, v := range ec.vars {
canDefaultZero[v] = rng.Intn(2) == 0
}
tsData := map[string]*TimeSeriesData{}
for _, v := range ec.vars {
tsData[v] = randomTSData(rng, v)
}
evaluator, err := NewFormulaEvaluator(ec.expr, canDefaultZero)
require.NoError(t, err)
got, err := evaluator.EvaluateFormula(tsData)
require.NoError(t, err)
want := referenceEvaluate(t, ec.expr, canDefaultZero, tsData)
assertSameResults(t, fmt.Sprintf("iter=%d expr=%q", it, ec.expr), got, want)
}
}