mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-20 18:50:29 +01:00
Compare commits
2 Commits
base-path-
...
tvats-fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
023f2c4872 | ||
|
|
7516a091ac |
16
pkg/cache/memorycache/provider.go
vendored
16
pkg/cache/memorycache/provider.go
vendored
@@ -64,7 +64,8 @@ func New(ctx context.Context, settings factory.ProviderSettings, config cache.Co
|
||||
o.ObserveInt64(telemetry.setsRejected, int64(metrics.SetsRejected()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.getsDropped, int64(metrics.GetsDropped()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.getsKept, int64(metrics.GetsKept()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.totalCost, int64(cc.MaxCost()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.costUsed, int64(metrics.CostAdded())-int64(metrics.CostEvicted()), metric.WithAttributes(attributes...))
|
||||
o.ObserveInt64(telemetry.totalCost, cc.MaxCost(), metric.WithAttributes(attributes...))
|
||||
return nil
|
||||
},
|
||||
telemetry.cacheRatio,
|
||||
@@ -79,6 +80,7 @@ func New(ctx context.Context, settings factory.ProviderSettings, config cache.Co
|
||||
telemetry.setsRejected,
|
||||
telemetry.getsDropped,
|
||||
telemetry.getsKept,
|
||||
telemetry.costUsed,
|
||||
telemetry.totalCost,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -112,11 +114,13 @@ func (provider *provider) Set(ctx context.Context, orgID valuer.UUID, cacheKey s
|
||||
}
|
||||
|
||||
if cloneable, ok := data.(cachetypes.Cloneable); ok {
|
||||
cost := cloneable.Size()
|
||||
// Clamp to a minimum of 1: ristretto treats cost 0 specially and we
|
||||
// never want zero-size entries to bypass admission accounting.
|
||||
span.SetAttributes(attribute.Bool("memory.cloneable", true))
|
||||
span.SetAttributes(attribute.Int64("memory.cost", 1))
|
||||
span.SetAttributes(attribute.Int64("memory.cost", cost))
|
||||
toCache := cloneable.Clone()
|
||||
// In case of contention we are choosing to evict the cloneable entries first hence cost is set to 1
|
||||
if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, 1, ttl); !ok {
|
||||
if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, max(cost, 1), ttl); !ok {
|
||||
return errors.New(errors.TypeInternal, errors.CodeInternal, "error writing to cache")
|
||||
}
|
||||
|
||||
@@ -125,15 +129,15 @@ func (provider *provider) Set(ctx context.Context, orgID valuer.UUID, cacheKey s
|
||||
}
|
||||
|
||||
toCache, err := provider.marshalBinary(ctx, data)
|
||||
cost := int64(len(toCache))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cost := int64(len(toCache))
|
||||
|
||||
span.SetAttributes(attribute.Bool("memory.cloneable", false))
|
||||
span.SetAttributes(attribute.Int64("memory.cost", cost))
|
||||
|
||||
if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, 1, ttl); !ok {
|
||||
if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, max(cost, 1), ttl); !ok {
|
||||
return errors.New(errors.TypeInternal, errors.CodeInternal, "error writing to cache")
|
||||
}
|
||||
|
||||
|
||||
48
pkg/cache/memorycache/provider_test.go
vendored
48
pkg/cache/memorycache/provider_test.go
vendored
@@ -31,6 +31,10 @@ func (cloneable *CloneableA) Clone() cachetypes.Cacheable {
|
||||
}
|
||||
}
|
||||
|
||||
func (cloneable *CloneableA) Size() int64 {
|
||||
return int64(len(cloneable.Key)) + 16
|
||||
}
|
||||
|
||||
func (cloneable *CloneableA) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(cloneable)
|
||||
}
|
||||
@@ -165,6 +169,50 @@ func TestSetGetWithDifferentTypes(t *testing.T) {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
// LargeCloneable reports a large byte cost so we can test ristretto eviction
|
||||
// without allocating the full payload in memory.
|
||||
type LargeCloneable struct {
|
||||
Key string
|
||||
Cost int64
|
||||
}
|
||||
|
||||
func (c *LargeCloneable) Clone() cachetypes.Cacheable {
|
||||
return &LargeCloneable{Key: c.Key, Cost: c.Cost}
|
||||
}
|
||||
|
||||
func (c *LargeCloneable) Size() int64 { return c.Cost }
|
||||
|
||||
func (c *LargeCloneable) MarshalBinary() ([]byte, error) { return json.Marshal(c) }
|
||||
|
||||
func (c *LargeCloneable) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, c) }
|
||||
|
||||
func TestCloneableCostTriggersEviction(t *testing.T) {
|
||||
const maxCost int64 = 1 << 20 // 1 MiB
|
||||
const perEntry int64 = 256 * 1024
|
||||
const entries = 32 // 32 * 256 KiB = 8 MiB, well over MaxCost
|
||||
|
||||
c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
MaxCost: maxCost,
|
||||
}})
|
||||
require.NoError(t, err)
|
||||
|
||||
orgID := valuer.GenerateUUID()
|
||||
for i := 0; i < entries; i++ {
|
||||
item := &LargeCloneable{Key: fmt.Sprintf("key-%d", i), Cost: perEntry}
|
||||
assert.NoError(t, c.Set(context.Background(), orgID, fmt.Sprintf("key-%d", i), item, time.Minute))
|
||||
}
|
||||
|
||||
metrics := c.(*provider).cc.Metrics
|
||||
// Eviction (or admission rejection) must have kicked in: we wrote 32 entries
|
||||
// each costing 256 KiB into a 1 MiB cache.
|
||||
assert.Greater(t, metrics.KeysEvicted()+metrics.SetsRejected(), uint64(0),
|
||||
"expected eviction or admission rejection once total cost exceeds MaxCost; got evicted=%d rejected=%d",
|
||||
metrics.KeysEvicted(), metrics.SetsRejected())
|
||||
// Net retained cost should not exceed MaxCost.
|
||||
assert.LessOrEqual(t, int64(metrics.CostAdded()-metrics.CostEvicted()), maxCost)
|
||||
}
|
||||
|
||||
func TestCloneableConcurrentSetGet(t *testing.T) {
|
||||
cache, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
|
||||
53
pkg/cache/memorycache/telemetry.go
vendored
53
pkg/cache/memorycache/telemetry.go
vendored
@@ -7,17 +7,18 @@ import (
|
||||
|
||||
type telemetry struct {
|
||||
cacheRatio metric.Float64ObservableGauge
|
||||
cacheHits metric.Int64ObservableGauge
|
||||
cacheMisses metric.Int64ObservableGauge
|
||||
costAdded metric.Int64ObservableGauge
|
||||
costEvicted metric.Int64ObservableGauge
|
||||
keysAdded metric.Int64ObservableGauge
|
||||
keysEvicted metric.Int64ObservableGauge
|
||||
keysUpdated metric.Int64ObservableGauge
|
||||
setsDropped metric.Int64ObservableGauge
|
||||
setsRejected metric.Int64ObservableGauge
|
||||
getsDropped metric.Int64ObservableGauge
|
||||
getsKept metric.Int64ObservableGauge
|
||||
cacheHits metric.Int64ObservableCounter
|
||||
cacheMisses metric.Int64ObservableCounter
|
||||
costAdded metric.Int64ObservableCounter
|
||||
costEvicted metric.Int64ObservableCounter
|
||||
keysAdded metric.Int64ObservableCounter
|
||||
keysEvicted metric.Int64ObservableCounter
|
||||
keysUpdated metric.Int64ObservableCounter
|
||||
setsDropped metric.Int64ObservableCounter
|
||||
setsRejected metric.Int64ObservableCounter
|
||||
getsDropped metric.Int64ObservableCounter
|
||||
getsKept metric.Int64ObservableCounter
|
||||
costUsed metric.Int64ObservableGauge
|
||||
totalCost metric.Int64ObservableGauge
|
||||
}
|
||||
|
||||
@@ -28,62 +29,67 @@ func newMetrics(meter metric.Meter) (*telemetry, error) {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
cacheHits, err := meter.Int64ObservableGauge("signoz.cache.hits", metric.WithDescription("Hits is the number of Get calls where a value was found for the corresponding key."))
|
||||
cacheHits, err := meter.Int64ObservableCounter("signoz.cache.hits", metric.WithDescription("Hits is the number of Get calls where a value was found for the corresponding key."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
cacheMisses, err := meter.Int64ObservableGauge("signoz.cache.misses", metric.WithDescription("Misses is the number of Get calls where a value was not found for the corresponding key"))
|
||||
cacheMisses, err := meter.Int64ObservableCounter("signoz.cache.misses", metric.WithDescription("Misses is the number of Get calls where a value was not found for the corresponding key"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
costAdded, err := meter.Int64ObservableGauge("signoz.cache.cost.added", metric.WithDescription("CostAdded is the sum of costs that have been added (successful Set calls)"))
|
||||
costAdded, err := meter.Int64ObservableCounter("signoz.cache.cost.added", metric.WithDescription("CostAdded is the sum of costs that have been added (successful Set calls)"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
costEvicted, err := meter.Int64ObservableGauge("signoz.cache.cost.evicted", metric.WithDescription("CostEvicted is the sum of all costs that have been evicted"))
|
||||
costEvicted, err := meter.Int64ObservableCounter("signoz.cache.cost.evicted", metric.WithDescription("CostEvicted is the sum of all costs that have been evicted"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
keysAdded, err := meter.Int64ObservableGauge("signoz.cache.keys.added", metric.WithDescription("KeysAdded is the total number of Set calls where a new key-value item was added"))
|
||||
keysAdded, err := meter.Int64ObservableCounter("signoz.cache.keys.added", metric.WithDescription("KeysAdded is the total number of Set calls where a new key-value item was added"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
keysEvicted, err := meter.Int64ObservableGauge("signoz.cache.keys.evicted", metric.WithDescription("KeysEvicted is the total number of keys evicted"))
|
||||
keysEvicted, err := meter.Int64ObservableCounter("signoz.cache.keys.evicted", metric.WithDescription("KeysEvicted is the total number of keys evicted"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
keysUpdated, err := meter.Int64ObservableGauge("signoz.cache.keys.updated", metric.WithDescription("KeysUpdated is the total number of Set calls where the value was updated"))
|
||||
keysUpdated, err := meter.Int64ObservableCounter("signoz.cache.keys.updated", metric.WithDescription("KeysUpdated is the total number of Set calls where the value was updated"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
setsDropped, err := meter.Int64ObservableGauge("signoz.cache.sets.dropped", metric.WithDescription("SetsDropped is the number of Set calls that don't make it into internal buffers (due to contention or some other reason)"))
|
||||
setsDropped, err := meter.Int64ObservableCounter("signoz.cache.sets.dropped", metric.WithDescription("SetsDropped is the number of Set calls that don't make it into internal buffers (due to contention or some other reason)"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
setsRejected, err := meter.Int64ObservableGauge("signoz.cache.sets.rejected", metric.WithDescription("SetsRejected is the number of Set calls rejected by the policy (TinyLFU)"))
|
||||
setsRejected, err := meter.Int64ObservableCounter("signoz.cache.sets.rejected", metric.WithDescription("SetsRejected is the number of Set calls rejected by the policy (TinyLFU)"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
getsDropped, err := meter.Int64ObservableGauge("signoz.cache.gets.dropped", metric.WithDescription("GetsDropped is the number of Get calls that don't make it into internal buffers (due to contention or some other reason)"))
|
||||
getsDropped, err := meter.Int64ObservableCounter("signoz.cache.gets.dropped", metric.WithDescription("GetsDropped is the number of Get calls that don't make it into internal buffers (due to contention or some other reason)"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
getsKept, err := meter.Int64ObservableGauge("signoz.cache.gets.kept", metric.WithDescription("GetsKept is the number of Get calls that make it into internal buffers"))
|
||||
getsKept, err := meter.Int64ObservableCounter("signoz.cache.gets.kept", metric.WithDescription("GetsKept is the number of Get calls that make it into internal buffers"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
totalCost, err := meter.Int64ObservableGauge("signoz.cache.total.cost", metric.WithDescription("TotalCost is the available cost configured for the cache"))
|
||||
costUsed, err := meter.Int64ObservableGauge("signoz.cache.cost.used", metric.WithDescription("CostUsed is the current retained cost in the cache (CostAdded - CostEvicted)."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
totalCost, err := meter.Int64ObservableGauge("signoz.cache.total.cost", metric.WithDescription("TotalCost is the configured MaxCost ceiling for the cache."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
@@ -105,6 +111,7 @@ func newMetrics(meter metric.Meter) (*telemetry, error) {
|
||||
setsRejected: setsRejected,
|
||||
getsDropped: getsDropped,
|
||||
getsKept: getsKept,
|
||||
costUsed: costUsed,
|
||||
totalCost: totalCost,
|
||||
}, nil
|
||||
}
|
||||
|
||||
4
pkg/cache/rediscache/provider_test.go
vendored
4
pkg/cache/rediscache/provider_test.go
vendored
@@ -29,6 +29,10 @@ func (cacheable *CacheableA) Clone() cachetypes.Cacheable {
|
||||
}
|
||||
}
|
||||
|
||||
func (cacheable *CacheableA) Size() int64 {
|
||||
return int64(len(cacheable.Key)) + 16
|
||||
}
|
||||
|
||||
func (cacheable *CacheableA) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(cacheable)
|
||||
}
|
||||
|
||||
@@ -41,6 +41,11 @@ func (c *GetWaterfallSpansForTraceWithMetadataCache) Clone() cachetypes.Cacheabl
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GetWaterfallSpansForTraceWithMetadataCache) Size() int64 {
|
||||
const perSpanBytes = 256
|
||||
return int64(c.TotalSpans) * perSpanBytes
|
||||
}
|
||||
|
||||
func (c *GetWaterfallSpansForTraceWithMetadataCache) MarshalBinary() (data []byte, err error) {
|
||||
return json.Marshal(c)
|
||||
}
|
||||
@@ -66,6 +71,16 @@ func (c *GetFlamegraphSpansForTraceCache) Clone() cachetypes.Cacheable {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GetFlamegraphSpansForTraceCache) Size() int64 {
|
||||
const perSpanBytes = 128
|
||||
var spans int64
|
||||
for _, row := range c.SelectedSpans {
|
||||
spans += int64(len(row))
|
||||
}
|
||||
spans += int64(len(c.TraceRoots))
|
||||
return spans * perSpanBytes
|
||||
}
|
||||
|
||||
func (c *GetFlamegraphSpansForTraceCache) MarshalBinary() (data []byte, err error) {
|
||||
return json.Marshal(c)
|
||||
}
|
||||
|
||||
@@ -18,6 +18,9 @@ type Cloneable interface {
|
||||
// Creates a deep copy of the Cacheable. This method is useful for memory caches to avoid the need for serialization/deserialization. It also prevents
|
||||
// race conditions in the memory cache.
|
||||
Clone() Cacheable
|
||||
// Size returns the approximate retained byte cost of this entry. Memory-backed
|
||||
// caches use it as the ristretto cost so MaxCost-based eviction works.
|
||||
Size() int64
|
||||
}
|
||||
|
||||
func NewSha1CacheKey(val string) string {
|
||||
|
||||
@@ -59,3 +59,21 @@ func (c *CachedData) Clone() cachetypes.Cacheable {
|
||||
|
||||
return clonedCachedData
|
||||
}
|
||||
|
||||
// Size approximates the retained bytes of this CachedData. The dominant cost is
|
||||
// the serialized bucket values (json.RawMessage); other fields are fixed-size
|
||||
// or small strings.
|
||||
func (c *CachedData) Size() int64 {
|
||||
var size int64
|
||||
for _, b := range c.Buckets {
|
||||
if b == nil {
|
||||
continue
|
||||
}
|
||||
// Value is the bulk of the payload
|
||||
size += int64(len(b.Value))
|
||||
}
|
||||
for _, w := range c.Warnings {
|
||||
size += int64(len(w))
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
@@ -238,6 +238,13 @@ func (c *WaterfallCache) Clone() cachetypes.Cacheable {
|
||||
}
|
||||
}
|
||||
|
||||
// Size approximates the retained bytes. Each span dominates; use a per-span
|
||||
// constant rather than reflecting field-by-field.
|
||||
func (c *WaterfallCache) Size() int64 {
|
||||
const perSpanBytes = 256
|
||||
return int64(c.TotalSpans) * perSpanBytes
|
||||
}
|
||||
|
||||
func (c *WaterfallCache) MarshalBinary() (data []byte, err error) {
|
||||
return json.Marshal(c)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user