mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-18 16:00:32 +01:00
Compare commits
17 Commits
issue_4361
...
postproces
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
139cd9d1d2 | ||
|
|
445dc3b290 | ||
|
|
76b35b9d8f | ||
|
|
b860cce31d | ||
|
|
1bd4ca88de | ||
|
|
4fcea2fa4e | ||
|
|
695403d78a | ||
|
|
5565906960 | ||
|
|
923de53f92 | ||
|
|
e93c857bdf | ||
|
|
6a96bf489c | ||
|
|
297ff0a1d6 | ||
|
|
0ad2a49b5b | ||
|
|
bcaccff2eb | ||
|
|
71d27b7022 | ||
|
|
7ed9627ae5 | ||
|
|
2a747df764 |
@@ -1,10 +1,11 @@
|
||||
import { useEffect, useState } from 'react';
|
||||
import { useEffect, useMemo, useState } from 'react';
|
||||
import { Button } from '@signozhq/ui/button';
|
||||
import { Checkbox } from '@signozhq/ui/checkbox';
|
||||
import { Input } from '@signozhq/ui/input';
|
||||
import { Input as AntdInput } from 'antd';
|
||||
import logEvent from 'api/common/logEvent';
|
||||
import { ArrowRight } from '@signozhq/icons';
|
||||
import { useAppContext } from 'providers/App/App';
|
||||
|
||||
import { OnboardingQuestionHeader } from '../OnboardingQuestionHeader';
|
||||
|
||||
@@ -32,11 +33,31 @@ const interestedInOptions: Record<string, string> = {
|
||||
openSourceTooling: 'Prefer open-source tooling',
|
||||
};
|
||||
|
||||
function seededShuffle<T>(array: T[], seed: string): T[] {
|
||||
const result = [...array];
|
||||
|
||||
let num = 0;
|
||||
for (let i = 0; i < seed.length; i++) {
|
||||
num = Math.imul(num + seed.charCodeAt(i), 2654435761);
|
||||
num = Math.abs(num);
|
||||
}
|
||||
|
||||
for (let i = result.length - 1; i > 0; i--) {
|
||||
num = Math.abs(Math.imul(num, 1664525) + 1013904223);
|
||||
const j = num % (i + 1);
|
||||
[result[i], result[j]] = [result[j], result[i]];
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
export function AboutSigNozQuestions({
|
||||
signozDetails,
|
||||
setSignozDetails,
|
||||
onNext,
|
||||
}: AboutSigNozQuestionsProps): JSX.Element {
|
||||
const { versionData } = useAppContext();
|
||||
|
||||
const [interestInSignoz, setInterestInSignoz] = useState<string[]>(
|
||||
signozDetails?.interestInSignoz || [],
|
||||
);
|
||||
@@ -48,6 +69,12 @@ export function AboutSigNozQuestions({
|
||||
);
|
||||
const [isNextDisabled, setIsNextDisabled] = useState<boolean>(true);
|
||||
|
||||
const shuffledOptionKeys = useMemo(
|
||||
() =>
|
||||
seededShuffle(Object.keys(interestedInOptions), versionData?.version ?? ''),
|
||||
[versionData?.version],
|
||||
);
|
||||
|
||||
useEffect((): void => {
|
||||
if (
|
||||
discoverSignoz !== '' &&
|
||||
@@ -115,7 +142,7 @@ export function AboutSigNozQuestions({
|
||||
<div className="form-group">
|
||||
<div className="question">What got you interested in SigNoz?</div>
|
||||
<div className="checkbox-grid">
|
||||
{Object.keys(interestedInOptions).map((option: string) => (
|
||||
{shuffledOptionKeys.map((option: string) => (
|
||||
<div key={option} className="checkbox-item">
|
||||
<Checkbox
|
||||
id={`checkbox-${option}`}
|
||||
|
||||
2
go.mod
2
go.mod
@@ -11,6 +11,7 @@ require (
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1
|
||||
github.com/antonmedv/expr v1.15.3
|
||||
github.com/bytedance/sonic v1.14.1
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/coreos/go-oidc/v3 v3.17.0
|
||||
github.com/dgraph-io/ristretto/v2 v2.3.0
|
||||
@@ -112,7 +113,6 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 // indirect
|
||||
github.com/aws/smithy-go v1.24.2 // indirect
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/sonic v1.14.1 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6 // indirect
|
||||
|
||||
16
pkg/cache/memorycache/provider.go
vendored
16
pkg/cache/memorycache/provider.go
vendored
@@ -64,7 +64,8 @@ func New(ctx context.Context, settings factory.ProviderSettings, config cache.Co
|
||||
o.ObserveInt64(telemetry.setsRejected, int64(metrics.SetsRejected()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.getsDropped, int64(metrics.GetsDropped()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.getsKept, int64(metrics.GetsKept()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.totalCost, int64(cc.MaxCost()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.costUsed, int64(metrics.CostAdded())-int64(metrics.CostEvicted()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.totalCost, cc.MaxCost(), metric.WithAttributes(attributes...))
|
||||
return nil
|
||||
},
|
||||
telemetry.cacheRatio,
|
||||
@@ -79,6 +80,7 @@ func New(ctx context.Context, settings factory.ProviderSettings, config cache.Co
|
||||
telemetry.setsRejected,
|
||||
telemetry.getsDropped,
|
||||
telemetry.getsKept,
|
||||
telemetry.costUsed,
|
||||
telemetry.totalCost,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -112,11 +114,13 @@ func (provider *provider) Set(ctx context.Context, orgID valuer.UUID, cacheKey s
|
||||
}
|
||||
|
||||
if cloneable, ok := data.(cachetypes.Cloneable); ok {
|
||||
cost := max(cloneable.Cost(), 1)
|
||||
// Clamp to a minimum of 1: ristretto treats cost 0 specially and we
|
||||
// never want zero-size entries to bypass admission accounting.
|
||||
span.SetAttributes(attribute.Bool("memory.cloneable", true))
|
||||
span.SetAttributes(attribute.Int64("memory.cost", 1))
|
||||
span.SetAttributes(attribute.Int64("memory.cost", cost))
|
||||
toCache := cloneable.Clone()
|
||||
// In case of contention we are choosing to evict the cloneable entries first hence cost is set to 1
|
||||
if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, 1, ttl); !ok {
|
||||
if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, cost, ttl); !ok {
|
||||
return errors.New(errors.TypeInternal, errors.CodeInternal, "error writing to cache")
|
||||
}
|
||||
|
||||
@@ -125,15 +129,15 @@ func (provider *provider) Set(ctx context.Context, orgID valuer.UUID, cacheKey s
|
||||
}
|
||||
|
||||
toCache, err := provider.marshalBinary(ctx, data)
|
||||
cost := int64(len(toCache))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cost := max(int64(len(toCache)), 1)
|
||||
|
||||
span.SetAttributes(attribute.Bool("memory.cloneable", false))
|
||||
span.SetAttributes(attribute.Int64("memory.cost", cost))
|
||||
|
||||
if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, 1, ttl); !ok {
|
||||
if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, cost, ttl); !ok {
|
||||
return errors.New(errors.TypeInternal, errors.CodeInternal, "error writing to cache")
|
||||
}
|
||||
|
||||
|
||||
43
pkg/cache/memorycache/provider_test.go
vendored
43
pkg/cache/memorycache/provider_test.go
vendored
@@ -31,6 +31,10 @@ func (cloneable *CloneableA) Clone() cachetypes.Cacheable {
|
||||
}
|
||||
}
|
||||
|
||||
func (cloneable *CloneableA) Cost() int64 {
|
||||
return int64(len(cloneable.Key)) + 16
|
||||
}
|
||||
|
||||
func (cloneable *CloneableA) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(cloneable)
|
||||
}
|
||||
@@ -165,6 +169,45 @@ func TestSetGetWithDifferentTypes(t *testing.T) {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
// LargeCloneable reports a large byte cost so we can test ristretto eviction
|
||||
// without allocating the full payload in memory.
|
||||
type LargeCloneable struct {
|
||||
Key string
|
||||
CostHint int64
|
||||
}
|
||||
|
||||
func (c *LargeCloneable) Clone() cachetypes.Cacheable {
|
||||
return &LargeCloneable{Key: c.Key, CostHint: c.CostHint}
|
||||
}
|
||||
|
||||
func (c *LargeCloneable) Cost() int64 { return c.CostHint }
|
||||
|
||||
func (c *LargeCloneable) MarshalBinary() ([]byte, error) { return json.Marshal(c) }
|
||||
|
||||
func (c *LargeCloneable) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, c) }
|
||||
|
||||
func TestCloneableExceedingMaxCostIsRejected(t *testing.T) {
|
||||
const maxCost int64 = 1 << 20 // 1 MiB
|
||||
const oversize int64 = 2 << 20 // 2 MiB, larger than the entire cache
|
||||
|
||||
c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
MaxCost: maxCost,
|
||||
}})
|
||||
require.NoError(t, err)
|
||||
|
||||
orgID := valuer.GenerateUUID()
|
||||
const key = "oversize-key"
|
||||
assert.NoError(t, c.Set(context.Background(), orgID, key,
|
||||
&LargeCloneable{Key: key, CostHint: oversize}, time.Minute))
|
||||
|
||||
// Ristretto rejects any entry with cost > MaxCost (policy.go:100). Probe
|
||||
// ristretto directly to confirm no admission, instead of relying on metrics.
|
||||
cc := c.(*provider).cc
|
||||
_, ok := cc.Get(strings.Join([]string{orgID.StringValue(), key}, "::"))
|
||||
assert.False(t, ok, "entry with Cost() > MaxCost must be rejected")
|
||||
}
|
||||
|
||||
func TestCloneableConcurrentSetGet(t *testing.T) {
|
||||
cache, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
|
||||
53
pkg/cache/memorycache/telemetry.go
vendored
53
pkg/cache/memorycache/telemetry.go
vendored
@@ -7,17 +7,18 @@ import (
|
||||
|
||||
type telemetry struct {
|
||||
cacheRatio metric.Float64ObservableGauge
|
||||
cacheHits metric.Int64ObservableGauge
|
||||
cacheMisses metric.Int64ObservableGauge
|
||||
costAdded metric.Int64ObservableGauge
|
||||
costEvicted metric.Int64ObservableGauge
|
||||
keysAdded metric.Int64ObservableGauge
|
||||
keysEvicted metric.Int64ObservableGauge
|
||||
keysUpdated metric.Int64ObservableGauge
|
||||
setsDropped metric.Int64ObservableGauge
|
||||
setsRejected metric.Int64ObservableGauge
|
||||
getsDropped metric.Int64ObservableGauge
|
||||
getsKept metric.Int64ObservableGauge
|
||||
cacheHits metric.Int64ObservableCounter
|
||||
cacheMisses metric.Int64ObservableCounter
|
||||
costAdded metric.Int64ObservableCounter
|
||||
costEvicted metric.Int64ObservableCounter
|
||||
keysAdded metric.Int64ObservableCounter
|
||||
keysEvicted metric.Int64ObservableCounter
|
||||
keysUpdated metric.Int64ObservableCounter
|
||||
setsDropped metric.Int64ObservableCounter
|
||||
setsRejected metric.Int64ObservableCounter
|
||||
getsDropped metric.Int64ObservableCounter
|
||||
getsKept metric.Int64ObservableCounter
|
||||
costUsed metric.Int64ObservableGauge
|
||||
totalCost metric.Int64ObservableGauge
|
||||
}
|
||||
|
||||
@@ -28,62 +29,67 @@ func newMetrics(meter metric.Meter) (*telemetry, error) {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
cacheHits, err := meter.Int64ObservableGauge("signoz.cache.hits", metric.WithDescription("Hits is the number of Get calls where a value was found for the corresponding key."))
|
||||
cacheHits, err := meter.Int64ObservableCounter("signoz.cache.hits", metric.WithDescription("Hits is the number of Get calls where a value was found for the corresponding key."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
cacheMisses, err := meter.Int64ObservableGauge("signoz.cache.misses", metric.WithDescription("Misses is the number of Get calls where a value was not found for the corresponding key"))
|
||||
cacheMisses, err := meter.Int64ObservableCounter("signoz.cache.misses", metric.WithDescription("Misses is the number of Get calls where a value was not found for the corresponding key"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
costAdded, err := meter.Int64ObservableGauge("signoz.cache.cost.added", metric.WithDescription("CostAdded is the sum of costs that have been added (successful Set calls)"))
|
||||
costAdded, err := meter.Int64ObservableCounter("signoz.cache.cost.added", metric.WithDescription("CostAdded is the sum of costs that have been added (successful Set calls)"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
costEvicted, err := meter.Int64ObservableGauge("signoz.cache.cost.evicted", metric.WithDescription("CostEvicted is the sum of all costs that have been evicted"))
|
||||
costEvicted, err := meter.Int64ObservableCounter("signoz.cache.cost.evicted", metric.WithDescription("CostEvicted is the sum of all costs that have been evicted"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
keysAdded, err := meter.Int64ObservableGauge("signoz.cache.keys.added", metric.WithDescription("KeysAdded is the total number of Set calls where a new key-value item was added"))
|
||||
keysAdded, err := meter.Int64ObservableCounter("signoz.cache.keys.added", metric.WithDescription("KeysAdded is the total number of Set calls where a new key-value item was added"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
keysEvicted, err := meter.Int64ObservableGauge("signoz.cache.keys.evicted", metric.WithDescription("KeysEvicted is the total number of keys evicted"))
|
||||
keysEvicted, err := meter.Int64ObservableCounter("signoz.cache.keys.evicted", metric.WithDescription("KeysEvicted is the total number of keys evicted"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
keysUpdated, err := meter.Int64ObservableGauge("signoz.cache.keys.updated", metric.WithDescription("KeysUpdated is the total number of Set calls where the value was updated"))
|
||||
keysUpdated, err := meter.Int64ObservableCounter("signoz.cache.keys.updated", metric.WithDescription("KeysUpdated is the total number of Set calls where the value was updated"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
setsDropped, err := meter.Int64ObservableGauge("signoz.cache.sets.dropped", metric.WithDescription("SetsDropped is the number of Set calls that don't make it into internal buffers (due to contention or some other reason)"))
|
||||
setsDropped, err := meter.Int64ObservableCounter("signoz.cache.sets.dropped", metric.WithDescription("SetsDropped is the number of Set calls that don't make it into internal buffers (due to contention or some other reason)"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
setsRejected, err := meter.Int64ObservableGauge("signoz.cache.sets.rejected", metric.WithDescription("SetsRejected is the number of Set calls rejected by the policy (TinyLFU)"))
|
||||
setsRejected, err := meter.Int64ObservableCounter("signoz.cache.sets.rejected", metric.WithDescription("SetsRejected is the number of Set calls rejected by the policy (TinyLFU)"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
getsDropped, err := meter.Int64ObservableGauge("signoz.cache.gets.dropped", metric.WithDescription("GetsDropped is the number of Get calls that don't make it into internal buffers (due to contention or some other reason)"))
|
||||
getsDropped, err := meter.Int64ObservableCounter("signoz.cache.gets.dropped", metric.WithDescription("GetsDropped is the number of Get calls that don't make it into internal buffers (due to contention or some other reason)"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
getsKept, err := meter.Int64ObservableGauge("signoz.cache.gets.kept", metric.WithDescription("GetsKept is the number of Get calls that make it into internal buffers"))
|
||||
getsKept, err := meter.Int64ObservableCounter("signoz.cache.gets.kept", metric.WithDescription("GetsKept is the number of Get calls that make it into internal buffers"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
totalCost, err := meter.Int64ObservableGauge("signoz.cache.total.cost", metric.WithDescription("TotalCost is the available cost configured for the cache"))
|
||||
costUsed, err := meter.Int64ObservableGauge("signoz.cache.cost.used", metric.WithDescription("CostUsed is the current retained cost in the cache (CostAdded - CostEvicted)."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
totalCost, err := meter.Int64ObservableGauge("signoz.cache.total.cost", metric.WithDescription("TotalCost is the configured MaxCost ceiling for the cache."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
@@ -105,6 +111,7 @@ func newMetrics(meter metric.Meter) (*telemetry, error) {
|
||||
setsRejected: setsRejected,
|
||||
getsDropped: getsDropped,
|
||||
getsKept: getsKept,
|
||||
costUsed: costUsed,
|
||||
totalCost: totalCost,
|
||||
}, nil
|
||||
}
|
||||
|
||||
4
pkg/cache/rediscache/provider_test.go
vendored
4
pkg/cache/rediscache/provider_test.go
vendored
@@ -29,6 +29,10 @@ func (cacheable *CacheableA) Clone() cachetypes.Cacheable {
|
||||
}
|
||||
}
|
||||
|
||||
func (cacheable *CacheableA) Cost() int64 {
|
||||
return int64(len(cacheable.Key)) + 16
|
||||
}
|
||||
|
||||
func (cacheable *CacheableA) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(cacheable)
|
||||
}
|
||||
|
||||
@@ -12,8 +12,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/bytedance/sonic"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -22,6 +24,8 @@ var (
|
||||
// written clickhouse query. The column alias indcate which value is
|
||||
// to be considered as final result (or target).
|
||||
legacyReservedColumnTargetAliases = []string{"__result", "__value", "result", "res", "value"}
|
||||
|
||||
CodeFailUnmarshalJSONColumn = errors.MustNewCode("fail_unmarshal_json_column")
|
||||
)
|
||||
|
||||
// consume reads every row and shapes it into the payload expected for the
|
||||
@@ -393,11 +397,16 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
|
||||
|
||||
// de-reference the typed pointer to any
|
||||
val := reflect.ValueOf(cellPtr).Elem().Interface()
|
||||
// Post-process JSON columns: normalize into String value
|
||||
// Post-process JSON columns: unmarshal bytes into map[string]any
|
||||
if strings.HasPrefix(strings.ToUpper(colTypes[i].DatabaseTypeName()), "JSON") {
|
||||
switch x := val.(type) {
|
||||
case []byte:
|
||||
val = string(x)
|
||||
var m map[string]any
|
||||
err := sonic.Unmarshal(x, &m)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailUnmarshalJSONColumn, "failed to unmarshal JSON column %s", name)
|
||||
}
|
||||
val = m
|
||||
default:
|
||||
// already a structured type (map[string]any, []any, etc.)
|
||||
}
|
||||
|
||||
@@ -12,9 +12,12 @@ import (
|
||||
"github.com/SigNoz/govaluate"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/types/featuretypes"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// queryInfo holds common query properties.
|
||||
@@ -50,7 +53,7 @@ func getQueryName(spec any) string {
|
||||
return getqueryInfo(spec).Name
|
||||
}
|
||||
|
||||
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
|
||||
func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
|
||||
// Convert results to typed format for processing
|
||||
typedResults := make(map[string]*qbtypes.Result)
|
||||
for name, result := range results {
|
||||
@@ -69,6 +72,7 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
|
||||
if result, ok := typedResults[spec.Name]; ok {
|
||||
result = postProcessBuilderQuery(q, result, spec, req)
|
||||
result = q.postProcessLogBody(ctx, orgID, result, req)
|
||||
typedResults[spec.Name] = result
|
||||
}
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
|
||||
@@ -335,10 +339,8 @@ func (q *querier) applyFormulas(ctx context.Context, results map[string]*qbtypes
|
||||
}
|
||||
case qbtypes.RequestTypeScalar:
|
||||
result := q.processScalarFormula(ctx, results, formula, req)
|
||||
if result != nil {
|
||||
result = q.applySeriesLimit(result, formula.Limit, formula.Order)
|
||||
results[name] = result
|
||||
}
|
||||
// For scalar results, apply limit by processScalarFormula itself since it needs to be applied before converting back to scalar format
|
||||
results[name] = result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -526,6 +528,9 @@ func (q *querier) processScalarFormula(
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply ordering (and limit) before converting to scalar format.
|
||||
formulaSeries = qbtypes.ApplySeriesLimit(formulaSeries, formula.Order, formula.Limit)
|
||||
|
||||
// Convert back to scalar format
|
||||
scalarResult := &qbtypes.ScalarData{
|
||||
QueryName: formula.Name,
|
||||
@@ -1045,3 +1050,33 @@ func (q *querier) calculateFormulaStep(expression string, req *qbtypes.QueryRang
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// postProcessLogBody removes the "message" key from the body map when it is empty.
|
||||
// Only runs for raw list queries with the use_json_body feature enabled.
|
||||
func (q *querier) postProcessLogBody(ctx context.Context, orgID valuer.UUID, result *qbtypes.Result, req *qbtypes.QueryRangeRequest) *qbtypes.Result {
|
||||
if req.RequestType != qbtypes.RequestTypeRaw {
|
||||
return result
|
||||
}
|
||||
if !q.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(orgID)) {
|
||||
return result
|
||||
}
|
||||
rawData, ok := result.Value.(*qbtypes.RawData)
|
||||
if !ok {
|
||||
return result
|
||||
}
|
||||
for _, row := range rawData.Rows {
|
||||
bodyMap, ok := row.Data["body"].(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if msg, exists := bodyMap["message"]; exists {
|
||||
switch v := msg.(type) {
|
||||
case string:
|
||||
if v == "" {
|
||||
delete(bodyMap, "message")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -1,15 +1,155 @@
|
||||
package querier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// scalarInputResult builds a ScalarData result with one group column ("service")
|
||||
// and one aggregation column ("__result"), holding the provided (service, value) rows.
|
||||
func scalarInputResult(queryName string, rows []struct {
|
||||
service string
|
||||
value float64
|
||||
}) *qbtypes.Result {
|
||||
serviceKey := telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service",
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
}
|
||||
resultKey := telemetrytypes.TelemetryFieldKey{
|
||||
Name: "__result",
|
||||
FieldDataType: telemetrytypes.FieldDataTypeFloat64,
|
||||
}
|
||||
|
||||
data := make([][]any, 0, len(rows))
|
||||
for _, r := range rows {
|
||||
data = append(data, []any{r.service, r.value})
|
||||
}
|
||||
|
||||
return &qbtypes.Result{
|
||||
Value: &qbtypes.ScalarData{
|
||||
QueryName: queryName,
|
||||
Columns: []*qbtypes.ColumnDescriptor{
|
||||
{
|
||||
TelemetryFieldKey: serviceKey,
|
||||
QueryName: queryName,
|
||||
Type: qbtypes.ColumnTypeGroup,
|
||||
},
|
||||
{
|
||||
TelemetryFieldKey: resultKey,
|
||||
QueryName: queryName,
|
||||
AggregationIndex: 0,
|
||||
Type: qbtypes.ColumnTypeAggregation,
|
||||
},
|
||||
},
|
||||
Data: data,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessScalarFormula_AppliesOrderAndLimit(t *testing.T) {
|
||||
q := &querier{
|
||||
logger: instrumentationtest.New().Logger(),
|
||||
}
|
||||
|
||||
// Mimic what a dashboard emits: orderBy keyed by the formula name ("F1"),
|
||||
// which applyFormulas rewrites to __result before sorting.
|
||||
orderByFormula := func(name string, dir qbtypes.OrderDirection) []qbtypes.OrderBy {
|
||||
return []qbtypes.OrderBy{
|
||||
{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: name,
|
||||
},
|
||||
},
|
||||
Direction: dir,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// A+B per service: a=101, b=11, c=2
|
||||
makeInputs := func() map[string]*qbtypes.Result {
|
||||
return map[string]*qbtypes.Result{
|
||||
"A": scalarInputResult("A", []struct {
|
||||
service string
|
||||
value float64
|
||||
}{
|
||||
{"a", 100},
|
||||
{"b", 10},
|
||||
{"c", 1},
|
||||
}),
|
||||
"B": scalarInputResult("B", []struct {
|
||||
service string
|
||||
value float64
|
||||
}{
|
||||
{"a", 1},
|
||||
{"b", 0},
|
||||
{"c", 1},
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
makeReq := func(formula qbtypes.QueryBuilderFormula) *qbtypes.QueryRangeRequest {
|
||||
return &qbtypes.QueryRangeRequest{
|
||||
RequestType: qbtypes.RequestTypeScalar,
|
||||
CompositeQuery: qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{Type: qbtypes.QueryTypeBuilder, Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{Name: "A"}},
|
||||
{Type: qbtypes.QueryTypeBuilder, Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{Name: "B"}},
|
||||
{Type: qbtypes.QueryTypeFormula, Spec: formula},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("F1 desc with limit truncates and sorts", func(t *testing.T) {
|
||||
formula := qbtypes.QueryBuilderFormula{
|
||||
Name: "F1",
|
||||
Expression: "A + B",
|
||||
Order: orderByFormula("F1", qbtypes.OrderDirectionDesc),
|
||||
Limit: 2,
|
||||
}
|
||||
|
||||
out := q.applyFormulas(context.Background(), makeInputs(), makeReq(formula))
|
||||
got, ok := out["F1"]
|
||||
require.True(t, ok, "formula result missing")
|
||||
scalar, ok := got.Value.(*qbtypes.ScalarData)
|
||||
require.True(t, ok, "expected *ScalarData, got %T", got.Value)
|
||||
|
||||
// Limit=2 + F1 desc: the two largest __result rows in descending order.
|
||||
require.Len(t, scalar.Data, 2, "limit=2 was ignored before the fix")
|
||||
require.Equal(t, "a", scalar.Data[0][0])
|
||||
require.InDelta(t, 101.0, scalar.Data[0][1].(float64), 1e-9)
|
||||
require.Equal(t, "b", scalar.Data[1][0])
|
||||
require.InDelta(t, 10.0, scalar.Data[1][1].(float64), 1e-9)
|
||||
})
|
||||
|
||||
t.Run("F1 desc without limit sorts all rows", func(t *testing.T) {
|
||||
formula := qbtypes.QueryBuilderFormula{
|
||||
Name: "F1",
|
||||
Expression: "A / B",
|
||||
Order: orderByFormula("F1", qbtypes.OrderDirectionAsc),
|
||||
}
|
||||
|
||||
out := q.applyFormulas(context.Background(), makeInputs(), makeReq(formula))
|
||||
got, ok := out["F1"]
|
||||
require.True(t, ok)
|
||||
scalar, ok := got.Value.(*qbtypes.ScalarData)
|
||||
require.True(t, ok)
|
||||
|
||||
require.Len(t, scalar.Data, 2)
|
||||
require.Equal(t, "c", scalar.Data[0][0])
|
||||
require.InDelta(t, 1.0, scalar.Data[0][1].(float64), 1e-9)
|
||||
require.Equal(t, "a", scalar.Data[1][0])
|
||||
require.InDelta(t, 100.0, scalar.Data[1][1].(float64), 1e-9)
|
||||
})
|
||||
}
|
||||
|
||||
// Multiple series with different number of labels, shouldn't panic and should align labels correctly.
|
||||
func TestConvertTimeSeriesDataToScalar_RaggedLabels(t *testing.T) {
|
||||
label := func(name string, value any) *qbtypes.Label {
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
@@ -35,6 +36,7 @@ var (
|
||||
|
||||
type querier struct {
|
||||
logger *slog.Logger
|
||||
fl flagger.Flagger
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
metadataStore telemetrytypes.MetadataStore
|
||||
promEngine prometheus.Prometheus
|
||||
@@ -62,10 +64,12 @@ func New(
|
||||
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
|
||||
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder,
|
||||
bucketCache BucketCache,
|
||||
flagger flagger.Flagger,
|
||||
) *querier {
|
||||
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
|
||||
return &querier{
|
||||
logger: querierSettings.Logger(),
|
||||
fl: flagger,
|
||||
telemetryStore: telemetryStore,
|
||||
metadataStore: metadataStore,
|
||||
promEngine: promEngine,
|
||||
@@ -684,7 +688,7 @@ func (q *querier) run(
|
||||
}
|
||||
|
||||
gomaps.Copy(results, preseededResults)
|
||||
processedResults, err := q.postProcessResults(ctx, results, req)
|
||||
processedResults, err := q.postProcessResults(ctx, orgID, results, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
@@ -44,14 +45,15 @@ func TestQueryRange_MetricTypeMissing(t *testing.T) {
|
||||
providerSettings,
|
||||
nil, // telemetryStore
|
||||
metadataStore,
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
@@ -116,6 +118,7 @@ func TestQueryRange_MetricTypeFromStore(t *testing.T) {
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
|
||||
@@ -186,5 +186,6 @@ func newProvider(
|
||||
meterStmtBuilder,
|
||||
traceOperatorStmtBuilder,
|
||||
bucketCache,
|
||||
flagger,
|
||||
), nil
|
||||
}
|
||||
|
||||
@@ -769,6 +769,13 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
}
|
||||
|
||||
// Clamp the top-level Step for PromQL
|
||||
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL {
|
||||
if minStep := common.MinAllowedStepInterval(queryRangeParams.Start, queryRangeParams.End); queryRangeParams.Step < minStep {
|
||||
queryRangeParams.Step = minStep
|
||||
}
|
||||
}
|
||||
|
||||
// prepare the variables for the corresponding query type
|
||||
formattedVars := make(map[string]interface{})
|
||||
for name, value := range queryRangeParams.Variables {
|
||||
|
||||
@@ -41,6 +41,11 @@ func (c *GetWaterfallSpansForTraceWithMetadataCache) Clone() cachetypes.Cacheabl
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GetWaterfallSpansForTraceWithMetadataCache) Cost() int64 {
|
||||
const perSpanBytes = 256
|
||||
return int64(c.TotalSpans) * perSpanBytes
|
||||
}
|
||||
|
||||
func (c *GetWaterfallSpansForTraceWithMetadataCache) MarshalBinary() (data []byte, err error) {
|
||||
return json.Marshal(c)
|
||||
}
|
||||
@@ -66,6 +71,16 @@ func (c *GetFlamegraphSpansForTraceCache) Clone() cachetypes.Cacheable {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GetFlamegraphSpansForTraceCache) Cost() int64 {
|
||||
const perSpanBytes = 128
|
||||
var spans int64
|
||||
for _, row := range c.SelectedSpans {
|
||||
spans += int64(len(row))
|
||||
}
|
||||
spans += int64(len(c.TraceRoots))
|
||||
return spans * perSpanBytes
|
||||
}
|
||||
|
||||
func (c *GetFlamegraphSpansForTraceCache) MarshalBinary() (data []byte, err error) {
|
||||
return json.Marshal(c)
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flagger,
|
||||
), metadataStore
|
||||
}
|
||||
|
||||
@@ -102,6 +103,7 @@ func prepareQuerierForLogs(t *testing.T, telemetryStore telemetrystore.Telemetry
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -146,5 +148,6 @@ func prepareQuerierForTraces(t *testing.T, telemetryStore telemetrystore.Telemet
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -18,6 +18,10 @@ type Cloneable interface {
|
||||
// Creates a deep copy of the Cacheable. This method is useful for memory caches to avoid the need for serialization/deserialization. It also prevents
|
||||
// race conditions in the memory cache.
|
||||
Clone() Cacheable
|
||||
// Cost returns the weight of this entry for cost-based cache accounting
|
||||
// and eviction. Typically derived from the approximate retained byte size,
|
||||
// but the value represents cache cost, not literal bytes.
|
||||
Cost() int64
|
||||
}
|
||||
|
||||
func NewSha1CacheKey(val string) string {
|
||||
|
||||
@@ -59,3 +59,21 @@ func (c *CachedData) Clone() cachetypes.Cacheable {
|
||||
|
||||
return clonedCachedData
|
||||
}
|
||||
|
||||
// Cost approximates the retained bytes of this CachedData for use as the
|
||||
// ristretto cache cost. The dominant contributor is the serialized bucket
|
||||
// values (json.RawMessage); other fields are fixed-size or small strings.
|
||||
func (c *CachedData) Cost() int64 {
|
||||
var size int64
|
||||
for _, b := range c.Buckets {
|
||||
if b == nil {
|
||||
continue
|
||||
}
|
||||
// Value is the bulk of the payload
|
||||
size += int64(len(b.Value))
|
||||
}
|
||||
for _, w := range c.Warnings {
|
||||
size += int64(len(w))
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
6
tests/fixtures/querier.py
vendored
6
tests/fixtures/querier.py
vendored
@@ -200,6 +200,8 @@ def build_formula_query(
|
||||
*,
|
||||
functions: list[dict] | None = None,
|
||||
disabled: bool = False,
|
||||
order: list[dict] | None = None,
|
||||
limit: int | None = None,
|
||||
) -> dict:
|
||||
spec: dict[str, Any] = {
|
||||
"name": name,
|
||||
@@ -208,6 +210,10 @@ def build_formula_query(
|
||||
}
|
||||
if functions:
|
||||
spec["functions"] = functions
|
||||
if order:
|
||||
spec["order"] = order
|
||||
if limit is not None:
|
||||
spec["limit"] = limit
|
||||
return {"type": "builder_formula", "spec": spec}
|
||||
|
||||
|
||||
|
||||
@@ -11,6 +11,11 @@ from fixtures.logs import Logs
|
||||
from fixtures.querier import (
|
||||
assert_identical_query_response,
|
||||
assert_minutely_bucket_values,
|
||||
build_formula_query,
|
||||
build_group_by_field,
|
||||
build_logs_aggregation,
|
||||
build_order_by,
|
||||
build_scalar_query,
|
||||
find_named_result,
|
||||
index_series_by_label,
|
||||
make_query_request,
|
||||
@@ -2111,3 +2116,180 @@ def test_logs_fill_zero_formula_with_group_by(
|
||||
expected_by_ts=expectations[service_name],
|
||||
context=f"logs/fillZero/F1/{service_name}",
|
||||
)
|
||||
|
||||
|
||||
def test_logs_formula_orderby_and_limit(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_logs: Callable[[list[Logs]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Test that formula results are correctly ordered and limited when
|
||||
order and limit are applied on the formula.
|
||||
"""
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
logs: list[Logs] = []
|
||||
# For service-i (i in 0..9): insert (10 - i) ERROR logs and 2 INFO logs.
|
||||
# A counts ERROR, B counts INFO, so A/B = (10 - i) / 2.
|
||||
# service-0 ratio = 5.0 (highest), service-9 ratio = 0.5 (lowest).
|
||||
for i in range(10):
|
||||
for j in range(10 - i):
|
||||
logs.append(
|
||||
Logs(
|
||||
timestamp=now - timedelta(minutes=j + 1),
|
||||
resources={"service.name": f"service-{i}"},
|
||||
attributes={"code.file": "test.py"},
|
||||
body=f"Error log {i}-{j}",
|
||||
severity_text="ERROR",
|
||||
)
|
||||
)
|
||||
for k in range(2):
|
||||
logs.append(
|
||||
Logs(
|
||||
timestamp=now - timedelta(minutes=k + 1),
|
||||
resources={"service.name": f"service-{i}"},
|
||||
attributes={"code.file": "test.py"},
|
||||
body=f"Info log {i}-{k}",
|
||||
severity_text="INFO",
|
||||
)
|
||||
)
|
||||
# Extra INFO-only services that appear in B but not in A. The formula
|
||||
for name in ("service-info-only-1", "service-info-only-2"):
|
||||
for k in range(2):
|
||||
logs.append(
|
||||
Logs(
|
||||
timestamp=now - timedelta(minutes=k + 1),
|
||||
resources={"service.name": name},
|
||||
attributes={"code.file": "test.py"},
|
||||
body=f"Info log {name}-{k}",
|
||||
severity_text="INFO",
|
||||
)
|
||||
)
|
||||
|
||||
# Logs look like this (columns = minutes before `now`; query range is
|
||||
# (now - 15m, now], so the `now` column is the exclusive upper bound and
|
||||
# no log lands there). E = ERROR, I = INFO, X = both at that minute.
|
||||
#
|
||||
# t-10 t-9 t-8 t-7 t-6 t-5 t-4 t-3 t-2 t-1 |now | A B A/B
|
||||
# service-0: E E E E E E E E X X | | 10 2 5.0
|
||||
# service-1: . E E E E E E E X X | | 9 2 4.5
|
||||
# service-2: . . E E E E E E X X | | 8 2 4.0
|
||||
# service-3: . . . E E E E E X X | | 7 2 3.5
|
||||
# service-4: . . . . E E E E X X | | 6 2 3.0
|
||||
# service-5: . . . . . E E E X X | | 5 2 2.5
|
||||
# service-6: . . . . . . E E X X | | 4 2 2.0
|
||||
# service-7: . . . . . . . E X X | | 3 2 1.5
|
||||
# service-8: . . . . . . . . X X | | 2 2 1.0
|
||||
# service-9: . . . . . . . . I X | | 1 2 0.5
|
||||
# info-only-1: . . . . . . . . I I | | 0* 2 0.0
|
||||
# info-only-2: . . . . . . . . I I | | 0* 2 0.0
|
||||
#
|
||||
# * A is missing for the info-only services; because A is count(), the
|
||||
# formula evaluator defaults missing A to 0, yielding A/B = 0.
|
||||
insert_logs(logs)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
result = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int((now - timedelta(minutes=15)).timestamp() * 1000),
|
||||
end_ms=int(now.timestamp() * 1000),
|
||||
request_type="scalar",
|
||||
queries=[
|
||||
build_scalar_query(
|
||||
name="A",
|
||||
signal="logs",
|
||||
aggregations=[build_logs_aggregation("count()")],
|
||||
group_by=[build_group_by_field("service.name")],
|
||||
filter_expression="severity_text = 'ERROR'",
|
||||
disabled=True,
|
||||
),
|
||||
build_scalar_query(
|
||||
name="B",
|
||||
signal="logs",
|
||||
aggregations=[build_logs_aggregation("count()")],
|
||||
group_by=[build_group_by_field("service.name")],
|
||||
filter_expression="severity_text = 'INFO'",
|
||||
disabled=True,
|
||||
),
|
||||
build_formula_query(
|
||||
"F1",
|
||||
"A / B",
|
||||
order=[build_order_by("__result", "desc")],
|
||||
limit=3,
|
||||
),
|
||||
build_formula_query(
|
||||
"F2",
|
||||
"A / B",
|
||||
order=[build_order_by("__result", "desc")],
|
||||
),
|
||||
build_formula_query(
|
||||
"F3",
|
||||
"A / B",
|
||||
order=[build_order_by("__result", "asc")],
|
||||
limit=3,
|
||||
),
|
||||
build_formula_query(
|
||||
"F4",
|
||||
"A / B",
|
||||
order=[build_order_by("__result", "asc")],
|
||||
),
|
||||
],
|
||||
)
|
||||
assert result.status_code == HTTPStatus.OK
|
||||
assert result.json()["status"] == "success"
|
||||
|
||||
results = result.json()["data"]["data"]["results"]
|
||||
|
||||
def extract_services_and_values(query_name: str) -> tuple[list, list]:
|
||||
res = find_named_result(results, query_name)
|
||||
assert res is not None, f"Expected formula result named {query_name}"
|
||||
cols = res["columns"]
|
||||
s_col = next(i for i, c in enumerate(cols) if c["name"] == "service.name")
|
||||
v_col = next(i for i, c in enumerate(cols) if c["name"] == "__result")
|
||||
rows = res["data"]
|
||||
return [row[s_col] for row in rows], [row[v_col] for row in rows]
|
||||
|
||||
# Because A is count(), canDefaultZero["A"] is true; the formula evaluator
|
||||
# defaults A to 0 for services that exist only in B. So the two INFO-only
|
||||
# services appear in the formula result with value 0.0 (extreme bottom in
|
||||
# desc order, extreme top in asc order). Their relative ordering is not
|
||||
# deterministic across separate formula evaluations (tied values).
|
||||
info_only_services = {"service-info-only-1", "service-info-only-2"}
|
||||
|
||||
# F2: desc, no limit -> 12 rows in descending order by value.
|
||||
f2_services, f2_values = extract_services_and_values("F2")
|
||||
assert len(f2_services) == 12, f"F2: expected 12 rows with no limit, got {len(f2_services)}"
|
||||
assert f2_values == [5.0, 4.5, 4.0, 3.5, 3.0, 2.5, 2.0, 1.5, 1.0, 0.5, 0.0, 0.0], f2_values
|
||||
# Top 10 have distinct positive values -> deterministic service ordering.
|
||||
assert f2_services[:10] == [f"service-{i}" for i in range(10)], f2_services[:10]
|
||||
# Tail 2 are the INFO-only services tied at 0.0 (order between them not guaranteed).
|
||||
assert set(f2_services[10:]) == info_only_services, f2_services[10:]
|
||||
|
||||
# F1: desc + limit 3 -> must be exactly the first 3 rows of F2.
|
||||
# Top 3 are not in the tie region, so prefix equality is safe.
|
||||
f1_services, f1_values = extract_services_and_values("F1")
|
||||
assert len(f1_services) == 3, f"F1: expected 3 rows after limit, got {len(f1_services)}"
|
||||
assert f1_services == f2_services[:3], f"F1 services {f1_services} are not the prefix of F2 services {f2_services}"
|
||||
assert f1_values == f2_values[:3], f"F1 values {f1_values} are not the prefix of F2 values {f2_values}"
|
||||
|
||||
# F4: asc, no limit -> 12 rows in ascending order by value.
|
||||
f4_services, f4_values = extract_services_and_values("F4")
|
||||
assert len(f4_services) == 12, f"F4: expected 12 rows with no limit, got {len(f4_services)}"
|
||||
assert f4_values == sorted(f4_values), f"F4 not ascending: {f4_values}"
|
||||
# First 2 are the INFO-only services tied at 0.0 (order between them not guaranteed).
|
||||
assert set(f4_services[:2]) == info_only_services, f4_services[:2]
|
||||
assert f4_values[:2] == [0.0, 0.0], f4_values[:2]
|
||||
# Tail 10 are service-9 down to service-0 by value.
|
||||
assert f4_services[2:] == [f"service-{i}" for i in reversed(range(10))], f4_services[2:]
|
||||
assert f4_values[2:] == [(10 - i) / 2 for i in reversed(range(10))], f4_values[2:]
|
||||
|
||||
# F3: asc + limit 3 -> values must match F4[:3] exactly; service set must
|
||||
# match too. Direct prefix equality on services would be flaky because the
|
||||
# two tied INFO-only entries can swap order between formula evaluations.
|
||||
f3_services, f3_values = extract_services_and_values("F3")
|
||||
assert len(f3_services) == 3, f"F3: expected 3 rows after limit, got {len(f3_services)}"
|
||||
assert f3_values == f4_values[:3], f"F3 values {f3_values} do not match F4[:3] values {f4_values[:3]}"
|
||||
assert set(f3_services) == set(f4_services[:3]), f"F3 services {f3_services} do not match F4[:3] services {f4_services[:3]}"
|
||||
|
||||
@@ -21,7 +21,7 @@ from fixtures.querier import (
|
||||
|
||||
|
||||
def _get_bodies(response: requests.Response) -> list[dict[str, Any]]:
|
||||
return [json.loads(row["data"]["body"]) for row in get_rows(response)]
|
||||
return [row["data"]["body"] for row in get_rows(response)]
|
||||
|
||||
|
||||
def _run_query_case(signoz: types.SigNoz, token: str, now: datetime, case: dict[str, Any]) -> None:
|
||||
@@ -1188,7 +1188,7 @@ def test_message_searches(
|
||||
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
|
||||
|
||||
def _body_messages(response: requests.Response) -> list[str]:
|
||||
return [json.loads(row["data"]["body"]).get("message", "") for row in get_rows(response)]
|
||||
return [row["data"]["body"].get("message", "") for row in get_rows(response)]
|
||||
|
||||
payment_messages = {
|
||||
"Payment processed successfully",
|
||||
|
||||
Reference in New Issue
Block a user