chore: move to clone instead of json marshal (#10076)

Author: Pandey
Date: 2026-01-23 22:04:30 +05:30
Committed by: GitHub
parent 605d6ba17d
commit 1c4dfc931f
5 changed files with 179 additions and 54 deletions
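
This commit replaces a JSON marshal/unmarshal round trip with an explicit deep copy: the bucket cache's internal cachedBucket/cachedData types move into the querybuildertypesv5 package as CachedBucket/CachedData, which gain a Clone method (new file below). A minimal sketch of the idea using simplified stand-in types — bucket, copyViaJSON, and copyViaClone are illustrative names, not code from this commit:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// bucket is a simplified stand-in for qbtypes.CachedBucket.
type bucket struct {
	StartMs uint64          `json:"startMs"`
	Value   json.RawMessage `json:"value"`
}

// copyViaJSON models the old approach: serialize the value, then re-parse it.
func copyViaJSON(b *bucket) (*bucket, error) {
	raw, err := json.Marshal(b)
	if err != nil {
		return nil, err
	}
	out := new(bucket)
	return out, json.Unmarshal(raw, out)
}

// copyViaClone models the new approach: copy scalar fields directly and
// duplicate only the byte slice that could be shared with the cache.
func copyViaClone(b *bucket) *bucket {
	return &bucket{StartMs: b.StartMs, Value: bytes.Clone(b.Value)}
}

func main() {
	orig := &bucket{StartMs: 1000, Value: json.RawMessage(`{"v":1}`)}
	viaJSON, _ := copyViaJSON(orig)
	viaClone := copyViaClone(orig)
	fmt.Println(viaJSON.StartMs, viaClone.StartMs) // both independent copies
}

Both routes yield an independent copy, but the JSON route re-encodes and re-parses every byte of every bucket on each call; the benchmarks added at the end of this commit quantify the difference.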

View File

@@ -36,29 +36,6 @@ func NewBucketCache(settings factory.ProviderSettings, cache cache.Cache, cacheT
}
}
-// cachedBucket represents a cached time bucket
-type cachedBucket struct {
-StartMs uint64 `json:"startMs"`
-EndMs uint64 `json:"endMs"`
-Type qbtypes.RequestType `json:"type"`
-Value json.RawMessage `json:"value"`
-Stats qbtypes.ExecStats `json:"stats"`
-}
-
-// cachedData represents the full cached data for a query
-type cachedData struct {
-Buckets []*cachedBucket `json:"buckets"`
-Warnings []string `json:"warnings"`
-}
-
-func (c *cachedData) UnmarshalBinary(data []byte) error {
-return json.Unmarshal(data, c)
-}
-
-func (c *cachedData) MarshalBinary() ([]byte, error) {
-return json.Marshal(c)
-}
// GetMissRanges returns cached data and missing time ranges
func (bc *bucketCache) GetMissRanges(
ctx context.Context,
@@ -78,7 +55,7 @@ func (bc *bucketCache) GetMissRanges(
bc.logger.DebugContext(ctx, "cache key", "cache_key", cacheKey)
// Try to get cached data
-var data cachedData
+var data qbtypes.CachedData
err := bc.cache.Get(ctx, orgID, cacheKey, &data)
if err != nil {
if !errors.Ast(err, errors.TypeNotFound) {
@@ -147,9 +124,9 @@ func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Que
cacheKey := bc.generateCacheKey(q)
// Get existing cached data
-var existingData cachedData
+var existingData qbtypes.CachedData
if err := bc.cache.Get(ctx, orgID, cacheKey, &existingData); err != nil {
-existingData = cachedData{}
+existingData = qbtypes.CachedData{}
}
// Trim the result to exclude data within flux interval
@@ -203,7 +180,7 @@ func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Que
uniqueWarnings := bc.deduplicateWarnings(allWarnings)
// Create updated cached data
-updatedData := cachedData{
+updatedData := qbtypes.CachedData{
Buckets: mergedBuckets,
Warnings: uniqueWarnings,
}
@@ -222,7 +199,7 @@ func (bc *bucketCache) generateCacheKey(q qbtypes.Query) string {
}
// findMissingRangesWithStep identifies time ranges not covered by cached buckets with step alignment
-func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startMs, endMs uint64, stepMs uint64) []*qbtypes.TimeRange {
+func (bc *bucketCache) findMissingRangesWithStep(buckets []*qbtypes.CachedBucket, startMs, endMs uint64, stepMs uint64) []*qbtypes.TimeRange {
// When step is 0 or window is too small to be cached, use simple algorithm
if stepMs == 0 || (startMs+stepMs) > endMs {
return bc.findMissingRangesBasic(buckets, startMs, endMs)
@@ -265,7 +242,7 @@ func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startM
}
if needsSort {
-slices.SortFunc(buckets, func(a, b *cachedBucket) int {
+slices.SortFunc(buckets, func(a, b *qbtypes.CachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
@@ -339,7 +316,7 @@ func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startM
}
// findMissingRangesBasic is the simple algorithm without step alignment
-func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs, endMs uint64) []*qbtypes.TimeRange {
+func (bc *bucketCache) findMissingRangesBasic(buckets []*qbtypes.CachedBucket, startMs, endMs uint64) []*qbtypes.TimeRange {
// Check if already sorted before sorting
needsSort := false
for i := 1; i < len(buckets); i++ {
@@ -350,7 +327,7 @@ func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs,
}
if needsSort {
-slices.SortFunc(buckets, func(a, b *cachedBucket) int {
+slices.SortFunc(buckets, func(a, b *qbtypes.CachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
@@ -421,9 +398,9 @@ func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs,
}
// filterRelevantBuckets returns buckets that overlap with the requested time range
-func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, endMs uint64) []*cachedBucket {
+func (bc *bucketCache) filterRelevantBuckets(buckets []*qbtypes.CachedBucket, startMs, endMs uint64) []*qbtypes.CachedBucket {
// Pre-allocate with estimated capacity
-relevant := make([]*cachedBucket, 0, len(buckets))
+relevant := make([]*qbtypes.CachedBucket, 0, len(buckets))
for _, bucket := range buckets {
// Check if bucket overlaps with requested range
@@ -433,7 +410,7 @@ func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, e
}
// Sort by start time
-slices.SortFunc(relevant, func(a, b *cachedBucket) int {
+slices.SortFunc(relevant, func(a, b *qbtypes.CachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
@@ -447,7 +424,7 @@ func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, e
}
// mergeBuckets combines multiple cached buckets into a single result
-func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*cachedBucket, warnings []string) *qbtypes.Result {
+func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*qbtypes.CachedBucket, warnings []string) *qbtypes.Result {
if len(buckets) == 0 {
return &qbtypes.Result{}
}
@@ -480,7 +457,7 @@ func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*cachedBucket
}
// mergeTimeSeriesValues merges time series data from multiple buckets
-func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cachedBucket) *qbtypes.TimeSeriesData {
+func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*qbtypes.CachedBucket) *qbtypes.TimeSeriesData {
// Estimate capacity based on bucket count
estimatedSeries := len(buckets) * 10
@@ -631,7 +608,7 @@ func (bc *bucketCache) isEmptyResult(result *qbtypes.Result) (isEmpty bool, isFi
}
// resultToBuckets converts a query result into time-based buckets
-func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Result, startMs, endMs uint64) []*cachedBucket {
+func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Result, startMs, endMs uint64) []*qbtypes.CachedBucket {
// Check if result is empty
isEmpty, isFiltered := bc.isEmptyResult(result)
@@ -652,7 +629,7 @@ func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Resu
// Always create a bucket, even for empty filtered results
// This ensures we don't re-query for data that doesn't exist
-return []*cachedBucket{
+return []*qbtypes.CachedBucket{
{
StartMs: startMs,
EndMs: endMs,
@@ -664,9 +641,9 @@ func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Resu
}
// mergeAndDeduplicateBuckets combines and deduplicates bucket lists
-func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*cachedBucket) []*cachedBucket {
+func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*qbtypes.CachedBucket) []*qbtypes.CachedBucket {
// Create a map to deduplicate by time range
-bucketMap := make(map[string]*cachedBucket)
+bucketMap := make(map[string]*qbtypes.CachedBucket)
// Add existing buckets
for _, bucket := range existing {
@@ -681,13 +658,13 @@ func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*cachedBucke
}
// Convert back to slice with pre-allocated capacity
-result := make([]*cachedBucket, 0, len(bucketMap))
+result := make([]*qbtypes.CachedBucket, 0, len(bucketMap))
for _, bucket := range bucketMap {
result = append(result, bucket)
}
// Sort by start time
-slices.SortFunc(result, func(a, b *cachedBucket) int {
+slices.SortFunc(result, func(a, b *qbtypes.CachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
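
With the local cachedBucket/cachedData types gone, this file now depends on qbtypes.CachedData satisfying the cachetypes.Cacheable contract (asserted in the new file below). The interface itself is not part of this diff; what follows is an inferred sketch of the contract implied by the methods CachedData implements, with memCache as a hypothetical caller:

// Inferred sketch: cachetypes.Cacheable's real definition is not shown in
// this diff; CachedData implements exactly these three methods.
type Cacheable interface {
	MarshalBinary() ([]byte, error)    // persisting to a serialized store
	UnmarshalBinary(data []byte) error // reading serialized bytes back
	Clone() Cacheable                  // cheap in-process deep copies
}

// memCache is a hypothetical in-process cache illustrating where Clone pays
// off: Get hands out an independent copy without a JSON round trip.
type memCache struct {
	items map[string]Cacheable
}

func (m *memCache) Get(key string) (Cacheable, bool) {
	v, ok := m.items[key]
	if !ok {
		return nil, false
	}
	return v.Clone(), true // caller may mutate freely; the stored value stays intact
}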

View File

@@ -147,14 +147,14 @@ func BenchmarkBucketCache_MergeTimeSeriesValues(b *testing.B) {
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// Create test buckets
-buckets := make([]*cachedBucket, tc.numBuckets)
+buckets := make([]*qbtypes.CachedBucket, tc.numBuckets)
for i := 0; i < tc.numBuckets; i++ {
startMs := uint64(i * 10000)
endMs := uint64((i + 1) * 10000)
result := createBenchmarkResultWithSeries(startMs, endMs, 1000, tc.numSeries, tc.numValues)
valueBytes, _ := json.Marshal(result.Value)
-buckets[i] = &cachedBucket{
+buckets[i] = &qbtypes.CachedBucket{
StartMs: startMs,
EndMs: endMs,
Type: qbtypes.RequestTypeTimeSeries,
@@ -417,8 +417,8 @@ func createBenchmarkResultWithSeries(startMs, endMs uint64, _ uint64, numSeries,
}
// Helper function to create buckets with specific gap patterns
-func createBucketsWithPattern(numBuckets int, pattern string) []*cachedBucket {
-buckets := make([]*cachedBucket, 0, numBuckets)
+func createBucketsWithPattern(numBuckets int, pattern string) []*qbtypes.CachedBucket {
+buckets := make([]*qbtypes.CachedBucket, 0, numBuckets)
for i := 0; i < numBuckets; i++ {
// Skip some buckets based on pattern
@@ -432,7 +432,7 @@ func createBucketsWithPattern(numBuckets int, pattern string) []*cachedBucket {
startMs := uint64(i * 10000)
endMs := uint64((i + 1) * 10000)
-buckets = append(buckets, &cachedBucket{
+buckets = append(buckets, &qbtypes.CachedBucket{
StartMs: startMs,
EndMs: endMs,
Type: qbtypes.RequestTypeTimeSeries,
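
Note that these benchmarks still build each bucket payload with json.Marshal: bucket values remain stored as json.RawMessage, so what this commit removes is the container-level marshal/unmarshal round trip used when copying cached data, not the encoding of query results into the cache.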

View File

@@ -521,7 +521,7 @@ func TestBucketCache_FindMissingRanges_EdgeCases(t *testing.T) {
bc := NewBucketCache(instrumentationtest.New().ToProviderSettings(), memCache, cacheTTL, defaultFluxInterval).(*bucketCache)
// Test with buckets that have gaps and overlaps
-buckets := []*cachedBucket{
+buckets := []*qbtypes.CachedBucket{
{StartMs: 1000, EndMs: 2000},
{StartMs: 2500, EndMs: 3500},
{StartMs: 3000, EndMs: 4000}, // Overlaps with previous
@@ -1097,7 +1097,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
tests := []struct {
name string
-buckets []*cachedBucket
+buckets []*qbtypes.CachedBucket
startMs uint64
endMs uint64
stepMs uint64
@@ -1106,7 +1106,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
}{
{
name: "start_not_aligned_to_step",
-buckets: []*cachedBucket{},
+buckets: []*qbtypes.CachedBucket{},
startMs: 1500, // Not aligned to 1000ms step
endMs: 5000,
stepMs: 1000,
@@ -1118,7 +1118,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
},
{
name: "end_not_aligned_to_step",
-buckets: []*cachedBucket{},
+buckets: []*qbtypes.CachedBucket{},
startMs: 1000,
endMs: 4500, // Not aligned to 1000ms step
stepMs: 1000,
@@ -1129,7 +1129,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
},
{
name: "bucket_boundaries_not_aligned",
-buckets: []*cachedBucket{
+buckets: []*qbtypes.CachedBucket{
{StartMs: 1500, EndMs: 2500}, // Not aligned
},
startMs: 1000,
@@ -1143,7 +1143,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
},
{
name: "small_window_less_than_step",
-buckets: []*cachedBucket{},
+buckets: []*qbtypes.CachedBucket{},
startMs: 1000,
endMs: 1500, // Less than one step
stepMs: 1000,
@@ -1154,7 +1154,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
},
{
name: "zero_step_uses_basic_algorithm",
-buckets: []*cachedBucket{},
+buckets: []*qbtypes.CachedBucket{},
startMs: 1000,
endMs: 5000,
stepMs: 0,
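
The step-alignment cases above (start_not_aligned_to_step, end_not_aligned_to_step, bucket_boundaries_not_aligned, and the zero-step fallback) all probe the same invariant: only whole steps can be safely served from cache, so partial steps at the edges of the requested window must be treated as missing. A hypothetical illustration of the boundary arithmetic — alignDown/alignUp are not helpers from this codebase:

// Hypothetical helpers; they only illustrate the rounding that step
// alignment requires, not the actual findMissingRangesWithStep logic.
func alignDown(ms, stepMs uint64) uint64 {
	if stepMs == 0 {
		return ms // zero step: no alignment, the basic algorithm applies
	}
	return ms - ms%stepMs
}

func alignUp(ms, stepMs uint64) uint64 {
	if stepMs == 0 {
		return ms
	}
	return alignDown(ms+stepMs-1, stepMs)
}

For example, with stepMs = 1000 a request starting at 1500 has alignUp(1500, 1000) = 2000, so the partial leading step [1500, 2000) cannot be satisfied by step-aligned buckets and must be re-queried.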

View File

@@ -0,0 +1,61 @@
package querybuildertypesv5

import (
	"bytes"
	"encoding/json"
	"maps"

	"github.com/SigNoz/signoz/pkg/types/cachetypes"
)

var _ cachetypes.Cacheable = (*CachedData)(nil)

// CachedBucket represents a cached time bucket.
type CachedBucket struct {
	StartMs uint64          `json:"startMs"`
	EndMs   uint64          `json:"endMs"`
	Type    RequestType     `json:"type"`
	Value   json.RawMessage `json:"value"`
	Stats   ExecStats       `json:"stats"`
}

// Clone returns a deep copy of the bucket: Value is duplicated with
// bytes.Clone and Stats.StepIntervals with maps.Clone, so the copy shares
// no mutable state with the receiver.
func (c *CachedBucket) Clone() *CachedBucket {
	return &CachedBucket{
		StartMs: c.StartMs,
		EndMs:   c.EndMs,
		Type:    c.Type,
		Value:   bytes.Clone(c.Value),
		Stats: ExecStats{
			RowsScanned:   c.Stats.RowsScanned,
			BytesScanned:  c.Stats.BytesScanned,
			DurationMS:    c.Stats.DurationMS,
			StepIntervals: maps.Clone(c.Stats.StepIntervals),
		},
	}
}

// CachedData represents the full cached data for a query.
type CachedData struct {
	Buckets  []*CachedBucket `json:"buckets"`
	Warnings []string        `json:"warnings"`
}

func (c *CachedData) UnmarshalBinary(data []byte) error {
	return json.Unmarshal(data, c)
}

func (c *CachedData) MarshalBinary() ([]byte, error) {
	return json.Marshal(c)
}

// Clone deep-copies every bucket and the warnings slice.
func (c *CachedData) Clone() cachetypes.Cacheable {
	clonedCachedData := new(CachedData)
	clonedCachedData.Buckets = make([]*CachedBucket, len(c.Buckets))
	for i := range c.Buckets {
		clonedCachedData.Buckets[i] = c.Buckets[i].Clone()
	}
	clonedCachedData.Warnings = make([]string, len(c.Warnings))
	copy(clonedCachedData.Warnings, c.Warnings)
	return clonedCachedData
}
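
Because Clone duplicates Value with bytes.Clone and Stats.StepIntervals with maps.Clone, a clone shares no mutable state with the value held by the cache. A small illustrative usage, not part of the commit:

// Illustrative only: mutating the clone leaves the original intact.
original := &CachedData{
	Buckets: []*CachedBucket{{
		StartMs: 1000,
		EndMs:   2000,
		Type:    RequestTypeTimeSeries,
		Value:   json.RawMessage(`{"v":1}`),
	}},
	Warnings: []string{"partial data"},
}
cloned := original.Clone().(*CachedData) // Clone returns cachetypes.Cacheable
cloned.Buckets[0].Value = json.RawMessage(`{"v":2}`)
cloned.Warnings[0] = "rewritten"
// original.Buckets[0].Value is still {"v":1} and original.Warnings[0] is
// still "partial data".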

View File

@@ -0,0 +1,87 @@
package querybuildertypesv5

import (
	"encoding/json"
	"testing"

	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/stretchr/testify/assert"
)

func createBuckets_TimeSeries(numBuckets int) []*CachedBucket {
	buckets := make([]*CachedBucket, numBuckets)
	for i := 0; i < numBuckets; i++ {
		startMs := uint64(i * 10000)
		endMs := uint64((i + 1) * 10000)
		timeSeriesData := &TimeSeriesData{
			QueryName: "A",
			Aggregations: []*AggregationBucket{
				{
					Index: 0,
					Series: []*TimeSeries{
						{
							Labels: []*Label{
								{Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "test"},
							},
							Values: []*TimeSeriesValue{
								{Timestamp: 1672563720000, Value: 1, Partial: true}, // 12:02
								{Timestamp: 1672563900000, Value: 2},                // 12:05
								{Timestamp: 1672564200000, Value: 2.5},              // 12:10
								{Timestamp: 1672564500000, Value: 2.6},              // 12:15
								{Timestamp: 1672566600000, Value: 2.9},              // 12:50
								{Timestamp: 1672566900000, Value: 3},                // 12:55
								{Timestamp: 1672567080000, Value: 4, Partial: true}, // 12:58
							},
						},
					},
				},
			},
		}
		value, err := json.Marshal(timeSeriesData)
		if err != nil {
			panic(err)
		}
		buckets[i] = &CachedBucket{
			StartMs: startMs,
			EndMs:   endMs,
			Type:    RequestTypeTimeSeries,
			Value:   json.RawMessage(value),
			Stats: ExecStats{
				RowsScanned:  uint64(i * 500),
				BytesScanned: uint64(i * 10000),
				DurationMS:   uint64(i * 1000),
			},
		}
	}
	return buckets
}

func BenchmarkCachedData_JSONMarshal_10kbuckets(b *testing.B) {
	buckets := createBuckets_TimeSeries(10000)
	data := &CachedData{Buckets: buckets}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, err := json.Marshal(data)
		assert.NoError(b, err)
	}
}

func BenchmarkCachedData_Clone_10kbuckets(b *testing.B) {
	buckets := createBuckets_TimeSeries(10000)
	data := &CachedData{Buckets: buckets}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = data.Clone()
	}
}
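
Taken together, the two benchmarks quantify the motivation for the commit: the marshal path re-encodes every bucket's raw JSON payload on each iteration, while Clone copies scalar fields, slices, and one small map per bucket. Since both call b.ReportAllocs(), running them under go test with the -bench and -benchmem flags reports allocations per operation alongside timings; the absolute numbers depend on the machine, so none are quoted here.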