mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-06 17:10:25 +01:00
Compare commits
3 Commits
main
...
squeeze-ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6facb2c418 | ||
|
|
31fda2861a | ||
|
|
d3ffefd15a |
@@ -3,9 +3,11 @@ package querybuildertypesv5
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"slices"
|
||||
@@ -129,12 +131,35 @@ type aggregationRef struct {
|
||||
Alias *string // Alias-based reference (e.g., A.my_alias)
|
||||
}
|
||||
|
||||
// seriesLookup provides lookup for series data.
|
||||
// seriesEntry holds one series' values and its canonical label signature so that
|
||||
// matching can be a map lookup instead of rebuilding maps for every comparison.
|
||||
type seriesEntry struct {
|
||||
labels []*Label
|
||||
keys []string // sorted label key names
|
||||
sig string // canonical value signature over keys
|
||||
data map[int64]float64 // timestamp -> value
|
||||
}
|
||||
|
||||
// variableLookup indexes a single variable's (one aggregation's) series.
|
||||
type variableLookup struct {
|
||||
// keys is the sorted set of label key names shared by this variable's series
|
||||
// (taken from the first series). Only meaningful when regular is true.
|
||||
keys []string
|
||||
// regular is true when every series shares the same set of label keys, which
|
||||
// is the normal case for a group-by. It lets matching be an O(1) signature
|
||||
// lookup instead of an O(n) subset scan.
|
||||
regular bool
|
||||
// entries holds the series in bucket order; used for the irregular fallback.
|
||||
entries []*seriesEntry
|
||||
// bySig maps a value signature to the first series (in bucket order) carrying
|
||||
// it. It both drives regular-case matching and enumerates the variable's
|
||||
// distinct label sets for findUniqueLabelSets.
|
||||
bySig map[string]*seriesEntry
|
||||
}
|
||||
|
||||
// seriesLookup provides lookup for series data, grouped by variable.
|
||||
type seriesLookup struct {
|
||||
// seriesKey -> timestamp -> value
|
||||
data map[string]map[int64]float64
|
||||
// seriesKey -> original series for metadata preservation
|
||||
seriesMetadata map[string]*TimeSeries
|
||||
byVariable map[string]*variableLookup
|
||||
}
|
||||
|
||||
// FormulaEvaluator handles formula evaluation b/w time series from different aggregations
|
||||
@@ -190,6 +215,12 @@ type FormulaEvaluator struct {
|
||||
|
||||
timestampPool sync.Pool
|
||||
valuesPool sync.Pool
|
||||
// tsSetPool holds map[int64]struct{} scratch maps used to build the union of
|
||||
// timestamps for a label set.
|
||||
tsSetPool sync.Pool
|
||||
// matchedPool holds []map[int64]float64 scratch slices indexed by variable
|
||||
// position, holding each variable's matched series data for a label set.
|
||||
matchedPool sync.Pool
|
||||
}
|
||||
|
||||
// NewFormulaEvaluator creates a formula evaluator.
|
||||
@@ -243,6 +274,14 @@ func NewFormulaEvaluator(expressionStr string, canDefaultZero map[string]bool) (
|
||||
evaluator.valuesPool.New = func() any {
|
||||
return make(map[string]any, len(evaluator.variables))
|
||||
}
|
||||
evaluator.tsSetPool.New = func() any {
|
||||
return make(map[int64]struct{}, 512)
|
||||
}
|
||||
nVars := len(evaluator.variables)
|
||||
evaluator.matchedPool.New = func() any {
|
||||
s := make([]map[int64]float64, 0, nVars)
|
||||
return &s
|
||||
}
|
||||
|
||||
return evaluator, nil
|
||||
}
|
||||
@@ -291,55 +330,62 @@ func (fe *FormulaEvaluator) EvaluateFormula(timeSeriesData map[string]*TimeSerie
|
||||
// Find all unique label combinations across referenced series
|
||||
uniqueLabelSets := fe.findUniqueLabelSets(lookup)
|
||||
|
||||
// Process each unique label set
|
||||
var resultSeries []*TimeSeries
|
||||
var wg sync.WaitGroup
|
||||
resultChan := make(chan *TimeSeries, len(uniqueLabelSets))
|
||||
maxSeries := make(chan struct{}, 4)
|
||||
|
||||
// For each candidate label set, evaluate the formula expression
|
||||
// and store the result in the resultChan
|
||||
for _, labelSet := range uniqueLabelSets {
|
||||
wg.Add(1)
|
||||
go func(labels []*Label) {
|
||||
defer wg.Done()
|
||||
maxSeries <- struct{}{}
|
||||
defer func() { <-maxSeries }()
|
||||
|
||||
// main workhorse of the formula evaluation
|
||||
series := fe.evaluateForLabelSet(labels, lookup)
|
||||
if series != nil && len(series.Values) > 0 {
|
||||
resultChan <- series
|
||||
}
|
||||
}(labelSet)
|
||||
n := len(uniqueLabelSets)
|
||||
if n == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(resultChan)
|
||||
}()
|
||||
// Each label set is evaluated independently into its own slot, so we can write
|
||||
// results without any synchronization and compact afterwards. This avoids the
|
||||
// per-label-set goroutine + semaphore + channel churn that previously
|
||||
// dominated the profile (runtime.pthread_cond_signal).
|
||||
results := make([]*TimeSeries, n)
|
||||
|
||||
for series := range resultChan {
|
||||
resultSeries = append(resultSeries, series)
|
||||
// For small inputs the goroutine machinery costs more than it saves.
|
||||
const parallelThreshold = 8
|
||||
if n < parallelThreshold {
|
||||
for i, labelSet := range uniqueLabelSets {
|
||||
results[i] = fe.evaluateForLabelSet(labelSet, lookup)
|
||||
}
|
||||
} else {
|
||||
workers := min(runtime.GOMAXPROCS(0), n)
|
||||
// A shared cursor hands out label-set indices to a fixed pool of workers,
|
||||
// balancing load (series sizes vary) with a single atomic add per item.
|
||||
var next atomic.Int64
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(workers)
|
||||
for range workers {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
i := int(next.Add(1)) - 1
|
||||
if i >= n {
|
||||
return
|
||||
}
|
||||
results[i] = fe.evaluateForLabelSet(uniqueLabelSets[i], lookup)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Compact non-empty results in place (results[j] is read before it is
|
||||
// overwritten, since j <= i throughout).
|
||||
resultSeries := results[:0]
|
||||
for _, series := range results {
|
||||
if series != nil && len(series.Values) > 0 {
|
||||
resultSeries = append(resultSeries, series)
|
||||
}
|
||||
}
|
||||
|
||||
return resultSeries, nil
|
||||
}
|
||||
|
||||
// buildSeriesLookup creates lookup structure for all referenced aggregations.
|
||||
// buildSeriesLookup creates the per-variable matching index for all referenced
|
||||
// aggregations.
|
||||
func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSeriesData) *seriesLookup {
|
||||
lookup := &seriesLookup{
|
||||
// data is a map of series key to timestamp to value
|
||||
// series key is a unique identifier for a series
|
||||
// timestamp is the timestamp of the value
|
||||
// value is the value of the series at the timestamp
|
||||
data: make(map[string]map[int64]float64),
|
||||
// seriesMetadata is a map of series key to series metadata
|
||||
// series metadata is the metadata of the series
|
||||
// this is used to preserve the original label structure and metadata
|
||||
// when the series is returned to the caller
|
||||
// It's also used for finding matching series for a variable
|
||||
seriesMetadata: make(map[string]*TimeSeries),
|
||||
byVariable: make(map[string]*variableLookup, len(fe.aggRefs)),
|
||||
}
|
||||
|
||||
for variable, aggRef := range fe.aggRefs {
|
||||
@@ -360,13 +406,10 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
|
||||
continue
|
||||
}
|
||||
|
||||
// Find the specific aggregation bucket
|
||||
// Now, that we have the data for the query, we look for the specific aggregation bucket
|
||||
// referenced in the formula expression.
|
||||
// For example, if the formula expression is `B.2`, the above `data` would be the
|
||||
// time series data for the query B.
|
||||
// The following code will find the aggregation at the index 2
|
||||
// so we can build the series key -> timestamp -> value map for the expr evaluation
|
||||
// Find the specific aggregation bucket referenced in the formula
|
||||
// expression. For example, if the expression is `B.2`, the above `data`
|
||||
// is the time series data for query B and we pick the aggregation at
|
||||
// index 2.
|
||||
var targetBucket *AggregationBucket
|
||||
for _, bucket := range data.Aggregations {
|
||||
if aggRef.Index != nil && bucket.Index == *aggRef.Index {
|
||||
@@ -383,63 +426,126 @@ func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSer
|
||||
continue
|
||||
}
|
||||
|
||||
// Process all series in the target bucket
|
||||
for seriesIdx, series := range targetBucket.Series {
|
||||
seriesKey := fe.buildSeriesKey(variable, seriesIdx, series.Labels)
|
||||
vl := &variableLookup{
|
||||
regular: true,
|
||||
entries: make([]*seriesEntry, 0, len(targetBucket.Series)),
|
||||
bySig: make(map[string]*seriesEntry, len(targetBucket.Series)),
|
||||
}
|
||||
|
||||
// Initialize timestamp map
|
||||
if _, exists := lookup.data[seriesKey]; !exists {
|
||||
lookup.data[seriesKey] = make(map[int64]float64, len(series.Values))
|
||||
lookup.seriesMetadata[seriesKey] = series
|
||||
// Process all series in the target bucket.
|
||||
for seriesIdx, series := range targetBucket.Series {
|
||||
keys, sig := labelKeysAndSig(series.Labels)
|
||||
|
||||
if seriesIdx == 0 {
|
||||
vl.keys = keys
|
||||
} else if vl.regular && !slices.Equal(keys, vl.keys) {
|
||||
// A series with a different key set means we can no longer rely on
|
||||
// the signature index for subset matching and must fall back to a
|
||||
// scan for this variable.
|
||||
vl.regular = false
|
||||
}
|
||||
|
||||
// Store all timestamp-value pairs
|
||||
for _, value := range series.Values {
|
||||
lookup.data[seriesKey][value.Timestamp] = value.Value
|
||||
entry, seen := vl.bySig[sig]
|
||||
if !seen {
|
||||
entry = &seriesEntry{
|
||||
labels: series.Labels,
|
||||
keys: keys,
|
||||
sig: sig,
|
||||
data: make(map[int64]float64, len(series.Values)),
|
||||
}
|
||||
vl.bySig[sig] = entry
|
||||
vl.entries = append(vl.entries, entry)
|
||||
}
|
||||
|
||||
// Store all timestamp-value pairs. The first series carrying a given
|
||||
// signature wins, matching the original "first match" behaviour; a
|
||||
// later duplicate's points are ignored.
|
||||
if !seen {
|
||||
for _, value := range series.Values {
|
||||
entry.data[value.Timestamp] = value.Value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lookup.byVariable[variable] = vl
|
||||
}
|
||||
|
||||
return lookup
|
||||
}
|
||||
|
||||
// buildSeriesKey creates a unique key for a series within a specific aggregation.
|
||||
func (fe *FormulaEvaluator) buildSeriesKey(variable string, seriesIndex int, labels []*Label) string {
|
||||
// Create a deterministic key that includes variable and label information
|
||||
// Why is variable name needed?
|
||||
// Because we need to maintain if a certain series belongs to a query.
|
||||
// The variable name here is the name of the query.
|
||||
// Why is series index needed?
|
||||
// Since we support multiple aggregations in the same query, we need to
|
||||
// make use the series index to differentiate between series from different aggregations.
|
||||
|
||||
// Perhaps, we can reduce the allocations here and use the hash of the variable and series index
|
||||
// to create a unique key.
|
||||
// However, the number of labels and series from query result should be small,
|
||||
// and not be a bottleneck.
|
||||
// So, we can keep it simple for now.
|
||||
var keyParts []string
|
||||
keyParts = append(keyParts, variable)
|
||||
keyParts = append(keyParts, strconv.Itoa(seriesIndex))
|
||||
|
||||
// Sort labels by key name for consistent ordering
|
||||
sortedLabels := make([]*Label, len(labels))
|
||||
copy(sortedLabels, labels)
|
||||
slices.SortFunc(sortedLabels, func(i, j *Label) int {
|
||||
if i.Key.Name < j.Key.Name {
|
||||
return -1
|
||||
}
|
||||
if i.Key.Name > j.Key.Name {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
// labelKeysAndSig returns the sorted label key names and a canonical value
|
||||
// signature for a label set. The signature is built so that two label sets with
|
||||
// the same keys and values produce identical strings, and so that projecting a
|
||||
// superset onto these keys (see projectSig) yields the same string.
|
||||
func labelKeysAndSig(labels []*Label) ([]string, string) {
|
||||
n := len(labels)
|
||||
if n == 0 {
|
||||
return nil, ""
|
||||
}
|
||||
sorted := make([]*Label, n)
|
||||
copy(sorted, labels)
|
||||
slices.SortFunc(sorted, func(i, j *Label) int {
|
||||
return strings.Compare(i.Key.Name, j.Key.Name)
|
||||
})
|
||||
|
||||
for _, label := range sortedLabels {
|
||||
keyParts = append(keyParts, fmt.Sprintf("%s=%v", label.Key.Name, label.Value))
|
||||
keys := make([]string, n)
|
||||
var sb strings.Builder
|
||||
sb.Grow(n * 16)
|
||||
for i, label := range sorted {
|
||||
keys[i] = label.Key.Name
|
||||
appendKVSig(&sb, label.Key.Name, label.Value)
|
||||
}
|
||||
return keys, sb.String()
|
||||
}
|
||||
|
||||
return strings.Join(keyParts, "|")
|
||||
// projectSig builds the signature of labels projected onto keys (which must be
|
||||
// sorted). It returns false if labels is missing any of the keys, meaning labels
|
||||
// cannot match a target that requires those keys.
|
||||
func projectSig(labels []*Label, keys []string) (string, bool) {
|
||||
if len(keys) == 0 {
|
||||
return "", true
|
||||
}
|
||||
var sb strings.Builder
|
||||
sb.Grow(len(keys) * 16)
|
||||
for _, key := range keys {
|
||||
value, ok := findLabelValue(labels, key)
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
appendKVSig(&sb, key, value)
|
||||
}
|
||||
return sb.String(), true
|
||||
}
|
||||
|
||||
// appendKVSig appends a single key/value pair to a signature builder using NUL
|
||||
// separators, which do not occur in practical label names or values.
|
||||
func appendKVSig(sb *strings.Builder, key string, value any) {
|
||||
sb.WriteString(key)
|
||||
sb.WriteByte(0)
|
||||
writeLabelValue(sb, value)
|
||||
sb.WriteByte(0)
|
||||
}
|
||||
|
||||
// writeLabelValue writes the canonical string form of a label value. Strings are
|
||||
// written directly (the common case for telemetry labels); other types fall back
|
||||
// to fmt, matching how series are keyed elsewhere (see GetUniqueSeriesKey).
|
||||
func writeLabelValue(sb *strings.Builder, value any) {
|
||||
if s, ok := value.(string); ok {
|
||||
sb.WriteString(s)
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(sb, "%v", value)
|
||||
}
|
||||
|
||||
// findLabelValue returns the value for key in labels, scanning linearly since
|
||||
// label sets are small.
|
||||
func findLabelValue(labels []*Label, key string) (any, bool) {
|
||||
for _, label := range labels {
|
||||
if label.Key.Name == key {
|
||||
return label.Value, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// perhaps this could be named better. The job of this function is to find all unique and supersets
|
||||
@@ -453,55 +559,117 @@ func (fe *FormulaEvaluator) buildSeriesKey(variable string, seriesIndex int, lab
|
||||
// and `{"service": "frontend"}` would be the series with `{"service": "frontend", "operation": "GET /api"}`
|
||||
// So, we create a set of labels sets that can be termed as candidates for the final result.
|
||||
func (fe *FormulaEvaluator) findUniqueLabelSets(lookup *seriesLookup) [][]*Label {
|
||||
var allLabelSets [][]*Label
|
||||
|
||||
// Collect all label sets from series metadata
|
||||
for _, series := range lookup.seriesMetadata {
|
||||
allLabelSets = append(allLabelSets, series.Labels)
|
||||
}
|
||||
|
||||
// sort the label sets by the number of labels in descending order
|
||||
slices.SortFunc(allLabelSets, func(i, j []*Label) int {
|
||||
if len(i) > len(j) {
|
||||
return -1
|
||||
}
|
||||
if len(i) < len(j) {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
// Find unique label sets using proper label comparison
|
||||
var uniqueSets [][]*Label
|
||||
for _, labelSet := range allLabelSets {
|
||||
isUnique := true
|
||||
for _, uniqueSet := range uniqueSets {
|
||||
if fe.isSubset(uniqueSet, labelSet) {
|
||||
isUnique = false
|
||||
break
|
||||
// Collect the distinct label sets across all variables, keyed by their full
|
||||
// canonical signature. bySig already deduplicates within a variable, so this
|
||||
// merges duplicates across variables (e.g. A and B grouped identically).
|
||||
distinct := make(map[string]*seriesEntry)
|
||||
for _, vl := range lookup.byVariable {
|
||||
for sig, entry := range vl.bySig {
|
||||
if _, ok := distinct[sig]; !ok {
|
||||
distinct[sig] = entry
|
||||
}
|
||||
}
|
||||
if isUnique {
|
||||
uniqueSets = append(uniqueSets, labelSet)
|
||||
}
|
||||
|
||||
if len(distinct) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Group the distinct sets by their key set. A label set can only be a subset
|
||||
// of another when its keys are a subset of the other's keys, so subset
|
||||
// relationships only ever cross groups. Within a group all sets share the same
|
||||
// keys and distinct values, so none is a subset of another.
|
||||
groups := make(map[string]*keyGroup)
|
||||
for _, entry := range distinct {
|
||||
ksig := strings.Join(entry.keys, "\x00")
|
||||
g := groups[ksig]
|
||||
if g == nil {
|
||||
g = &keyGroup{keys: entry.keys}
|
||||
groups[ksig] = g
|
||||
}
|
||||
g.entries = append(g.entries, entry)
|
||||
}
|
||||
|
||||
// In the overwhelmingly common case every series shares one key set, so there
|
||||
// is exactly one group and nothing can be a subset of anything else.
|
||||
if len(groups) == 1 {
|
||||
result := make([][]*Label, 0, len(distinct))
|
||||
for _, entry := range distinct {
|
||||
result = append(result, entry.labels)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Otherwise, mark as "covered" every signature that is dominated by a set from
|
||||
// a group with a strictly larger key set: project the larger set's labels onto
|
||||
// the smaller key set and that projection covers any equal set there. A set is
|
||||
// kept iff it is not covered (i.e. it is maximal).
|
||||
groupList := make([]*keyGroup, 0, len(groups))
|
||||
for _, g := range groups {
|
||||
groupList = append(groupList, g)
|
||||
}
|
||||
|
||||
covered := make(map[string]bool)
|
||||
for _, gy := range groupList { // provider of supersets
|
||||
for _, gx := range groupList { // smaller key set being dominated
|
||||
if gx == gy || !keysSubset(gx.keys, gy.keys) {
|
||||
continue
|
||||
}
|
||||
for _, y := range gy.entries {
|
||||
if sig, ok := projectSig(y.labels, gx.keys); ok {
|
||||
covered[sig] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return uniqueSets
|
||||
result := make([][]*Label, 0, len(distinct))
|
||||
for sig, entry := range distinct {
|
||||
if !covered[sig] {
|
||||
result = append(result, entry.labels)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool {
|
||||
labelMap1 := make(map[string]any)
|
||||
labelMap2 := make(map[string]any)
|
||||
// keyGroup holds the distinct label sets that share a key set.
|
||||
type keyGroup struct {
|
||||
keys []string
|
||||
entries []*seriesEntry
|
||||
}
|
||||
|
||||
for _, label := range labels1 {
|
||||
labelMap1[label.Key.Name] = label.Value
|
||||
// keysSubset reports whether the sorted key set a is a subset of the sorted key
|
||||
// set b.
|
||||
func keysSubset(a, b []string) bool {
|
||||
if len(a) > len(b) {
|
||||
return false
|
||||
}
|
||||
for _, label := range labels2 {
|
||||
labelMap2[label.Key.Name] = label.Value
|
||||
i := 0
|
||||
for _, x := range a {
|
||||
for i < len(b) && b[i] < x {
|
||||
i++
|
||||
}
|
||||
if i >= len(b) || b[i] != x {
|
||||
return false
|
||||
}
|
||||
i++
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
for k, v := range labelMap2 {
|
||||
if val, ok := labelMap1[k]; !ok || val != v {
|
||||
func labelsToMap(labels []*Label) map[string]any {
|
||||
m := make(map[string]any, len(labels))
|
||||
for _, label := range labels {
|
||||
m[label.Key.Name] = label.Value
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// isSubset reports whether every label in subset is present with the same value in
|
||||
// supersetMap (i.e. subset ⊆ superset).
|
||||
func isSubset(supersetMap map[string]any, subset []*Label) bool {
|
||||
for _, label := range subset {
|
||||
if val, ok := supersetMap[label.Key.Name]; !ok || val != label.Value {
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -510,65 +678,113 @@ func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool {
|
||||
|
||||
// evaluateForLabelSet performs formula evaluation for a specific label set.
|
||||
func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *seriesLookup) *TimeSeries {
|
||||
// Find matching series for each variable
|
||||
variableData := make(map[string]map[int64]float64)
|
||||
// not every series would have a value for every timestamp
|
||||
// so we need to collect all timestamps from the series that have a value
|
||||
// for the variable
|
||||
var allTimestamps = make(map[int64]struct{})
|
||||
nVars := len(fe.variables)
|
||||
|
||||
for variable := range fe.aggRefs {
|
||||
// Find series with matching labels for this variable
|
||||
for seriesKey, series := range lookup.seriesMetadata {
|
||||
if strings.HasPrefix(seriesKey, variable+"|") && fe.isSubset(targetLabels, series.Labels) {
|
||||
if timestampData, exists := lookup.data[seriesKey]; exists {
|
||||
variableData[variable] = timestampData
|
||||
// Collect all timestamps
|
||||
for ts := range timestampData {
|
||||
allTimestamps[ts] = struct{}{}
|
||||
}
|
||||
break // Found matching series for this variable
|
||||
// matched[i] holds the timestamp->value data for fe.variables[i], or nil if
|
||||
// no series matched. Indexed by position to avoid a per-call map.
|
||||
mptr := fe.matchedPool.Get().(*[]map[int64]float64)
|
||||
matched := *mptr
|
||||
if cap(matched) < nVars {
|
||||
matched = make([]map[int64]float64, nVars)
|
||||
} else {
|
||||
matched = matched[:nVars]
|
||||
for i := range matched {
|
||||
matched[i] = nil
|
||||
}
|
||||
}
|
||||
defer func() {
|
||||
*mptr = matched[:0]
|
||||
fe.matchedPool.Put(mptr)
|
||||
}()
|
||||
|
||||
// not every series has a value for every timestamp, so collect the union of
|
||||
// timestamps across the matched series.
|
||||
allTimestamps := fe.tsSetPool.Get().(map[int64]struct{})
|
||||
clear(allTimestamps)
|
||||
defer fe.tsSetPool.Put(allTimestamps)
|
||||
|
||||
// targetMap is only needed for the irregular fallback; built lazily.
|
||||
var targetMap map[string]any
|
||||
|
||||
for i, variable := range fe.variables {
|
||||
vl := lookup.byVariable[variable]
|
||||
if vl == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var data map[int64]float64
|
||||
if vl.regular {
|
||||
// Fast path: the matching series (labels ⊆ target) is exactly the one
|
||||
// whose signature equals the target projected onto this variable's keys.
|
||||
if sig, ok := projectSig(targetLabels, vl.keys); ok {
|
||||
if e := vl.bySig[sig]; e != nil {
|
||||
data = e.data
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Fallback: series have mixed key sets, so scan for the first subset.
|
||||
if targetMap == nil {
|
||||
targetMap = labelsToMap(targetLabels)
|
||||
}
|
||||
for _, e := range vl.entries {
|
||||
if isSubset(targetMap, e.labels) {
|
||||
data = e.data
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if data != nil {
|
||||
matched[i] = data
|
||||
for ts := range data {
|
||||
allTimestamps[ts] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Convert timestamps to sorted slice
|
||||
// Convert timestamps to a sorted slice.
|
||||
tsPtr := fe.timestampPool.Get().(*[]int64)
|
||||
timestamps := (*tsPtr)[:0]
|
||||
for ts := range allTimestamps {
|
||||
timestamps = append(timestamps, ts)
|
||||
}
|
||||
slices.Sort(timestamps)
|
||||
defer func() {
|
||||
*tsPtr = timestamps[:0]
|
||||
fe.timestampPool.Put(tsPtr)
|
||||
}()
|
||||
|
||||
for ts := range allTimestamps {
|
||||
timestamps = append(timestamps, ts)
|
||||
if len(timestamps) == 0 {
|
||||
return nil
|
||||
}
|
||||
slices.Sort(timestamps)
|
||||
|
||||
// Evaluate formula at each timestamp
|
||||
var resultValues []*TimeSeriesValue
|
||||
values := fe.valuesPool.Get().(map[string]any)
|
||||
defer fe.valuesPool.Put(values)
|
||||
|
||||
for _, timestamp := range timestamps {
|
||||
// Clear previous values
|
||||
for k := range values {
|
||||
delete(values, k)
|
||||
}
|
||||
// Allocate the values backing array once (cap = number of timestamps); the
|
||||
// returned pointers reference into it, so no per-point allocation is needed.
|
||||
backing := make([]TimeSeriesValue, 0, len(timestamps))
|
||||
|
||||
// Collect values for this timestamp
|
||||
for _, timestamp := range timestamps {
|
||||
clear(values)
|
||||
|
||||
// Collect values for this timestamp. fe.variables may contain a variable
|
||||
// more than once (e.g. "A * B - A"); values is keyed by name, but each
|
||||
// data-present occurrence still increments validCount, matching the target
|
||||
// count of len(fe.variables) below.
|
||||
validCount := 0
|
||||
for _, variable := range fe.variables {
|
||||
if varData, exists := variableData[variable]; exists {
|
||||
if value, exists := varData[timestamp]; exists {
|
||||
for i, variable := range fe.variables {
|
||||
if data := matched[i]; data != nil {
|
||||
if value, exists := data[timestamp]; exists {
|
||||
values[variable] = value
|
||||
validCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Apply default zeros where allowed
|
||||
// Apply default zeros where allowed. A defaulted variable is only counted
|
||||
// once even if it appears multiple times, since the second occurrence finds
|
||||
// the value already set.
|
||||
for _, variable := range fe.variables {
|
||||
if _, exists := values[variable]; !exists && fe.canDefaultZero[variable] {
|
||||
values[variable] = 0.0
|
||||
@@ -576,12 +792,11 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
|
||||
}
|
||||
}
|
||||
|
||||
// Skip if we don't have all required variables
|
||||
if validCount != len(fe.variables) {
|
||||
// Skip if we don't have all required variables.
|
||||
if validCount != nVars {
|
||||
continue
|
||||
}
|
||||
|
||||
// Evaluate expression
|
||||
result, err := fe.expression.Evaluate(values)
|
||||
if err != nil {
|
||||
continue
|
||||
@@ -592,17 +807,22 @@ func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *s
|
||||
continue
|
||||
}
|
||||
|
||||
resultValues = append(resultValues, &TimeSeriesValue{
|
||||
backing = append(backing, TimeSeriesValue{
|
||||
Timestamp: timestamp,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
|
||||
if len(resultValues) == 0 {
|
||||
if len(backing) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Preserve original label structure and metadata
|
||||
resultValues := make([]*TimeSeriesValue, len(backing))
|
||||
for i := range backing {
|
||||
resultValues[i] = &backing[i]
|
||||
}
|
||||
|
||||
// Preserve the original label structure for the result series.
|
||||
resultLabels := make([]*Label, len(targetLabels))
|
||||
copy(resultLabels, targetLabels)
|
||||
|
||||
|
||||
@@ -0,0 +1,341 @@
|
||||
package querybuildertypesv5
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// referenceEvaluate is an intentionally naive, self-contained reimplementation of
|
||||
// the formula evaluation semantics. It mirrors the documented behaviour:
|
||||
// - unique label sets are the maximal sets (a set that is a subset of another
|
||||
// distinct set is dropped), deduped;
|
||||
// - for each unique target, a variable matches via the FIRST series (in bucket
|
||||
// order) whose labels are a subset of the target (empty labels match all);
|
||||
// - a missing variable defaults to zero when canDefaultZero is set, otherwise
|
||||
// that timestamp is skipped;
|
||||
// - NaN/Inf results are dropped.
|
||||
//
|
||||
// It is deliberately O(n^2) and free of the optimisations under test so it can
|
||||
// act as a semantic oracle for the differential test below.
|
||||
func referenceEvaluate(t *testing.T, expr string, canDefaultZero map[string]bool, tsData map[string]*TimeSeriesData) []*TimeSeries {
|
||||
t.Helper()
|
||||
fe, err := NewFormulaEvaluator(expr, canDefaultZero)
|
||||
require.NoError(t, err)
|
||||
|
||||
type refSeries struct {
|
||||
labels []*Label
|
||||
data map[int64]float64
|
||||
}
|
||||
|
||||
subset := func(a, b []*Label) bool { // a ⊆ b
|
||||
bm := make(map[string]any, len(b))
|
||||
for _, l := range b {
|
||||
bm[l.Key.Name] = l.Value
|
||||
}
|
||||
for _, l := range a {
|
||||
if v, ok := bm[l.Key.Name]; !ok || v != l.Value {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
varSeries := map[string][]refSeries{}
|
||||
var allLabelSets [][]*Label
|
||||
|
||||
for variable, aggRef := range fe.aggRefs {
|
||||
data, ok := tsData[aggRef.QueryName]
|
||||
if !ok {
|
||||
for k, v := range tsData {
|
||||
if strings.EqualFold(k, aggRef.QueryName) {
|
||||
data, ok = v, true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
var bucket *AggregationBucket
|
||||
for _, b := range data.Aggregations {
|
||||
if aggRef.Index != nil && b.Index == *aggRef.Index {
|
||||
bucket = b
|
||||
break
|
||||
}
|
||||
if aggRef.Alias != nil && b.Alias == *aggRef.Alias {
|
||||
bucket = b
|
||||
break
|
||||
}
|
||||
}
|
||||
if bucket == nil {
|
||||
continue
|
||||
}
|
||||
for _, s := range bucket.Series {
|
||||
d := make(map[int64]float64, len(s.Values))
|
||||
for _, v := range s.Values {
|
||||
d[v.Timestamp] = v.Value
|
||||
}
|
||||
varSeries[variable] = append(varSeries[variable], refSeries{labels: s.Labels, data: d})
|
||||
allLabelSets = append(allLabelSets, s.Labels)
|
||||
}
|
||||
}
|
||||
|
||||
sort.SliceStable(allLabelSets, func(i, j int) bool { return len(allLabelSets[i]) > len(allLabelSets[j]) })
|
||||
var unique [][]*Label
|
||||
for _, ls := range allLabelSets {
|
||||
u := true
|
||||
for _, us := range unique {
|
||||
if subset(ls, us) {
|
||||
u = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if u {
|
||||
unique = append(unique, ls)
|
||||
}
|
||||
}
|
||||
|
||||
var out []*TimeSeries
|
||||
for _, target := range unique {
|
||||
matched := map[string]map[int64]float64{}
|
||||
allTs := map[int64]struct{}{}
|
||||
for variable := range fe.aggRefs {
|
||||
for _, s := range varSeries[variable] {
|
||||
if subset(s.labels, target) {
|
||||
matched[variable] = s.data
|
||||
for ts := range s.data {
|
||||
allTs[ts] = struct{}{}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
var tss []int64
|
||||
for ts := range allTs {
|
||||
tss = append(tss, ts)
|
||||
}
|
||||
slices.Sort(tss)
|
||||
|
||||
var vals []*TimeSeriesValue
|
||||
for _, ts := range tss {
|
||||
m := map[string]any{}
|
||||
cnt := 0
|
||||
for _, variable := range fe.variables {
|
||||
if d, ok := matched[variable]; ok {
|
||||
if v, ok := d[ts]; ok {
|
||||
m[variable] = v
|
||||
cnt++
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, variable := range fe.variables {
|
||||
if _, ok := m[variable]; !ok && fe.canDefaultZero[variable] {
|
||||
m[variable] = 0.0
|
||||
cnt++
|
||||
}
|
||||
}
|
||||
if cnt != len(fe.variables) {
|
||||
continue
|
||||
}
|
||||
res, err := fe.expression.Evaluate(m)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
f, ok := res.(float64)
|
||||
if !ok || math.IsNaN(f) || math.IsInf(f, 0) {
|
||||
continue
|
||||
}
|
||||
vals = append(vals, &TimeSeriesValue{Timestamp: ts, Value: f})
|
||||
}
|
||||
if len(vals) == 0 {
|
||||
continue
|
||||
}
|
||||
rl := make([]*Label, len(target))
|
||||
copy(rl, target)
|
||||
out = append(out, &TimeSeries{Labels: rl, Values: vals})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// canonicalize turns a result set into a comparable map: labelSig -> ts -> value.
|
||||
func canonicalize(series []*TimeSeries) map[string]map[int64]float64 {
|
||||
out := make(map[string]map[int64]float64, len(series))
|
||||
for _, s := range series {
|
||||
keys := make([]string, 0, len(s.Labels))
|
||||
kv := make(map[string]string, len(s.Labels))
|
||||
for _, l := range s.Labels {
|
||||
keys = append(keys, l.Key.Name)
|
||||
kv[l.Key.Name] = fmt.Sprintf("%v", l.Value)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
var sb strings.Builder
|
||||
for _, k := range keys {
|
||||
sb.WriteString(k)
|
||||
sb.WriteByte('=')
|
||||
sb.WriteString(kv[k])
|
||||
sb.WriteByte('|')
|
||||
}
|
||||
m := make(map[int64]float64, len(s.Values))
|
||||
for _, v := range s.Values {
|
||||
m[v.Timestamp] = v.Value
|
||||
}
|
||||
out[sb.String()] = m
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func assertSameResults(t *testing.T, caseName string, got, want []*TimeSeries) {
|
||||
t.Helper()
|
||||
gc := canonicalize(got)
|
||||
wc := canonicalize(want)
|
||||
if len(gc) != len(wc) {
|
||||
t.Fatalf("%s: series count mismatch: got %d, want %d\ngot keys=%v\nwant keys=%v",
|
||||
caseName, len(gc), len(wc), keysOf(gc), keysOf(wc))
|
||||
}
|
||||
for sig, wm := range wc {
|
||||
gm, ok := gc[sig]
|
||||
if !ok {
|
||||
t.Fatalf("%s: missing series %q in got", caseName, sig)
|
||||
}
|
||||
if len(gm) != len(wm) {
|
||||
t.Fatalf("%s: series %q point count mismatch: got %d want %d", caseName, sig, len(gm), len(wm))
|
||||
}
|
||||
for ts, wv := range wm {
|
||||
gv, ok := gm[ts]
|
||||
if !ok {
|
||||
t.Fatalf("%s: series %q missing ts %d", caseName, sig, ts)
|
||||
}
|
||||
if math.Abs(gv-wv) > 1e-9 {
|
||||
t.Fatalf("%s: series %q ts %d value mismatch: got %v want %v", caseName, sig, ts, gv, wv)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func keysOf(m map[string]map[int64]float64) []string {
|
||||
ks := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
ks = append(ks, k)
|
||||
}
|
||||
sort.Strings(ks)
|
||||
return ks
|
||||
}
|
||||
|
||||
// randomTSData builds randomized time series data for a query, exercising both
|
||||
// the "regular" case (all series share a key set) and the "irregular" case
|
||||
// (series have differing key sets), plus empty label sets and overlaps.
|
||||
func randomTSData(rng *rand.Rand, queryName string) *TimeSeriesData {
|
||||
keyPool := []string{"svc", "op", "env"}
|
||||
valPool := []string{"x", "y", "z"}
|
||||
tsPool := []int64{1, 2, 3, 4, 5}
|
||||
|
||||
// Decide a base key set for the query.
|
||||
regular := rng.Intn(2) == 0
|
||||
baseKeys := randomSubset(rng, keyPool)
|
||||
|
||||
numSeries := rng.Intn(6) // 0..5
|
||||
series := make([]*TimeSeries, 0, numSeries)
|
||||
for range numSeries {
|
||||
keys := baseKeys
|
||||
if !regular {
|
||||
keys = randomSubset(rng, keyPool)
|
||||
}
|
||||
labels := make([]*Label, 0, len(keys))
|
||||
for _, k := range keys {
|
||||
labels = append(labels, &Label{
|
||||
Key: telemetrytypes.TelemetryFieldKey{Name: k, FieldDataType: telemetrytypes.FieldDataTypeString},
|
||||
Value: valPool[rng.Intn(len(valPool))],
|
||||
})
|
||||
}
|
||||
// random timestamps subset
|
||||
var values []*TimeSeriesValue
|
||||
for _, ts := range tsPool {
|
||||
if rng.Intn(3) == 0 {
|
||||
continue
|
||||
}
|
||||
values = append(values, &TimeSeriesValue{Timestamp: ts, Value: float64(rng.Intn(100) + 1)})
|
||||
}
|
||||
if len(values) == 0 {
|
||||
values = append(values, &TimeSeriesValue{Timestamp: tsPool[0], Value: float64(rng.Intn(100) + 1)})
|
||||
}
|
||||
series = append(series, &TimeSeries{Labels: labels, Values: values})
|
||||
}
|
||||
|
||||
return &TimeSeriesData{
|
||||
QueryName: queryName,
|
||||
Aggregations: []*AggregationBucket{
|
||||
{Index: 0, Alias: queryName + "_agg", Series: series},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func randomSubset(rng *rand.Rand, pool []string) []string {
|
||||
var out []string
|
||||
for _, k := range pool {
|
||||
if rng.Intn(2) == 0 {
|
||||
out = append(out, k)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// TestFormulaEvaluator_DifferentialRandom fuzzes EvaluateFormula against the
|
||||
// naive reference oracle to guarantee the optimized implementation preserves
|
||||
// exact semantics across subset matching, empty labels, defaults and irregular
|
||||
// key sets.
|
||||
func TestFormulaEvaluator_DifferentialRandom(t *testing.T) {
|
||||
rng := rand.New(rand.NewSource(0xC0FFEE))
|
||||
|
||||
type exprCase struct {
|
||||
expr string
|
||||
vars []string
|
||||
}
|
||||
twoVar := []exprCase{
|
||||
{"A + B", []string{"A", "B"}},
|
||||
{"A / B", []string{"A", "B"}},
|
||||
{"B / A", []string{"A", "B"}},
|
||||
{"A * B - A", []string{"A", "B"}},
|
||||
}
|
||||
threeVar := []exprCase{
|
||||
{"A + B + C", []string{"A", "B", "C"}},
|
||||
{"(A + B) / C", []string{"A", "B", "C"}},
|
||||
{"A / B + C", []string{"A", "B", "C"}},
|
||||
}
|
||||
|
||||
const iterations = 5000
|
||||
for it := range iterations {
|
||||
var ec exprCase
|
||||
if rng.Intn(2) == 0 {
|
||||
ec = twoVar[rng.Intn(len(twoVar))]
|
||||
} else {
|
||||
ec = threeVar[rng.Intn(len(threeVar))]
|
||||
}
|
||||
|
||||
canDefaultZero := map[string]bool{}
|
||||
for _, v := range ec.vars {
|
||||
canDefaultZero[v] = rng.Intn(2) == 0
|
||||
}
|
||||
|
||||
tsData := map[string]*TimeSeriesData{}
|
||||
for _, v := range ec.vars {
|
||||
tsData[v] = randomTSData(rng, v)
|
||||
}
|
||||
|
||||
evaluator, err := NewFormulaEvaluator(ec.expr, canDefaultZero)
|
||||
require.NoError(t, err)
|
||||
got, err := evaluator.EvaluateFormula(tsData)
|
||||
require.NoError(t, err)
|
||||
|
||||
want := referenceEvaluate(t, ec.expr, canDefaultZero, tsData)
|
||||
|
||||
assertSameResults(t, fmt.Sprintf("iter=%d expr=%q", it, ec.expr), got, want)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user