diff --git a/pkg/querier/bucket_cache.go b/pkg/querier/bucket_cache.go
index 48db70ea54..8a272563e2 100644
--- a/pkg/querier/bucket_cache.go
+++ b/pkg/querier/bucket_cache.go
@@ -36,29 +36,6 @@ func NewBucketCache(settings factory.ProviderSettings, cache cache.Cache, cacheT
 	}
 }
 
-// cachedBucket represents a cached time bucket
-type cachedBucket struct {
-	StartMs uint64              `json:"startMs"`
-	EndMs   uint64              `json:"endMs"`
-	Type    qbtypes.RequestType `json:"type"`
-	Value   json.RawMessage     `json:"value"`
-	Stats   qbtypes.ExecStats   `json:"stats"`
-}
-
-// cachedData represents the full cached data for a query
-type cachedData struct {
-	Buckets  []*cachedBucket `json:"buckets"`
-	Warnings []string        `json:"warnings"`
-}
-
-func (c *cachedData) UnmarshalBinary(data []byte) error {
-	return json.Unmarshal(data, c)
-}
-
-func (c *cachedData) MarshalBinary() ([]byte, error) {
-	return json.Marshal(c)
-}
-
 // GetMissRanges returns cached data and missing time ranges
 func (bc *bucketCache) GetMissRanges(
 	ctx context.Context,
@@ -78,7 +55,7 @@ func (bc *bucketCache) GetMissRanges(
 	bc.logger.DebugContext(ctx, "cache key", "cache_key", cacheKey)
 
 	// Try to get cached data
-	var data cachedData
+	var data qbtypes.CachedData
 	err := bc.cache.Get(ctx, orgID, cacheKey, &data)
 	if err != nil {
 		if !errors.Ast(err, errors.TypeNotFound) {
@@ -147,9 +124,9 @@ func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Que
 	cacheKey := bc.generateCacheKey(q)
 
 	// Get existing cached data
-	var existingData cachedData
+	var existingData qbtypes.CachedData
 	if err := bc.cache.Get(ctx, orgID, cacheKey, &existingData); err != nil {
-		existingData = cachedData{}
+		existingData = qbtypes.CachedData{}
 	}
 
 	// Trim the result to exclude data within flux interval
@@ -203,7 +180,7 @@ func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Que
 	uniqueWarnings := bc.deduplicateWarnings(allWarnings)
 
 	// Create updated cached data
-	updatedData := cachedData{
+	updatedData := qbtypes.CachedData{
 		Buckets:  mergedBuckets,
 		Warnings: uniqueWarnings,
 	}
@@ -222,7 +199,7 @@ func (bc *bucketCache) generateCacheKey(q qbtypes.Query) string {
 }
 
 // findMissingRangesWithStep identifies time ranges not covered by cached buckets with step alignment
-func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startMs, endMs uint64, stepMs uint64) []*qbtypes.TimeRange {
+func (bc *bucketCache) findMissingRangesWithStep(buckets []*qbtypes.CachedBucket, startMs, endMs uint64, stepMs uint64) []*qbtypes.TimeRange {
 	// When step is 0 or window is too small to be cached, use simple algorithm
 	if stepMs == 0 || (startMs+stepMs) > endMs {
 		return bc.findMissingRangesBasic(buckets, startMs, endMs)
@@ -265,7 +242,7 @@ func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startM
 	}
 
 	if needsSort {
-		slices.SortFunc(buckets, func(a, b *cachedBucket) int {
+		slices.SortFunc(buckets, func(a, b *qbtypes.CachedBucket) int {
 			if a.StartMs < b.StartMs {
 				return -1
 			}
@@ -339,7 +316,7 @@ func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startM
 }
 
 // findMissingRangesBasic is the simple algorithm without step alignment
-func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs, endMs uint64) []*qbtypes.TimeRange {
+func (bc *bucketCache) findMissingRangesBasic(buckets []*qbtypes.CachedBucket, startMs, endMs uint64) []*qbtypes.TimeRange {
 	// Check if already sorted before sorting
 	needsSort := false
 	for i := 1; i < len(buckets); i++ {
@@ -350,7 +327,7 @@ func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs,
 	}
 
 	if needsSort {
-		slices.SortFunc(buckets, func(a, b *cachedBucket) int {
+		slices.SortFunc(buckets, func(a, b *qbtypes.CachedBucket) int {
 			if a.StartMs < b.StartMs {
 				return -1
 			}
@@ -421,9 +398,9 @@ func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs,
 }
 
 // filterRelevantBuckets returns buckets that overlap with the requested time range
-func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, endMs uint64) []*cachedBucket {
+func (bc *bucketCache) filterRelevantBuckets(buckets []*qbtypes.CachedBucket, startMs, endMs uint64) []*qbtypes.CachedBucket {
 	// Pre-allocate with estimated capacity
-	relevant := make([]*cachedBucket, 0, len(buckets))
+	relevant := make([]*qbtypes.CachedBucket, 0, len(buckets))
 
 	for _, bucket := range buckets {
 		// Check if bucket overlaps with requested range
@@ -433,7 +410,7 @@ func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, e
 	}
 
 	// Sort by start time
-	slices.SortFunc(relevant, func(a, b *cachedBucket) int {
+	slices.SortFunc(relevant, func(a, b *qbtypes.CachedBucket) int {
 		if a.StartMs < b.StartMs {
 			return -1
 		}
@@ -447,7 +424,7 @@ func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, e
 }
 
 // mergeBuckets combines multiple cached buckets into a single result
-func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*cachedBucket, warnings []string) *qbtypes.Result {
+func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*qbtypes.CachedBucket, warnings []string) *qbtypes.Result {
 	if len(buckets) == 0 {
 		return &qbtypes.Result{}
 	}
@@ -480,7 +457,7 @@ func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*cachedBucket
 }
 
 // mergeTimeSeriesValues merges time series data from multiple buckets
-func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cachedBucket) *qbtypes.TimeSeriesData {
+func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*qbtypes.CachedBucket) *qbtypes.TimeSeriesData {
 	// Estimate capacity based on bucket count
 	estimatedSeries := len(buckets) * 10
 
@@ -631,7 +608,7 @@ func (bc *bucketCache) isEmptyResult(result *qbtypes.Result) (isEmpty bool, isFi
 }
 
 // resultToBuckets converts a query result into time-based buckets
-func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Result, startMs, endMs uint64) []*cachedBucket {
+func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Result, startMs, endMs uint64) []*qbtypes.CachedBucket {
 	// Check if result is empty
 	isEmpty, isFiltered := bc.isEmptyResult(result)
 
@@ -652,7 +629,7 @@ func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Resu
 
 	// Always create a bucket, even for empty filtered results
 	// This ensures we don't re-query for data that doesn't exist
-	return []*cachedBucket{
+	return []*qbtypes.CachedBucket{
 		{
 			StartMs: startMs,
 			EndMs:   endMs,
@@ -664,9 +641,9 @@ func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Resu
 }
 
 // mergeAndDeduplicateBuckets combines and deduplicates bucket lists
-func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*cachedBucket) []*cachedBucket {
+func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*qbtypes.CachedBucket) []*qbtypes.CachedBucket {
 	// Create a map to deduplicate by time range
-	bucketMap := make(map[string]*cachedBucket)
+	bucketMap := make(map[string]*qbtypes.CachedBucket)
 
 	// Add existing buckets
 	for _, bucket := range existing {
@@ -681,13 +658,13 @@ func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*cachedBucke
 	}
 
 	// Convert back to slice with pre-allocated capacity
-	result := make([]*cachedBucket, 0, len(bucketMap))
+	result := make([]*qbtypes.CachedBucket, 0, len(bucketMap))
 	for _, bucket := range bucketMap {
 		result = append(result, bucket)
 	}
 
 	// Sort by start time
-	slices.SortFunc(result, func(a, b *cachedBucket) int {
+	slices.SortFunc(result, func(a, b *qbtypes.CachedBucket) int {
 		if a.StartMs < b.StartMs {
 			return -1
 		}
diff --git a/pkg/querier/bucket_cache_bench_test.go b/pkg/querier/bucket_cache_bench_test.go
index 2070b9b452..27b32e243b 100644
--- a/pkg/querier/bucket_cache_bench_test.go
+++ b/pkg/querier/bucket_cache_bench_test.go
@@ -147,14 +147,14 @@ func BenchmarkBucketCache_MergeTimeSeriesValues(b *testing.B) {
 	for _, tc := range testCases {
 		b.Run(tc.name, func(b *testing.B) {
 			// Create test buckets
-			buckets := make([]*cachedBucket, tc.numBuckets)
+			buckets := make([]*qbtypes.CachedBucket, tc.numBuckets)
 			for i := 0; i < tc.numBuckets; i++ {
 				startMs := uint64(i * 10000)
 				endMs := uint64((i + 1) * 10000)
 				result := createBenchmarkResultWithSeries(startMs, endMs, 1000, tc.numSeries, tc.numValues)
 				valueBytes, _ := json.Marshal(result.Value)
 
-				buckets[i] = &cachedBucket{
+				buckets[i] = &qbtypes.CachedBucket{
 					StartMs: startMs,
 					EndMs:   endMs,
 					Type:    qbtypes.RequestTypeTimeSeries,
@@ -417,8 +417,8 @@ func createBenchmarkResultWithSeries(startMs, endMs uint64, _ uint64, numSeries,
 }
 
 // Helper function to create buckets with specific gap patterns
-func createBucketsWithPattern(numBuckets int, pattern string) []*cachedBucket {
-	buckets := make([]*cachedBucket, 0, numBuckets)
+func createBucketsWithPattern(numBuckets int, pattern string) []*qbtypes.CachedBucket {
+	buckets := make([]*qbtypes.CachedBucket, 0, numBuckets)
 
 	for i := 0; i < numBuckets; i++ {
 		// Skip some buckets based on pattern
@@ -432,7 +432,7 @@ func createBucketsWithPattern(numBuckets int, pattern string) []*cachedBucket {
 		startMs := uint64(i * 10000)
 		endMs := uint64((i + 1) * 10000)
 
-		buckets = append(buckets, &cachedBucket{
+		buckets = append(buckets, &qbtypes.CachedBucket{
 			StartMs: startMs,
 			EndMs:   endMs,
 			Type:    qbtypes.RequestTypeTimeSeries,
diff --git a/pkg/querier/bucket_cache_test.go b/pkg/querier/bucket_cache_test.go
index e86ba96eb8..0b9280397f 100644
--- a/pkg/querier/bucket_cache_test.go
+++ b/pkg/querier/bucket_cache_test.go
@@ -521,7 +521,7 @@ func TestBucketCache_FindMissingRanges_EdgeCases(t *testing.T) {
 	bc := NewBucketCache(instrumentationtest.New().ToProviderSettings(), memCache, cacheTTL, defaultFluxInterval).(*bucketCache)
 
 	// Test with buckets that have gaps and overlaps
-	buckets := []*cachedBucket{
+	buckets := []*qbtypes.CachedBucket{
 		{StartMs: 1000, EndMs: 2000},
 		{StartMs: 2500, EndMs: 3500},
 		{StartMs: 3000, EndMs: 4000}, // Overlaps with previous
@@ -1097,7 +1097,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
 
 	tests := []struct {
 		name           string
-		buckets        []*cachedBucket
+		buckets        []*qbtypes.CachedBucket
 		startMs        uint64
 		endMs          uint64
 		stepMs         uint64
@@ -1106,7 +1106,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
 	}{
 		{
 			name:    "start_not_aligned_to_step",
-			buckets: []*cachedBucket{},
+			buckets: []*qbtypes.CachedBucket{},
 			startMs: 1500, // Not aligned to 1000ms step
 			endMs:   5000,
 			stepMs:  1000,
@@ -1118,7 +1118,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
 		},
 		{
 			name:    "end_not_aligned_to_step",
-			buckets: []*cachedBucket{},
+			buckets: []*qbtypes.CachedBucket{},
 			startMs: 1000,
 			endMs:   4500, // Not aligned to 1000ms step
 			stepMs:  1000,
@@ -1129,7 +1129,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
 		},
 		{
 			name: "bucket_boundaries_not_aligned",
-			buckets: []*cachedBucket{
+			buckets: []*qbtypes.CachedBucket{
 				{StartMs: 1500, EndMs: 2500}, // Not aligned
 			},
 			startMs: 1000,
@@ -1143,7 +1143,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
 		},
 		{
 			name:    "small_window_less_than_step",
-			buckets: []*cachedBucket{},
+			buckets: []*qbtypes.CachedBucket{},
 			startMs: 1000,
 			endMs:   1500, // Less than one step
 			stepMs:  1000,
@@ -1154,7 +1154,7 @@ func TestBucketCache_FindMissingRangesWithStep(t *testing.T) {
 		},
 		{
 			name:    "zero_step_uses_basic_algorithm",
-			buckets: []*cachedBucket{},
+			buckets: []*qbtypes.CachedBucket{},
 			startMs: 1000,
 			endMs:   5000,
 			stepMs:  0,
diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/cached.go b/pkg/types/querybuildertypes/querybuildertypesv5/cached.go
new file mode 100644
index 0000000000..385521d8f5
--- /dev/null
+++ b/pkg/types/querybuildertypes/querybuildertypesv5/cached.go
@@ -0,0 +1,61 @@
+package querybuildertypesv5
+
+import (
+	"bytes"
+	"encoding/json"
+	"maps"
+
+	"github.com/SigNoz/signoz/pkg/types/cachetypes"
+)
+
+var _ cachetypes.Cacheable = (*CachedData)(nil)
+
+type CachedBucket struct {
+	StartMs uint64          `json:"startMs"`
+	EndMs   uint64          `json:"endMs"`
+	Type    RequestType     `json:"type"`
+	Value   json.RawMessage `json:"value"`
+	Stats   ExecStats       `json:"stats"`
+}
+
+func (c *CachedBucket) Clone() *CachedBucket {
+	return &CachedBucket{
+		StartMs: c.StartMs,
+		EndMs:   c.EndMs,
+		Type:    c.Type,
+		Value:   bytes.Clone(c.Value),
+		Stats: ExecStats{
+			RowsScanned:   c.Stats.RowsScanned,
+			BytesScanned:  c.Stats.BytesScanned,
+			DurationMS:    c.Stats.DurationMS,
+			StepIntervals: maps.Clone(c.Stats.StepIntervals),
+		},
+	}
+}
+
+// CachedData represents the full cached data for a query
+type CachedData struct {
+	Buckets  []*CachedBucket `json:"buckets"`
+	Warnings []string        `json:"warnings"`
+}
+
+func (c *CachedData) UnmarshalBinary(data []byte) error {
+	return json.Unmarshal(data, c)
+}
+
+func (c *CachedData) MarshalBinary() ([]byte, error) {
+	return json.Marshal(c)
+}
+
+func (c *CachedData) Clone() cachetypes.Cacheable {
+	clonedCachedData := new(CachedData)
+	clonedCachedData.Buckets = make([]*CachedBucket, len(c.Buckets))
+	for i := range c.Buckets {
+		clonedCachedData.Buckets[i] = c.Buckets[i].Clone()
+	}
+
+	clonedCachedData.Warnings = make([]string, len(c.Warnings))
+	copy(clonedCachedData.Warnings, c.Warnings)
+
+	return clonedCachedData
+}
diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/cached_bench_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/cached_bench_test.go
new file mode 100644
index 0000000000..a919a9a529
--- /dev/null
+++ b/pkg/types/querybuildertypes/querybuildertypesv5/cached_bench_test.go
@@ -0,0 +1,87 @@
+package querybuildertypesv5
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+	"github.com/stretchr/testify/assert"
+)
+
+func createBuckets_TimeSeries(numBuckets int) []*CachedBucket {
+	buckets := make([]*CachedBucket, numBuckets)
+
+	for i := 0; i < numBuckets; i++ {
+		startMs := uint64(i * 10000)
+		endMs := uint64((i + 1) * 10000)
+
+		timeSeriesData := &TimeSeriesData{
+			QueryName: "A",
+			Aggregations: []*AggregationBucket{
+				{
+					Index: 0,
+					Series: []*TimeSeries{
+						{
+							Labels: []*Label{
+								{Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "test"},
+							},
+							Values: []*TimeSeriesValue{
+								{Timestamp: 1672563720000, Value: 1, Partial: true}, // 12:02
+								{Timestamp: 1672563900000, Value: 2},                // 12:05
+								{Timestamp: 1672564200000, Value: 2.5},              // 12:10
+								{Timestamp: 1672564500000, Value: 2.6},              // 12:15
+								{Timestamp: 1672566600000, Value: 2.9},              // 12:50
+								{Timestamp: 1672566900000, Value: 3},                // 12:55
+								{Timestamp: 1672567080000, Value: 4, Partial: true}, // 12:58
+							},
+						},
+					},
+				},
+			},
+		}
+
+		value, err := json.Marshal(timeSeriesData)
+		if err != nil {
+			panic(err)
+		}
+
+		buckets[i] = &CachedBucket{
+			StartMs: startMs,
+			EndMs:   endMs,
+			Type:    RequestTypeTimeSeries,
+			Value:   json.RawMessage(value),
+			Stats: ExecStats{
+				RowsScanned:  uint64(i * 500),
+				BytesScanned: uint64(i * 10000),
+				DurationMS:   uint64(i * 1000),
+			},
+		}
+	}
+
+	return buckets
+}
+
+func BenchmarkCachedData_JSONMarshal_10kbuckets(b *testing.B) {
+	buckets := createBuckets_TimeSeries(10000)
+	data := &CachedData{Buckets: buckets}
+
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		_, err := json.Marshal(data)
+		assert.NoError(b, err)
+	}
+}
+
+func BenchmarkCachedData_Clone_10kbuckets(b *testing.B) {
+	buckets := createBuckets_TimeSeries(10000)
+	data := &CachedData{Buckets: buckets}
+
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		_ = data.Clone()
+	}
+}
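
Since CachedBucket.Value is a json.RawMessage (an alias for []byte), Clone has to copy the underlying bytes with bytes.Clone rather than just copy the slice header; otherwise the cached entry and its clone would share one mutable buffer. Below is a minimal standalone sketch of that hazard, using a trimmed stand-in type (not the qbtypes definitions above) to show why the deep copy matters:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// bucket is a stand-in for qbtypes.CachedBucket, reduced to the fields
// relevant to the deep-copy question.
type bucket struct {
	StartMs uint64
	EndMs   uint64
	Value   json.RawMessage
}

// clone deep-copies the raw JSON payload, mirroring CachedBucket.Clone above.
func (b *bucket) clone() *bucket {
	return &bucket{
		StartMs: b.StartMs,
		EndMs:   b.EndMs,
		Value:   bytes.Clone(b.Value), // new backing array, not a shared slice header
	}
}

func main() {
	orig := &bucket{StartMs: 0, EndMs: 10000, Value: json.RawMessage(`{"v":1}`)}
	cp := orig.clone()

	// Mutating the copy's payload must not leak into the cached original.
	cp.Value[len(cp.Value)-2] = '2'

	fmt.Println(string(orig.Value)) // {"v":1}
	fmt.Println(string(cp.Value))   // {"v":2}
}

The new benchmarks quantify the cost of that deep copy against a JSON round trip; they can be run with `go test -bench 'BenchmarkCachedData' -benchmem ./pkg/types/querybuildertypes/querybuildertypesv5/`.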