chore: add querier HTTP API endpoint and bucket cache implementation (#8178)

* chore: update types
1. add partial bool to indicate if the value covers the partial interval
2. add optional unit if present (ex: duration_nano, metrics with units)
3. use pointers wherever necessary
4. add format options for request and remove redundant name in query envelope

* chore: fix some gaps
1. make the range as [start, end)
2. provide the logs statement builder with the body column
3. skip the body filter on resource filter statement builder
4. remove unnecessary agg expr rewriter in metrics
5. add ability to skip full text in where clause visitor

* chore: add API endpoint for new query range

* chore: add bucket cache implementation

* chore: add fingerprinting impl and add bucket cache to querier

* chore: add provider factory
This commit is contained in:
Srikanth Chekuri
2025-06-10 18:26:28 +05:30
committed by GitHub
parent 53f9e7d811
commit 85f04e4bae
37 changed files with 3814 additions and 171 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/apis/fields"
"github.com/SigNoz/signoz/pkg/http/middleware"
querierAPI "github.com/SigNoz/signoz/pkg/querier"
baseapp "github.com/SigNoz/signoz/pkg/query-service/app"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
@@ -58,8 +59,9 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
FluxInterval: opts.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
LicensingAPI: httplicensing.NewLicensingAPI(signoz.Licensing),
FieldsAPI: fields.NewAPI(signoz.TelemetryStore, signoz.Instrumentation.Logger()),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Querier),
})
if err != nil {

View File

@@ -294,6 +294,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
apiHandler.RegisterQueryRangeV3Routes(r, am)
apiHandler.RegisterInfraMetricsRoutes(r, am)
apiHandler.RegisterQueryRangeV4Routes(r, am)
apiHandler.RegisterQueryRangeV5Routes(r, am)
apiHandler.RegisterWebSocketPaths(r, am)
apiHandler.RegisterMessagingQueuesRoutes(r, am)
apiHandler.RegisterThirdPartyApiRoutes(r, am)

View File

@@ -3,9 +3,9 @@ package fields
import (
"bytes"
"io"
"log/slog"
"net/http"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
@@ -20,9 +20,13 @@ type API struct {
telemetryMetadataStore telemetrytypes.MetadataStore
}
func NewAPI(telemetryStore telemetrystore.TelemetryStore, logger *slog.Logger) *API {
// TODO: move this to module and remove metastore init
func NewAPI(
settings factory.ProviderSettings,
telemetryStore telemetrystore.TelemetryStore,
) *API {
telemetryMetadataStore := telemetrymetadata.NewTelemetryMetaStore(
logger,
settings,
telemetryStore,
telemetrytraces.DBName,
telemetrytraces.TagAttributesV2TableName,

49
pkg/querier/api.go Normal file
View File

@@ -0,0 +1,49 @@
package querier
import (
"encoding/json"
"net/http"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/types/authtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
type API struct {
querier Querier
}
func NewAPI(querier Querier) *API {
return &API{querier: querier}
}
func (a *API) QueryRange(rw http.ResponseWriter, req *http.Request) {
ctx := req.Context()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
var queryRangeRequest qbtypes.QueryRangeRequest
if err := json.NewDecoder(req.Body).Decode(&queryRangeRequest); err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
queryRangeResponse, err := a.querier.QueryRange(ctx, orgID, &queryRangeRequest)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, queryRangeResponse)
}

818
pkg/querier/bucket_cache.go Normal file
View File

@@ -0,0 +1,818 @@
package querier
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"slices"
"time"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
// bucketCache implements the BucketCache interface
type bucketCache struct {
cache cache.Cache
logger *slog.Logger
cacheTTL time.Duration
fluxInterval time.Duration
}
var _ BucketCache = (*bucketCache)(nil)
// NewBucketCache creates a new BucketCache implementation
func NewBucketCache(settings factory.ProviderSettings, cache cache.Cache, cacheTTL time.Duration, fluxInterval time.Duration) BucketCache {
cacheSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier/bucket_cache")
return &bucketCache{
cache: cache,
logger: cacheSettings.Logger(),
cacheTTL: cacheTTL,
fluxInterval: fluxInterval,
}
}
// cachedBucket represents a cached time bucket
type cachedBucket struct {
StartMs uint64 `json:"startMs"`
EndMs uint64 `json:"endMs"`
Type qbtypes.RequestType `json:"type"`
Value json.RawMessage `json:"value"`
Stats qbtypes.ExecStats `json:"stats"`
}
// cachedData represents the full cached data for a query
type cachedData struct {
Buckets []*cachedBucket `json:"buckets"`
Warnings []string `json:"warnings"`
}
func (c *cachedData) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, c)
}
func (c *cachedData) MarshalBinary() ([]byte, error) {
return json.Marshal(c)
}
// GetMissRanges returns cached data and missing time ranges
func (bc *bucketCache) GetMissRanges(
ctx context.Context,
orgID valuer.UUID,
q qbtypes.Query,
step qbtypes.Step,
) (cached *qbtypes.Result, missing []*qbtypes.TimeRange) {
// Get query window
startMs, endMs := q.Window()
bc.logger.DebugContext(ctx, "getting miss ranges", "fingerprint", q.Fingerprint(), "start", startMs, "end", endMs)
// Generate cache key
cacheKey := bc.generateCacheKey(q)
bc.logger.DebugContext(ctx, "cache key", "cache_key", cacheKey)
// Try to get cached data
var data cachedData
err := bc.cache.Get(ctx, orgID, cacheKey, &data, false)
if err != nil {
if !errors.Ast(err, errors.TypeNotFound) {
bc.logger.ErrorContext(ctx, "error getting cached data", "error", err)
}
// No cached data, need to fetch entire range
missing = []*qbtypes.TimeRange{{From: startMs, To: endMs}}
return nil, missing
}
// Extract step interval if this is a builder query
stepMs := uint64(step.Duration.Milliseconds())
// Find missing ranges with step alignment
missing = bc.findMissingRangesWithStep(data.Buckets, startMs, endMs, stepMs)
bc.logger.DebugContext(ctx, "missing ranges", "missing", missing, "step", stepMs)
// If no cached data overlaps with requested range, return empty result
if len(data.Buckets) == 0 {
return nil, missing
}
// Extract relevant buckets and merge them
relevantBuckets := bc.filterRelevantBuckets(data.Buckets, startMs, endMs)
if len(relevantBuckets) == 0 {
return nil, missing
}
// Merge buckets into a single result
mergedResult := bc.mergeBuckets(ctx, relevantBuckets, data.Warnings)
// Filter the merged result to only include values within the requested time range
mergedResult = bc.filterResultToTimeRange(mergedResult, startMs, endMs)
return mergedResult, missing
}
// Put stores fresh query results in the cache
func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result) {
// Get query window
startMs, endMs := q.Window()
// Calculate the flux boundary - data after this point should not be cached
currentMs := uint64(time.Now().UnixMilli())
fluxBoundary := currentMs - uint64(bc.fluxInterval.Milliseconds())
// If the entire range is within flux interval, skip caching
if startMs >= fluxBoundary {
bc.logger.DebugContext(ctx, "entire range within flux interval, skipping cache",
"start", startMs,
"end", endMs,
"flux_boundary", fluxBoundary)
return
}
// Adjust endMs to not include data within flux interval
cachableEndMs := endMs
if endMs > fluxBoundary {
cachableEndMs = fluxBoundary
bc.logger.DebugContext(ctx, "adjusting end time to exclude flux interval",
"original_end", endMs,
"cachable_end", cachableEndMs)
}
// Generate cache key
cacheKey := bc.generateCacheKey(q)
// Get existing cached data
var existingData cachedData
if err := bc.cache.Get(ctx, orgID, cacheKey, &existingData, true); err != nil {
existingData = cachedData{}
}
// Trim the result to exclude data within flux interval
trimmedResult := bc.trimResultToFluxBoundary(fresh, cachableEndMs)
if trimmedResult == nil {
// Result type is not cacheable (raw or scalar)
return
}
// Convert trimmed result to buckets
freshBuckets := bc.resultToBuckets(ctx, trimmedResult, startMs, cachableEndMs)
// If no fresh buckets and no existing data, don't cache
if len(freshBuckets) == 0 && len(existingData.Buckets) == 0 {
return
}
// Merge with existing buckets
mergedBuckets := bc.mergeAndDeduplicateBuckets(existingData.Buckets, freshBuckets)
// Update warnings
allWarnings := append(existingData.Warnings, trimmedResult.Warnings...)
uniqueWarnings := bc.deduplicateWarnings(allWarnings)
// Create updated cached data
updatedData := cachedData{
Buckets: mergedBuckets,
Warnings: uniqueWarnings,
}
// Marshal and store in cache
if err := bc.cache.Set(ctx, orgID, cacheKey, &updatedData, bc.cacheTTL); err != nil {
bc.logger.ErrorContext(ctx, "error setting cached data", "error", err)
}
}
// generateCacheKey creates a unique cache key based on query fingerprint
func (bc *bucketCache) generateCacheKey(q qbtypes.Query) string {
fingerprint := q.Fingerprint()
return fmt.Sprintf("v5:query:%s", fingerprint)
}
// findMissingRangesWithStep identifies time ranges not covered by cached buckets with step alignment
func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startMs, endMs uint64, stepMs uint64) []*qbtypes.TimeRange {
// When step is 0 or window is too small to be cached, use simple algorithm
if stepMs == 0 || (startMs+stepMs) > endMs {
return bc.findMissingRangesBasic(buckets, startMs, endMs)
}
// When no buckets exist, handle partial windows specially
if len(buckets) == 0 {
missing := make([]*qbtypes.TimeRange, 0, 3)
currentMs := startMs
// Check if start is not aligned - add partial window
if startMs%stepMs != 0 {
nextAggStart := startMs - (startMs % stepMs) + stepMs
missing = append(missing, &qbtypes.TimeRange{
From: startMs,
To: min(nextAggStart, endMs),
})
currentMs = nextAggStart
}
// Add the main range if needed
if currentMs < endMs {
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: endMs,
})
}
return missing
}
// Check if already sorted before sorting
needsSort := false
for i := 1; i < len(buckets); i++ {
if buckets[i].StartMs < buckets[i-1].StartMs {
needsSort = true
break
}
}
if needsSort {
slices.SortFunc(buckets, func(a, b *cachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
if a.StartMs > b.StartMs {
return 1
}
return 0
})
}
// Pre-allocate with reasonable capacity
missing := make([]*qbtypes.TimeRange, 0, len(buckets)+2)
currentMs := startMs
// Check if start is not aligned - add partial window
if startMs%stepMs != 0 {
nextAggStart := startMs - (startMs % stepMs) + stepMs
missing = append(missing, &qbtypes.TimeRange{
From: startMs,
To: min(nextAggStart, endMs),
})
currentMs = nextAggStart
}
for _, bucket := range buckets {
// Skip buckets that end before current position
if bucket.EndMs <= currentMs {
continue
}
// Stop processing if we've reached the end time
if bucket.StartMs >= endMs {
break
}
// Align bucket boundaries to step intervals
alignedBucketStart := bucket.StartMs
if bucket.StartMs%stepMs != 0 {
// Round up to next step boundary
alignedBucketStart = bucket.StartMs - (bucket.StartMs % stepMs) + stepMs
}
// Add gap before this bucket if needed
if currentMs < alignedBucketStart && currentMs < endMs {
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: min(alignedBucketStart, endMs),
})
}
// Update current position to the end of this bucket
// But ensure it's aligned to step boundary
bucketEnd := min(bucket.EndMs, endMs)
if bucketEnd%stepMs != 0 && bucketEnd < endMs {
// Round down to step boundary
bucketEnd = bucketEnd - (bucketEnd % stepMs)
}
currentMs = max(currentMs, bucketEnd)
}
// Add final gap if needed
if currentMs < endMs {
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: endMs,
})
}
// Don't merge ranges - keep partial windows separate for proper handling
return missing
}
// findMissingRangesBasic is the simple algorithm without step alignment
func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs, endMs uint64) []*qbtypes.TimeRange {
// Check if already sorted before sorting
needsSort := false
for i := 1; i < len(buckets); i++ {
if buckets[i].StartMs < buckets[i-1].StartMs {
needsSort = true
break
}
}
if needsSort {
slices.SortFunc(buckets, func(a, b *cachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
if a.StartMs > b.StartMs {
return 1
}
return 0
})
}
// Pre-allocate with reasonable capacity
missing := make([]*qbtypes.TimeRange, 0, len(buckets)+1)
currentMs := startMs
for _, bucket := range buckets {
// Skip buckets that end before start time
if bucket.EndMs <= startMs {
continue
}
// Stop processing if we've reached the end time
if bucket.StartMs >= endMs {
break
}
// Add gap before this bucket if needed
if currentMs < bucket.StartMs {
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: min(bucket.StartMs, endMs),
})
}
// Update current position, but don't go past the end time
currentMs = max(currentMs, min(bucket.EndMs, endMs))
}
// Add final gap if needed
if currentMs < endMs {
// Check if we need to limit due to flux interval
currentTime := uint64(time.Now().UnixMilli())
fluxBoundary := currentTime - uint64(bc.fluxInterval.Milliseconds())
// If the missing range extends beyond flux boundary, limit it
if currentMs < fluxBoundary {
// Add range up to flux boundary
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: min(endMs, fluxBoundary),
})
// If endMs is beyond flux boundary, add that as another missing range
if endMs > fluxBoundary {
missing = append(missing, &qbtypes.TimeRange{
From: fluxBoundary,
To: endMs,
})
}
} else {
// Entire missing range is within flux interval
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: endMs,
})
}
}
// Don't merge ranges - keep partial windows separate for proper handling
return missing
}
// filterRelevantBuckets returns buckets that overlap with the requested time range
func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, endMs uint64) []*cachedBucket {
// Pre-allocate with estimated capacity
relevant := make([]*cachedBucket, 0, len(buckets))
for _, bucket := range buckets {
// Check if bucket overlaps with requested range
if bucket.EndMs > startMs && bucket.StartMs < endMs {
relevant = append(relevant, bucket)
}
}
// Sort by start time
slices.SortFunc(relevant, func(a, b *cachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
if a.StartMs > b.StartMs {
return 1
}
return 0
})
return relevant
}
// mergeBuckets combines multiple cached buckets into a single result
func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*cachedBucket, warnings []string) *qbtypes.Result {
if len(buckets) == 0 {
return &qbtypes.Result{}
}
// All buckets should have the same type
resultType := buckets[0].Type
// Aggregate stats
var totalStats qbtypes.ExecStats
for _, bucket := range buckets {
totalStats.RowsScanned += bucket.Stats.RowsScanned
totalStats.BytesScanned += bucket.Stats.BytesScanned
totalStats.DurationMS += bucket.Stats.DurationMS
}
// Merge values based on type
var mergedValue any
switch resultType {
case qbtypes.RequestTypeTimeSeries:
mergedValue = bc.mergeTimeSeriesValues(ctx, buckets)
// Raw and Scalar types are not cached, so no merge needed
}
return &qbtypes.Result{
Type: resultType,
Value: mergedValue,
Stats: totalStats,
Warnings: warnings,
}
}
// mergeTimeSeriesValues merges time series data from multiple buckets
func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cachedBucket) *qbtypes.TimeSeriesData {
// Estimate capacity based on bucket count
estimatedSeries := len(buckets) * 10
// Flat map with composite key for better performance
type seriesKey struct {
aggIndex int
key string
}
seriesMap := make(map[seriesKey]*qbtypes.TimeSeries, estimatedSeries)
var queryName string
for _, bucket := range buckets {
var tsData *qbtypes.TimeSeriesData
if err := json.Unmarshal(bucket.Value, &tsData); err != nil {
bc.logger.ErrorContext(ctx, "failed to unmarshal time series data", "error", err)
continue
}
// Preserve the query name from the first bucket
if queryName == "" && tsData.QueryName != "" {
queryName = tsData.QueryName
}
for _, aggBucket := range tsData.Aggregations {
for _, series := range aggBucket.Series {
// Create series key from labels
key := seriesKey{
aggIndex: aggBucket.Index,
key: qbtypes.GetUniqueSeriesKey(series.Labels),
}
if existingSeries, ok := seriesMap[key]; ok {
// Pre-allocate capacity for merged values
newCap := len(existingSeries.Values) + len(series.Values)
if cap(existingSeries.Values) < newCap {
newValues := make([]*qbtypes.TimeSeriesValue, len(existingSeries.Values), newCap)
copy(newValues, existingSeries.Values)
existingSeries.Values = newValues
}
existingSeries.Values = append(existingSeries.Values, series.Values...)
} else {
// New series
seriesMap[key] = series
}
}
}
}
// Group series by aggregation index
aggMap := make(map[int][]*qbtypes.TimeSeries)
for key, series := range seriesMap {
aggMap[key.aggIndex] = append(aggMap[key.aggIndex], series)
}
// Convert map back to slice
result := &qbtypes.TimeSeriesData{
QueryName: queryName,
Aggregations: make([]*qbtypes.AggregationBucket, 0, len(aggMap)),
}
for index, seriesList := range aggMap {
// Sort values by timestamp for each series
for _, s := range seriesList {
// Check if already sorted before sorting
needsSort := false
for i := 1; i < len(s.Values); i++ {
if s.Values[i].Timestamp < s.Values[i-1].Timestamp {
needsSort = true
break
}
}
if needsSort {
slices.SortFunc(s.Values, func(a, b *qbtypes.TimeSeriesValue) int {
if a.Timestamp < b.Timestamp {
return -1
}
if a.Timestamp > b.Timestamp {
return 1
}
return 0
})
}
}
result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
Index: index,
Series: seriesList,
})
}
return result
}
// isEmptyResult checks if a result is truly empty (no data exists) vs filtered empty (data was filtered out)
func (bc *bucketCache) isEmptyResult(result *qbtypes.Result) (isEmpty bool, isFiltered bool) {
if result.Value == nil {
return true, false
}
switch result.Type {
case qbtypes.RequestTypeTimeSeries:
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
// No aggregations at all means truly empty
if len(tsData.Aggregations) == 0 {
return true, false
}
// Check if we have aggregations but no series (filtered out)
totalSeries := 0
for _, agg := range tsData.Aggregations {
totalSeries += len(agg.Series)
}
if totalSeries == 0 {
// We have aggregations but no series - data was filtered out
return true, true
}
// Check if all series have no values
hasValues := false
for _, agg := range tsData.Aggregations {
for _, series := range agg.Series {
if len(series.Values) > 0 {
hasValues = true
break
}
}
if hasValues {
break
}
}
return !hasValues, !hasValues && totalSeries > 0
}
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
// Raw and scalar data are not cached
return true, false
}
return true, false
}
// resultToBuckets converts a query result into time-based buckets
func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Result, startMs, endMs uint64) []*cachedBucket {
// Check if result is empty
isEmpty, isFiltered := bc.isEmptyResult(result)
// Don't cache if result is empty but not filtered
// Empty filtered results should be cached to avoid re-querying
if isEmpty && !isFiltered {
bc.logger.DebugContext(ctx, "skipping cache for empty non-filtered result")
return nil
}
// For now, create a single bucket for the entire range
// In the future, we could split large ranges into smaller buckets
valueBytes, err := json.Marshal(result.Value)
if err != nil {
bc.logger.ErrorContext(ctx, "failed to marshal result value", "error", err)
return nil
}
// Always create a bucket, even for empty filtered results
// This ensures we don't re-query for data that doesn't exist
return []*cachedBucket{
{
StartMs: startMs,
EndMs: endMs,
Type: result.Type,
Value: valueBytes,
Stats: result.Stats,
},
}
}
// mergeAndDeduplicateBuckets combines and deduplicates bucket lists
func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*cachedBucket) []*cachedBucket {
// Create a map to deduplicate by time range
bucketMap := make(map[string]*cachedBucket)
// Add existing buckets
for _, bucket := range existing {
key := fmt.Sprintf("%d-%d", bucket.StartMs, bucket.EndMs)
bucketMap[key] = bucket
}
// Add/update with fresh buckets
for _, bucket := range fresh {
key := fmt.Sprintf("%d-%d", bucket.StartMs, bucket.EndMs)
bucketMap[key] = bucket
}
// Convert back to slice with pre-allocated capacity
result := make([]*cachedBucket, 0, len(bucketMap))
for _, bucket := range bucketMap {
result = append(result, bucket)
}
// Sort by start time
slices.SortFunc(result, func(a, b *cachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
if a.StartMs > b.StartMs {
return 1
}
return 0
})
return result
}
// deduplicateWarnings removes duplicate warnings
func (bc *bucketCache) deduplicateWarnings(warnings []string) []string {
if len(warnings) == 0 {
return nil
}
seen := make(map[string]bool, len(warnings))
unique := make([]string, 0, len(warnings)) // Pre-allocate capacity
for _, warning := range warnings {
if !seen[warning] {
seen[warning] = true
unique = append(unique, warning)
}
}
return unique
}
// trimResultToFluxBoundary trims the result to exclude data points beyond the flux boundary
func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoundary uint64) *qbtypes.Result {
trimmedResult := &qbtypes.Result{
Type: result.Type,
Stats: result.Stats,
Warnings: result.Warnings,
}
switch result.Type {
case qbtypes.RequestTypeTimeSeries:
// Trim time series data
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
trimmedData := &qbtypes.TimeSeriesData{
QueryName: tsData.QueryName,
}
for _, aggBucket := range tsData.Aggregations {
trimmedBucket := &qbtypes.AggregationBucket{
Index: aggBucket.Index,
}
for _, series := range aggBucket.Series {
trimmedSeries := &qbtypes.TimeSeries{
Labels: series.Labels,
}
// Filter values to exclude those beyond flux boundary and partial values
for _, value := range series.Values {
// Skip partial values - they cannot be cached
if value.Partial {
continue
}
if uint64(value.Timestamp) <= fluxBoundary {
trimmedSeries.Values = append(trimmedSeries.Values, value)
}
}
// Always add the series to preserve filtered empty results
trimmedBucket.Series = append(trimmedBucket.Series, trimmedSeries)
}
// Always add the bucket to preserve aggregation structure
trimmedData.Aggregations = append(trimmedData.Aggregations, trimmedBucket)
}
// Always set the value to preserve empty filtered results
trimmedResult.Value = trimmedData
}
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
// Don't cache raw or scalar data
return nil
}
return trimmedResult
}
func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}
func max(a, b uint64) uint64 {
if a > b {
return a
}
return b
}
// filterResultToTimeRange filters the result to only include values within the requested time range
func (bc *bucketCache) filterResultToTimeRange(result *qbtypes.Result, startMs, endMs uint64) *qbtypes.Result {
if result == nil || result.Value == nil {
return result
}
switch result.Type {
case qbtypes.RequestTypeTimeSeries:
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
filteredData := &qbtypes.TimeSeriesData{
QueryName: tsData.QueryName,
Aggregations: make([]*qbtypes.AggregationBucket, 0, len(tsData.Aggregations)),
}
for _, aggBucket := range tsData.Aggregations {
filteredBucket := &qbtypes.AggregationBucket{
Index: aggBucket.Index,
Alias: aggBucket.Alias,
Meta: aggBucket.Meta,
Series: make([]*qbtypes.TimeSeries, 0, len(aggBucket.Series)),
}
for _, series := range aggBucket.Series {
filteredSeries := &qbtypes.TimeSeries{
Labels: series.Labels,
Values: make([]*qbtypes.TimeSeriesValue, 0, len(series.Values)),
}
// Filter values to only include those within the requested time range
for _, value := range series.Values {
timestampMs := uint64(value.Timestamp)
if timestampMs >= startMs && timestampMs < endMs {
filteredSeries.Values = append(filteredSeries.Values, value)
}
}
// Always add series to preserve structure (even if empty)
filteredBucket.Series = append(filteredBucket.Series, filteredSeries)
}
// Only add bucket if it has series
if len(filteredBucket.Series) > 0 {
filteredData.Aggregations = append(filteredData.Aggregations, filteredBucket)
}
}
// Create a new result with the filtered data
return &qbtypes.Result{
Type: result.Type,
Value: filteredData,
Stats: result.Stats,
Warnings: result.Warnings,
}
}
}
// For non-time series data, return as is
return result
}

View File

@@ -0,0 +1,445 @@
package querier
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/cachetest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
// BenchmarkBucketCache_GetMissRanges benchmarks the GetMissRanges operation
func BenchmarkBucketCache_GetMissRanges(b *testing.B) {
bc := createBenchmarkBucketCache(b)
ctx := context.Background()
orgID := valuer.UUID{}
// Pre-populate cache with some data
for i := 0; i < 10; i++ {
query := &mockQuery{
fingerprint: fmt.Sprintf("bench-query-%d", i),
startMs: uint64(i * 10000),
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
}
// Create test queries with varying cache hit patterns
queries := []struct {
name string
query *mockQuery
}{
{
name: "full_cache_hit",
query: &mockQuery{
fingerprint: "bench-query-5",
startMs: 50000,
endMs: 60000,
},
},
{
name: "full_cache_miss",
query: &mockQuery{
fingerprint: "bench-query-new",
startMs: 100000,
endMs: 110000,
},
},
{
name: "partial_cache_hit",
query: &mockQuery{
fingerprint: "bench-query-5",
startMs: 45000,
endMs: 65000,
},
},
}
for _, tc := range queries {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
cached, missing := bc.GetMissRanges(ctx, orgID, tc.query, qbtypes.Step{Duration: 1000 * time.Millisecond})
_ = cached
_ = missing
}
})
}
}
// BenchmarkBucketCache_Put benchmarks the Put operation
func BenchmarkBucketCache_Put(b *testing.B) {
bc := createBenchmarkBucketCache(b)
ctx := context.Background()
orgID := valuer.UUID{}
testCases := []struct {
name string
numSeries int
numValues int
numQueries int
}{
{"small_result_1_series_100_values", 1, 100, 1},
{"medium_result_10_series_100_values", 10, 100, 1},
{"large_result_100_series_100_values", 100, 100, 1},
{"huge_result_1000_series_100_values", 1000, 100, 1},
{"many_values_10_series_1000_values", 10, 1000, 1},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// Create test data
queries := make([]*mockQuery, tc.numQueries)
results := make([]*qbtypes.Result, tc.numQueries)
for i := 0; i < tc.numQueries; i++ {
queries[i] = &mockQuery{
fingerprint: fmt.Sprintf("bench-put-query-%d", i),
startMs: uint64(i * 100000),
endMs: uint64((i + 1) * 100000),
}
results[i] = createBenchmarkResultWithSeries(
queries[i].startMs,
queries[i].endMs,
1000,
tc.numSeries,
tc.numValues,
)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for j := 0; j < tc.numQueries; j++ {
bc.Put(ctx, orgID, queries[j], results[j])
}
}
})
}
}
// BenchmarkBucketCache_MergeTimeSeriesValues benchmarks merging of time series data
func BenchmarkBucketCache_MergeTimeSeriesValues(b *testing.B) {
bc := createBenchmarkBucketCache(b).(*bucketCache)
testCases := []struct {
name string
numBuckets int
numSeries int
numValues int
}{
{"small_2_buckets_10_series", 2, 10, 100},
{"medium_5_buckets_50_series", 5, 50, 100},
{"large_10_buckets_100_series", 10, 100, 100},
{"many_buckets_20_buckets_50_series", 20, 50, 100},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// Create test buckets
buckets := make([]*cachedBucket, tc.numBuckets)
for i := 0; i < tc.numBuckets; i++ {
startMs := uint64(i * 10000)
endMs := uint64((i + 1) * 10000)
result := createBenchmarkResultWithSeries(startMs, endMs, 1000, tc.numSeries, tc.numValues)
valueBytes, _ := json.Marshal(result.Value)
buckets[i] = &cachedBucket{
StartMs: startMs,
EndMs: endMs,
Type: qbtypes.RequestTypeTimeSeries,
Value: valueBytes,
Stats: result.Stats,
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
result := bc.mergeTimeSeriesValues(context.Background(), buckets)
_ = result
}
})
}
}
// BenchmarkBucketCache_FindMissingRangesWithStep benchmarks finding missing ranges
func BenchmarkBucketCache_FindMissingRangesWithStep(b *testing.B) {
bc := createBenchmarkBucketCache(b).(*bucketCache)
testCases := []struct {
name string
numBuckets int
gapPattern string // "none", "uniform", "random"
}{
{"no_gaps_10_buckets", 10, "none"},
{"uniform_gaps_10_buckets", 10, "uniform"},
{"random_gaps_20_buckets", 20, "random"},
{"many_buckets_100", 100, "uniform"},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// Create test buckets based on pattern
buckets := createBucketsWithPattern(tc.numBuckets, tc.gapPattern)
startMs := uint64(0)
endMs := uint64(tc.numBuckets * 20000)
stepMs := uint64(1000)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
missing := bc.findMissingRangesWithStep(buckets, startMs, endMs, stepMs)
_ = missing
}
})
}
}
// BenchmarkGetUniqueSeriesKey benchmarks the series key generation
func BenchmarkGetUniqueSeriesKey(b *testing.B) {
testCases := []struct {
name string
numLabels int
}{
{"1_label", 1},
{"5_labels", 5},
{"10_labels", 10},
{"20_labels", 20},
{"50_labels", 50},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
labels := make([]*qbtypes.Label, tc.numLabels)
for i := 0; i < tc.numLabels; i++ {
labels[i] = &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{
Name: fmt.Sprintf("label_%d", i),
FieldDataType: telemetrytypes.FieldDataTypeString,
},
Value: fmt.Sprintf("value_%d", i),
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
key := qbtypes.GetUniqueSeriesKey(labels)
_ = key
}
})
}
}
// BenchmarkBucketCache_ConcurrentOperations benchmarks concurrent cache operations
func BenchmarkBucketCache_ConcurrentOperations(b *testing.B) {
bc := createBenchmarkBucketCache(b)
ctx := context.Background()
orgID := valuer.UUID{}
// Pre-populate cache
for i := 0; i < 100; i++ {
query := &mockQuery{
fingerprint: fmt.Sprintf("concurrent-query-%d", i),
startMs: uint64(i * 10000),
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
// Mix of operations
switch i % 3 {
case 0: // Read
query := &mockQuery{
fingerprint: fmt.Sprintf("concurrent-query-%d", i%100),
startMs: uint64((i % 100) * 10000),
endMs: uint64(((i % 100) + 1) * 10000),
}
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
_ = cached
_ = missing
case 1: // Write
query := &mockQuery{
fingerprint: fmt.Sprintf("concurrent-query-new-%d", i),
startMs: uint64(i * 10000),
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
case 2: // Partial read
query := &mockQuery{
fingerprint: fmt.Sprintf("concurrent-query-%d", i%100),
startMs: uint64((i%100)*10000 - 5000),
endMs: uint64(((i%100)+1)*10000 + 5000),
}
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
_ = cached
_ = missing
}
i++
}
})
}
// BenchmarkBucketCache_FilterResultToTimeRange benchmarks filtering results to time range
func BenchmarkBucketCache_FilterResultToTimeRange(b *testing.B) {
bc := createBenchmarkBucketCache(b).(*bucketCache)
testCases := []struct {
name string
numSeries int
numValues int
}{
{"small_10_series_100_values", 10, 100},
{"medium_50_series_500_values", 50, 500},
{"large_100_series_1000_values", 100, 1000},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// Create a large result
result := createBenchmarkResultWithSeries(0, 100000, 1000, tc.numSeries, tc.numValues)
// Filter to middle 50%
startMs := uint64(25000)
endMs := uint64(75000)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
filtered := bc.filterResultToTimeRange(result, startMs, endMs)
_ = filtered
}
})
}
}
// Helper function to create benchmark bucket cache
func createBenchmarkBucketCache(tb testing.TB) BucketCache {
config := cache.Config{
Provider: "memory",
Memory: cache.Memory{
TTL: time.Hour * 168,
CleanupInterval: 10 * time.Minute,
},
}
memCache, err := cachetest.New(config)
require.NoError(tb, err)
return NewBucketCache(instrumentationtest.New().ToProviderSettings(), memCache, time.Hour, 5*time.Minute)
}
// Helper function to create benchmark result
func createBenchmarkResult(startMs, endMs uint64, step uint64) *qbtypes.Result {
return createBenchmarkResultWithSeries(startMs, endMs, step, 10, 100)
}
// Helper function to create benchmark result with specific series and values
func createBenchmarkResultWithSeries(startMs, endMs uint64, _ uint64, numSeries, numValuesPerSeries int) *qbtypes.Result {
series := make([]*qbtypes.TimeSeries, numSeries)
for i := 0; i < numSeries; i++ {
ts := &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
FieldDataType: telemetrytypes.FieldDataTypeString,
},
Value: fmt.Sprintf("server-%d", i),
},
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
FieldDataType: telemetrytypes.FieldDataTypeString,
},
Value: fmt.Sprintf("service-%d", i%5),
},
},
Values: make([]*qbtypes.TimeSeriesValue, 0, numValuesPerSeries),
}
// Generate values
valueStep := (endMs - startMs) / uint64(numValuesPerSeries)
if valueStep == 0 {
valueStep = 1
}
for j := 0; j < numValuesPerSeries; j++ {
timestamp := int64(startMs + uint64(j)*valueStep)
if timestamp < int64(endMs) {
ts.Values = append(ts.Values, &qbtypes.TimeSeriesValue{
Timestamp: timestamp,
Value: float64(i*100 + j),
})
}
}
series[i] = ts
}
return &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "benchmark_query",
Aggregations: []*qbtypes.AggregationBucket{
{
Index: 0,
Series: series,
},
},
},
Stats: qbtypes.ExecStats{
RowsScanned: uint64(numSeries * numValuesPerSeries),
BytesScanned: uint64(numSeries * numValuesPerSeries * 100),
DurationMS: 10,
},
}
}
// Helper function to create buckets with specific gap patterns
func createBucketsWithPattern(numBuckets int, pattern string) []*cachedBucket {
buckets := make([]*cachedBucket, 0, numBuckets)
for i := 0; i < numBuckets; i++ {
// Skip some buckets based on pattern
if pattern == "uniform" && i%3 == 0 {
continue // Create gaps every 3rd bucket
}
if pattern == "random" && i%7 < 2 {
continue // Create random gaps
}
startMs := uint64(i * 10000)
endMs := uint64((i + 1) * 10000)
buckets = append(buckets, &cachedBucket{
StartMs: startMs,
EndMs: endMs,
Type: qbtypes.RequestTypeTimeSeries,
Value: json.RawMessage(`{}`),
Stats: qbtypes.ExecStats{},
})
}
return buckets
}

File diff suppressed because it is too large Load Diff

View File

@@ -3,10 +3,12 @@ package querier
import (
"context"
"encoding/base64"
"fmt"
"strconv"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -42,8 +44,98 @@ func newBuilderQuery[T any](
}
func (q *builderQuery[T]) Fingerprint() string {
// TODO: implement this
return ""
if (q.spec.Signal == telemetrytypes.SignalTraces ||
q.spec.Signal == telemetrytypes.SignalLogs) && q.kind != qbtypes.RequestTypeTimeSeries {
// No caching for non-timeseries queries
return ""
}
// Create a deterministic fingerprint for builder queries
// This needs to include all fields that affect the query results
parts := []string{"builder"}
// Add signal type
parts = append(parts, fmt.Sprintf("signal=%s", q.spec.Signal.StringValue()))
// Add step interval if present
parts = append(parts, fmt.Sprintf("step=%s", q.spec.StepInterval.String()))
// Add aggregations (convert to string representation)
if len(q.spec.Aggregations) > 0 {
aggParts := []string{}
for _, agg := range q.spec.Aggregations {
switch a := any(agg).(type) {
case qbtypes.TraceAggregation:
aggParts = append(aggParts, a.Expression)
case qbtypes.LogAggregation:
aggParts = append(aggParts, a.Expression)
case qbtypes.MetricAggregation:
aggParts = append(aggParts, fmt.Sprintf("%s:%s:%s:%s",
a.MetricName,
a.Temporality.StringValue(),
a.TimeAggregation.StringValue(),
a.SpaceAggregation.StringValue(),
))
}
}
parts = append(parts, fmt.Sprintf("aggs=[%s]", strings.Join(aggParts, ",")))
}
// Add filter if present
if q.spec.Filter != nil && q.spec.Filter.Expression != "" {
parts = append(parts, fmt.Sprintf("filter=%s", q.spec.Filter.Expression))
}
// Add group by keys
if len(q.spec.GroupBy) > 0 {
groupByParts := []string{}
for _, gb := range q.spec.GroupBy {
groupByParts = append(groupByParts, fingerprintGroupByKey(gb))
}
parts = append(parts, fmt.Sprintf("groupby=[%s]", strings.Join(groupByParts, ",")))
}
// Add order by
if len(q.spec.Order) > 0 {
orderParts := []string{}
for _, o := range q.spec.Order {
orderParts = append(orderParts, fingerprintOrderBy(o))
}
parts = append(parts, fmt.Sprintf("order=[%s]", strings.Join(orderParts, ",")))
}
// Add limit and offset
if q.spec.Limit > 0 {
parts = append(parts, fmt.Sprintf("limit=%d", q.spec.Limit))
}
if q.spec.Offset > 0 {
parts = append(parts, fmt.Sprintf("offset=%d", q.spec.Offset))
}
// Add having clause
if q.spec.Having != nil && q.spec.Having.Expression != "" {
parts = append(parts, fmt.Sprintf("having=%s", q.spec.Having.Expression))
}
return strings.Join(parts, "&")
}
func fingerprintGroupByKey(gb qbtypes.GroupByKey) string {
return fingerprintFieldKey(gb.TelemetryFieldKey)
}
func fingerprintOrderBy(o qbtypes.OrderBy) string {
return fmt.Sprintf("%s:%s", fingerprintFieldKey(o.Key.TelemetryFieldKey), o.Direction.StringValue())
}
func fingerprintFieldKey(key telemetrytypes.TelemetryFieldKey) string {
// Include the essential fields that identify a field key
return fmt.Sprintf("%s-%s-%s-%s",
key.Name,
key.FieldDataType.StringValue(),
key.FieldContext.StringValue(),
key.Signal.StringValue())
}
func (q *builderQuery[T]) Window() (uint64, uint64) {
@@ -83,13 +175,8 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
return nil, err
}
chQuery := qbtypes.ClickHouseQuery{
Name: q.spec.Name,
Query: stmt.Query,
}
chExec := newchSQLQuery(q.telemetryStore, chQuery, stmt.Args, qbtypes.TimeRange{From: q.fromMS, To: q.toMS}, q.kind)
result, err := chExec.Execute(ctx)
// Execute the query with proper context for partial value detection
result, err := q.executeWithContext(ctx, stmt.Query, stmt.Args)
if err != nil {
return nil, err
}
@@ -97,6 +184,42 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
return result, nil
}
// executeWithContext executes the query with query window and step context for partial value detection
func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string, args []any) (*qbtypes.Result, error) {
totalRows := uint64(0)
totalBytes := uint64(0)
elapsed := time.Duration(0)
ctx = clickhouse.Context(ctx, clickhouse.WithProgress(func(p *clickhouse.Progress) {
totalRows += p.Rows
totalBytes += p.Bytes
elapsed += p.Elapsed
}))
rows, err := q.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
// Pass query window and step for partial value detection
queryWindow := &qbtypes.TimeRange{From: q.fromMS, To: q.toMS}
payload, err := consume(rows, q.kind, queryWindow, q.spec.StepInterval, q.spec.Name)
if err != nil {
return nil, err
}
return &qbtypes.Result{
Type: q.kind,
Value: payload,
Stats: qbtypes.ExecStats{
RowsScanned: totalRows,
BytesScanned: totalBytes,
DurationMS: uint64(elapsed.Milliseconds()),
},
}, nil
}
func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Result, error) {
isAsc := len(q.spec.Order) > 0 &&
strings.ToLower(string(q.spec.Order[0].Direction.StringValue())) == "asc"
@@ -138,14 +261,8 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
return nil, err
}
chExec := newchSQLQuery(
q.telemetryStore,
qbtypes.ClickHouseQuery{Name: q.spec.Name, Query: stmt.Query},
stmt.Args,
qbtypes.TimeRange{From: q.fromMS, To: q.toMS},
q.kind,
)
res, err := chExec.Execute(ctx)
// Execute with proper context for partial value detection
res, err := q.executeWithContext(ctx, stmt.Query, stmt.Args)
if err != nil {
return nil, err
}

View File

@@ -38,8 +38,11 @@ func newchSQLQuery(
}
}
// TODO: use the same query hash scheme as ClickHouse
func (q *chSQLQuery) Fingerprint() string { return q.query.Query }
func (q *chSQLQuery) Fingerprint() string {
// No caching for CH queries for now
return ""
}
func (q *chSQLQuery) Window() (uint64, uint64) { return q.fromMS, q.toMS }
func (q *chSQLQuery) Execute(ctx context.Context) (*qbtypes.Result, error) {
@@ -61,7 +64,7 @@ func (q *chSQLQuery) Execute(ctx context.Context) (*qbtypes.Result, error) {
defer rows.Close()
// TODO: map the errors from ClickHouse to our error types
payload, err := consume(rows, q.kind)
payload, err := consume(rows, q.kind, nil, qbtypes.Step{}, q.query.Name)
if err != nil {
return nil, err
}

50
pkg/querier/config.go Normal file
View File

@@ -0,0 +1,50 @@
package querier
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
// Config represents the configuration for the querier
type Config struct {
// CacheTTL is the TTL for cached query results
CacheTTL time.Duration `yaml:"cache_ttl" mapstructure:"cache_ttl"`
// FluxInterval is the interval for recent data that should not be cached
FluxInterval time.Duration `yaml:"flux_interval" mapstructure:"flux_interval"`
// MaxConcurrentQueries is the maximum number of concurrent queries for missing ranges
MaxConcurrentQueries int `yaml:"max_concurrent_queries" mapstructure:"max_concurrent_queries"`
}
// NewConfigFactory creates a new config factory for querier
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("querier"), newConfig)
}
func newConfig() factory.Config {
return Config{
// Default values
CacheTTL: 168 * time.Hour,
FluxInterval: 5 * time.Minute,
MaxConcurrentQueries: 4,
}
}
// Validate validates the configuration
func (c Config) Validate() error {
if c.CacheTTL <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "cache_ttl must be positive, got %v", c.CacheTTL)
}
if c.FluxInterval <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "flux_interval must be positive, got %v", c.FluxInterval)
}
if c.MaxConcurrentQueries <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "max_concurrent_queries must be positive, got %v", c.MaxConcurrentQueries)
}
return nil
}
func (c Config) Provider() string {
return "signoz"
}

View File

@@ -22,11 +22,11 @@ var (
// consume reads every row and shapes it into the payload expected for the
// given request type.
//
// * Time-series - []*qbtypes.TimeSeriesData
// * Scalar - []*qbtypes.ScalarData
// * Raw - []*qbtypes.RawData
// * Distribution- []*qbtypes.DistributionData
func consume(rows driver.Rows, kind qbtypes.RequestType) (any, error) {
// * Time-series - *qbtypes.TimeSeriesData
// * Scalar - *qbtypes.ScalarData
// * Raw - *qbtypes.RawData
// * Distribution- *qbtypes.DistributionData
func consume(rows driver.Rows, kind qbtypes.RequestType, queryWindow *qbtypes.TimeRange, step qbtypes.Step, queryName string) (any, error) {
var (
payload any
err error
@@ -34,18 +34,18 @@ func consume(rows driver.Rows, kind qbtypes.RequestType) (any, error) {
switch kind {
case qbtypes.RequestTypeTimeSeries:
payload, err = readAsTimeSeries(rows)
payload, err = readAsTimeSeries(rows, queryWindow, step, queryName)
case qbtypes.RequestTypeScalar:
payload, err = readAsScalar(rows)
payload, err = readAsScalar(rows, queryName)
case qbtypes.RequestTypeRaw:
payload, err = readAsRaw(rows)
payload, err = readAsRaw(rows, queryName)
// TODO: add support for other request types
}
return payload, err
}
func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) {
func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbtypes.Step, queryName string) (*qbtypes.TimeSeriesData, error) {
colTypes := rows.ColumnTypes()
colNames := rows.Columns()
@@ -65,6 +65,43 @@ func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) {
}
seriesMap := map[sKey]*qbtypes.TimeSeries{}
stepMs := uint64(step.Duration.Milliseconds())
// Helper function to check if a timestamp represents a partial value
isPartialValue := func(timestamp int64) bool {
if stepMs == 0 || queryWindow == nil {
return false
}
timestampMs := uint64(timestamp)
// For the first interval, check if query start is misaligned
// The first complete interval starts at the first timestamp >= queryWindow.From that is aligned to step
firstCompleteInterval := queryWindow.From
if queryWindow.From%stepMs != 0 {
// Round up to next step boundary
firstCompleteInterval = ((queryWindow.From / stepMs) + 1) * stepMs
}
// If timestamp is before the first complete interval, it's partial
if timestampMs < firstCompleteInterval {
return true
}
// For the last interval, check if it would extend beyond query end
if timestampMs+stepMs > queryWindow.To {
return queryWindow.To%stepMs != 0
}
return false
}
// Pre-allocate for labels based on column count
lblValsCapacity := len(colNames) - 1 // -1 for timestamp
if lblValsCapacity < 0 {
lblValsCapacity = 0
}
for rows.Next() {
if err := rows.Scan(slots...); err != nil {
return nil, err
@@ -72,8 +109,8 @@ func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) {
var (
ts int64
lblVals []string
lblObjs []*qbtypes.Label
lblVals = make([]string, 0, lblValsCapacity)
lblObjs = make([]*qbtypes.Label, 0, lblValsCapacity)
aggValues = map[int]float64{} // all __result_N in this row
fallbackValue float64 // value when NO __result_N columns exist
fallbackSeen bool
@@ -175,6 +212,7 @@ func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) {
series.Values = append(series.Values, &qbtypes.TimeSeriesValue{
Timestamp: ts,
Value: val,
Partial: isPartialValue(ts),
})
}
}
@@ -189,6 +227,7 @@ func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) {
}
}
if maxAgg < 0 {
//nolint:nilnil
return nil, nil // empty result-set
}
@@ -210,9 +249,10 @@ func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) {
}
}
return []*qbtypes.TimeSeriesData{{
return &qbtypes.TimeSeriesData{
QueryName: queryName,
Aggregations: nonEmpty,
}}, nil
}, nil
}
func numericKind(k reflect.Kind) bool {
@@ -226,12 +266,13 @@ func numericKind(k reflect.Kind) bool {
}
}
func readAsScalar(rows driver.Rows) (*qbtypes.ScalarData, error) {
func readAsScalar(rows driver.Rows, queryName string) (*qbtypes.ScalarData, error) {
colNames := rows.Columns()
colTypes := rows.ColumnTypes()
cd := make([]*qbtypes.ColumnDescriptor, len(colNames))
var aggIndex int64
for i, name := range colNames {
colType := qbtypes.ColumnTypeGroup
if aggRe.MatchString(name) {
@@ -239,18 +280,24 @@ func readAsScalar(rows driver.Rows) (*qbtypes.ScalarData, error) {
}
cd[i] = &qbtypes.ColumnDescriptor{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name},
AggregationIndex: int64(i),
QueryName: queryName,
AggregationIndex: aggIndex,
Type: colType,
}
if colType == qbtypes.ColumnTypeAggregation {
aggIndex++
}
}
// Pre-allocate scan slots once
scan := make([]any, len(colTypes))
for i := range scan {
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
}
var data [][]any
for rows.Next() {
scan := make([]any, len(colTypes))
for i := range scan {
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
}
if err := rows.Scan(scan...); err != nil {
return nil, err
}
@@ -277,7 +324,7 @@ func readAsScalar(rows driver.Rows) (*qbtypes.ScalarData, error) {
}, nil
}
func readAsRaw(rows driver.Rows) (*qbtypes.RawData, error) {
func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
colNames := rows.Columns()
colTypes := rows.ColumnTypes()
@@ -337,36 +384,38 @@ func readAsRaw(rows driver.Rows) (*qbtypes.RawData, error) {
}
return &qbtypes.RawData{
Rows: outRows,
QueryName: queryName,
Rows: outRows,
}, nil
}
// numericAsFloat converts numeric types to float64 efficiently
func numericAsFloat(v any) float64 {
switch x := v.(type) {
case float64:
return x
case float32:
return float64(x)
case int64:
return float64(x)
case float32:
return float64(x)
case int32:
return float64(x)
case int16:
return float64(x)
case int8:
return float64(x)
case int:
return float64(x)
case uint64:
return float64(x)
case uint32:
return float64(x)
case int:
return float64(x)
case uint:
return float64(x)
case int16:
return float64(x)
case int8:
return float64(x)
case uint16:
return float64(x)
case uint8:
return float64(x)
case uint:
return float64(x)
default:
return math.NaN()
}

21
pkg/querier/interfaces.go Normal file
View File

@@ -0,0 +1,21 @@
package querier
import (
"context"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Querier interface defines the contract for querying data
type Querier interface {
QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error)
}
// BucketCache is the interface for bucket-based caching
type BucketCache interface {
// cached portion + list of gaps to fetch
GetMissRanges(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step) (cached *qbtypes.Result, missing []*qbtypes.TimeRange)
// store fresh buckets for future hits
Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result)
}

98
pkg/querier/pools.go Normal file
View File

@@ -0,0 +1,98 @@
package querier
import (
"sync"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
// Pools for reducing allocations in hot paths
var (
// Pool for label slices
labelSlicePool = sync.Pool{
New: func() any {
s := make([]*qbtypes.Label, 0, 16)
return &s
},
}
// Pool for string slices used in label processing
stringSlicePool = sync.Pool{
New: func() any {
s := make([]string, 0, 16)
return &s
},
}
// Pool for time series value slices
valueSlicePool = sync.Pool{
New: func() any {
s := make([]*qbtypes.TimeSeriesValue, 0, 100)
return &s
},
}
// Pool for aggregation value maps
aggValueMapPool = sync.Pool{
New: func() any {
m := make(map[int]float64, 4)
return &m
},
}
)
// GetLabelSlice gets a label slice from the pool
func GetLabelSlice() []*qbtypes.Label {
sp := labelSlicePool.Get().(*[]*qbtypes.Label)
return *sp
}
// PutLabelSlice returns a label slice to the pool
func PutLabelSlice(s []*qbtypes.Label) {
s = s[:0] // Reset slice
labelSlicePool.Put(&s)
}
// GetStringSlice gets a string slice from the pool
func GetStringSlice() []string {
sp := stringSlicePool.Get().(*[]string)
return *sp
}
// PutStringSlice returns a string slice to the pool
func PutStringSlice(s []string) {
s = s[:0] // Reset slice
stringSlicePool.Put(&s)
}
// GetValueSlice gets a time series value slice from the pool
func GetValueSlice() []*qbtypes.TimeSeriesValue {
sp := valueSlicePool.Get().(*[]*qbtypes.TimeSeriesValue)
return *sp
}
// PutValueSlice returns a time series value slice to the pool
func PutValueSlice(s []*qbtypes.TimeSeriesValue) {
s = s[:0] // Reset slice
valueSlicePool.Put(&s)
}
// GetAggValueMap gets an aggregation value map from the pool
func GetAggValueMap() map[int]float64 {
mp := aggValueMapPool.Get().(*map[int]float64)
m := *mp
// Clear the map
for k := range m {
delete(m, k)
}
return m
}
// PutAggValueMap returns an aggregation value map to the pool
func PutAggValueMap(m map[int]float64) {
// Clear before returning to pool
for k := range m {
delete(m, k)
}
aggValueMapPool.Put(&m)
}

View File

@@ -2,6 +2,7 @@ package querier
import (
"context"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
@@ -31,8 +32,13 @@ func newPromqlQuery(
}
func (q *promqlQuery) Fingerprint() string {
// TODO: Implement this
return ""
parts := []string{
"promql",
q.query.Query,
q.query.Step.Duration.String(),
}
return strings.Join(parts, "&")
}
func (q *promqlQuery) Window() (uint64, uint64) {
@@ -113,13 +119,11 @@ func (q *promqlQuery) Execute(ctx context.Context) (*qbv5.Result, error) {
return &qbv5.Result{
Type: q.requestType,
Value: []*qbv5.TimeSeriesData{
{
QueryName: q.query.Name,
Aggregations: []*qbv5.AggregationBucket{
{
Series: series,
},
Value: &qbv5.TimeSeriesData{
QueryName: q.query.Name,
Aggregations: []*qbv5.AggregationBucket{
{
Series: series,
},
},
},

View File

@@ -2,45 +2,62 @@ package querier
import (
"context"
"fmt"
"log/slog"
"slices"
"sync"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"golang.org/x/exp/maps"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
type querier struct {
logger *slog.Logger
telemetryStore telemetrystore.TelemetryStore
metadataStore telemetrytypes.MetadataStore
promEngine prometheus.Prometheus
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
bucketCache BucketCache
}
func NewQuerier(
var _ Querier = (*querier)(nil)
func New(
settings factory.ProviderSettings,
telemetryStore telemetrystore.TelemetryStore,
metadataStore telemetrytypes.MetadataStore,
promEngine prometheus.Prometheus,
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
bucketCache BucketCache,
) *querier {
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
return &querier{
logger: querierSettings.Logger(),
telemetryStore: telemetryStore,
metadataStore: metadataStore,
promEngine: promEngine,
traceStmtBuilder: traceStmtBuilder,
logStmtBuilder: logStmtBuilder,
metricStmtBuilder: metricStmtBuilder,
bucketCache: bucketCache,
}
}
func (q *querier) QueryRange(ctx context.Context, orgID string, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
queries := make(map[string]qbtypes.Query)
steps := make(map[string]qbtypes.Step)
for _, query := range req.CompositeQuery.Queries {
switch query.Type {
@@ -50,47 +67,321 @@ func (q *querier) QueryRange(ctx context.Context, orgID string, req *qbtypes.Que
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", query.Spec)
}
promqlQuery := newPromqlQuery(q.promEngine, promQuery, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
queries[query.Name] = promqlQuery
queries[promQuery.Name] = promqlQuery
steps[promQuery.Name] = promQuery.Step
case qbtypes.QueryTypeClickHouseSQL:
chQuery, ok := query.Spec.(qbtypes.ClickHouseQuery)
if !ok {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid clickhouse query spec %T", query.Spec)
}
chSQLQuery := newchSQLQuery(q.telemetryStore, chQuery, nil, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
queries[query.Name] = chSQLQuery
queries[chQuery.Name] = chSQLQuery
case qbtypes.QueryTypeBuilder:
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
queries[query.Name] = bq
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
queries[query.Name] = bq
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
queries[query.Name] = bq
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
}
}
}
return q.run(ctx, orgID, queries, req.RequestType)
return q.run(ctx, orgID, queries, req, steps)
}
func (q *querier) run(ctx context.Context, _ string, qs map[string]qbtypes.Query, kind qbtypes.RequestType) (*qbtypes.QueryRangeResponse, error) {
results := make([]*qbtypes.Result, 0, len(qs))
for _, query := range qs {
// TODO: run in controlled batches
result, err := query.Execute(ctx)
if err != nil {
return nil, err
func (q *querier) run(ctx context.Context, orgID valuer.UUID, qs map[string]qbtypes.Query, req *qbtypes.QueryRangeRequest, steps map[string]qbtypes.Step) (*qbtypes.QueryRangeResponse, error) {
results := make(map[string]any)
warnings := make([]string, 0)
stats := qbtypes.ExecStats{}
for name, query := range qs {
// Skip cache if NoCache is set, or if cache is not available
if req.NoCache || q.bucketCache == nil || query.Fingerprint() == "" {
if req.NoCache {
q.logger.DebugContext(ctx, "NoCache flag set, bypassing cache", "query", name)
} else {
q.logger.InfoContext(ctx, "no bucket cache or fingerprint, executing query", "fingerprint", query.Fingerprint())
}
result, err := query.Execute(ctx)
if err != nil {
return nil, err
}
results[name] = result.Value
warnings = append(warnings, result.Warnings...)
stats.RowsScanned += result.Stats.RowsScanned
stats.BytesScanned += result.Stats.BytesScanned
stats.DurationMS += result.Stats.DurationMS
} else {
result, err := q.executeWithCache(ctx, orgID, query, steps[name], req.NoCache)
if err != nil {
return nil, err
}
results[name] = result.Value
warnings = append(warnings, result.Warnings...)
stats.RowsScanned += result.Stats.RowsScanned
stats.BytesScanned += result.Stats.BytesScanned
stats.DurationMS += result.Stats.DurationMS
}
results = append(results, result)
}
return &qbtypes.QueryRangeResponse{
Type: kind,
Data: results,
Type: req.RequestType,
Data: struct {
Results []any `json:"results"`
Warnings []string `json:"warnings"`
}{
Results: maps.Values(results),
Warnings: warnings,
},
Meta: struct {
RowsScanned uint64 `json:"rowsScanned"`
BytesScanned uint64 `json:"bytesScanned"`
DurationMS uint64 `json:"durationMs"`
}{
RowsScanned: stats.RowsScanned,
BytesScanned: stats.BytesScanned,
DurationMS: stats.DurationMS,
},
}, nil
}
// executeWithCache executes a query using the bucket cache
func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query qbtypes.Query, step qbtypes.Step, noCache bool) (*qbtypes.Result, error) {
// Get cached data and missing ranges
cachedResult, missingRanges := q.bucketCache.GetMissRanges(ctx, orgID, query, step)
// If no missing ranges, return cached result
if len(missingRanges) == 0 && cachedResult != nil {
return cachedResult, nil
}
// If entire range is missing, execute normally
if cachedResult == nil && len(missingRanges) == 1 {
startMs, endMs := query.Window()
if missingRanges[0].From == startMs && missingRanges[0].To == endMs {
result, err := query.Execute(ctx)
if err != nil {
return nil, err
}
// Store in cache for future use
q.bucketCache.Put(ctx, orgID, query, result)
return result, nil
}
}
// Execute queries for missing ranges with bounded parallelism
freshResults := make([]*qbtypes.Result, len(missingRanges))
errors := make([]error, len(missingRanges))
totalStats := qbtypes.ExecStats{}
sem := make(chan struct{}, 4)
var wg sync.WaitGroup
for i, timeRange := range missingRanges {
wg.Add(1)
go func(idx int, tr *qbtypes.TimeRange) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
// Create a new query with the missing time range
rangedQuery := q.createRangedQuery(query, *tr)
if rangedQuery == nil {
errors[idx] = fmt.Errorf("failed to create ranged query for range %d-%d", tr.From, tr.To)
return
}
// Execute the ranged query
result, err := rangedQuery.Execute(ctx)
if err != nil {
errors[idx] = err
return
}
freshResults[idx] = result
}(i, timeRange)
}
// Wait for all queries to complete
wg.Wait()
// Check for errors
for _, err := range errors {
if err != nil {
// If any query failed, fall back to full execution
q.logger.ErrorContext(ctx, "parallel query execution failed", "error", err)
result, err := query.Execute(ctx)
if err != nil {
return nil, err
}
q.bucketCache.Put(ctx, orgID, query, result)
return result, nil
}
}
// Calculate total stats and filter out nil results
validResults := make([]*qbtypes.Result, 0, len(freshResults))
for _, result := range freshResults {
if result != nil {
validResults = append(validResults, result)
totalStats.RowsScanned += result.Stats.RowsScanned
totalStats.BytesScanned += result.Stats.BytesScanned
totalStats.DurationMS += result.Stats.DurationMS
}
}
freshResults = validResults
// Merge cached and fresh results
mergedResult := q.mergeResults(cachedResult, freshResults)
mergedResult.Stats.RowsScanned += totalStats.RowsScanned
mergedResult.Stats.BytesScanned += totalStats.BytesScanned
mergedResult.Stats.DurationMS += totalStats.DurationMS
// Store merged result in cache
q.bucketCache.Put(ctx, orgID, query, mergedResult)
return mergedResult, nil
}
// createRangedQuery creates a copy of the query with a different time range
func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtypes.TimeRange) qbtypes.Query {
switch qt := originalQuery.(type) {
case *promqlQuery:
return newPromqlQuery(q.promEngine, qt.query, timeRange, qt.requestType)
case *chSQLQuery:
return newchSQLQuery(q.telemetryStore, qt.query, qt.args, timeRange, qt.kind)
case *builderQuery[qbtypes.TraceAggregation]:
return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, qt.spec, timeRange, qt.kind)
case *builderQuery[qbtypes.LogAggregation]:
return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, qt.spec, timeRange, qt.kind)
case *builderQuery[qbtypes.MetricAggregation]:
return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, qt.spec, timeRange, qt.kind)
default:
return nil
}
}
// mergeResults merges cached result with fresh results
func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result) *qbtypes.Result {
if cached == nil && len(fresh) == 1 {
return fresh[0]
}
// Start with cached result
merged := &qbtypes.Result{
Type: cached.Type,
Value: cached.Value,
Stats: cached.Stats,
Warnings: cached.Warnings,
}
// If no fresh results, return cached
if len(fresh) == 0 {
return merged
}
switch merged.Type {
case qbtypes.RequestTypeTimeSeries:
merged.Value = q.mergeTimeSeriesResults(cached.Value.(*qbtypes.TimeSeriesData), fresh)
}
if len(fresh) > 0 {
totalWarnings := len(merged.Warnings)
for _, result := range fresh {
totalWarnings += len(result.Warnings)
}
allWarnings := make([]string, 0, totalWarnings)
allWarnings = append(allWarnings, merged.Warnings...)
for _, result := range fresh {
allWarnings = append(allWarnings, result.Warnings...)
}
merged.Warnings = allWarnings
}
return merged
}
// mergeTimeSeriesResults merges time series data
func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, freshResults []*qbtypes.Result) *qbtypes.TimeSeriesData {
// Map to store merged series by query name and series key
seriesMap := make(map[int]map[string]*qbtypes.TimeSeries)
for _, aggBucket := range cachedValue.Aggregations {
if seriesMap[aggBucket.Index] == nil {
seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
}
for _, series := range aggBucket.Series {
key := qbtypes.GetUniqueSeriesKey(series.Labels)
seriesMap[aggBucket.Index][key] = series
}
}
// Add fresh series
for _, result := range freshResults {
freshTS, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
continue
}
for _, aggBucket := range freshTS.Aggregations {
if seriesMap[aggBucket.Index] == nil {
seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
}
}
for _, aggBucket := range freshTS.Aggregations {
for _, series := range aggBucket.Series {
key := qbtypes.GetUniqueSeriesKey(series.Labels)
if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
// Merge values
existingSeries.Values = append(existingSeries.Values, series.Values...)
} else {
// New series
seriesMap[aggBucket.Index][key] = series
}
}
}
}
result := &qbtypes.TimeSeriesData{
QueryName: cachedValue.QueryName,
Aggregations: []*qbtypes.AggregationBucket{},
}
for index, series := range seriesMap {
var aggSeries []*qbtypes.TimeSeries
for _, s := range series {
// Sort values by timestamp
slices.SortFunc(s.Values, func(a, b *qbtypes.TimeSeriesValue) int {
if a.Timestamp < b.Timestamp {
return -1
}
if a.Timestamp > b.Timestamp {
return 1
}
return 0
})
aggSeries = append(aggSeries, s)
}
result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
Index: index,
Series: aggSeries,
})
}
return result
}

View File

@@ -0,0 +1,140 @@
package signozquerier
import (
"context"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
)
// NewFactory creates a new factory for the signoz querier provider
func NewFactory(
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
cache cache.Cache,
) factory.ProviderFactory[querier.Querier, querier.Config] {
return factory.NewProviderFactory(
factory.MustNewName("signoz"),
func(
ctx context.Context,
settings factory.ProviderSettings,
cfg querier.Config,
) (querier.Querier, error) {
return newProvider(ctx, settings, cfg, telemetryStore, prometheus, cache)
},
)
}
func newProvider(
_ context.Context,
settings factory.ProviderSettings,
cfg querier.Config,
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
cache cache.Cache,
) (querier.Querier, error) {
// Create telemetry metadata store
telemetryMetadataStore := telemetrymetadata.NewTelemetryMetaStore(
settings,
telemetryStore,
telemetrytraces.DBName,
telemetrytraces.TagAttributesV2TableName,
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,
telemetrymetadata.DBName,
telemetrymetadata.AttributesMetadataLocalTableName,
)
// Create trace statement builder
traceFieldMapper := telemetrytraces.NewFieldMapper()
traceConditionBuilder := telemetrytraces.NewConditionBuilder(traceFieldMapper)
resourceFilterFieldMapper := resourcefilter.NewFieldMapper()
resourceFilterConditionBuilder := resourcefilter.NewConditionBuilder(resourceFilterFieldMapper)
resourceFilterStmtBuilder := resourcefilter.NewTraceResourceFilterStatementBuilder(
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
telemetryMetadataStore,
)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(nil, traceFieldMapper, traceConditionBuilder, "", nil)
traceStmtBuilder := telemetrytraces.NewTraceQueryStatementBuilder(
settings,
telemetryMetadataStore,
traceFieldMapper,
traceConditionBuilder,
resourceFilterStmtBuilder,
traceAggExprRewriter,
)
// Create log statement builder
logFieldMapper := telemetrylogs.NewFieldMapper()
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
telemetryMetadataStore,
)
logAggExprRewriter := querybuilder.NewAggExprRewriter(
telemetrylogs.DefaultFullTextColumn,
logFieldMapper,
logConditionBuilder,
telemetrylogs.BodyJSONStringSearchPrefix,
telemetrylogs.GetBodyJSONKey,
)
logStmtBuilder := telemetrylogs.NewLogQueryStatementBuilder(
settings,
telemetryMetadataStore,
logFieldMapper,
logConditionBuilder,
logResourceFilterStmtBuilder,
logAggExprRewriter,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.BodyJSONStringSearchPrefix,
telemetrylogs.GetBodyJSONKey,
)
// Create metric statement builder
metricFieldMapper := telemetrymetrics.NewFieldMapper()
metricConditionBuilder := telemetrymetrics.NewConditionBuilder(metricFieldMapper)
metricStmtBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(
settings,
telemetryMetadataStore,
metricFieldMapper,
metricConditionBuilder,
)
// Create bucket cache
bucketCache := querier.NewBucketCache(
settings,
cache,
cfg.CacheTTL,
cfg.FluxInterval,
)
// Create and return the querier
return querier.New(
settings,
telemetryStore,
telemetryMetadataStore,
prometheus,
traceStmtBuilder,
logStmtBuilder,
metricStmtBuilder,
bucketCache,
), nil
}

View File

@@ -75,6 +75,8 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
"github.com/SigNoz/signoz/pkg/version"
querierAPI "github.com/SigNoz/signoz/pkg/querier"
)
type status string
@@ -144,6 +146,8 @@ type APIHandler struct {
FieldsAPI *fields.API
QuerierAPI *querierAPI.API
Signoz *signoz.SigNoz
}
@@ -180,6 +184,8 @@ type APIHandlerOpts struct {
FieldsAPI *fields.API
QuerierAPI *querierAPI.API
Signoz *signoz.SigNoz
}
@@ -243,6 +249,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
LicensingAPI: opts.LicensingAPI,
Signoz: opts.Signoz,
FieldsAPI: opts.FieldsAPI,
QuerierAPI: opts.QuerierAPI,
}
logsQueryBuilder := logsv4.PrepareLogsQuery
@@ -473,6 +480,11 @@ func (aH *APIHandler) RegisterQueryRangeV4Routes(router *mux.Router, am *middlew
subRouter.HandleFunc("/metric/metric_metadata", am.ViewAccess(aH.getMetricMetadata)).Methods(http.MethodGet)
}
func (aH *APIHandler) RegisterQueryRangeV5Routes(router *mux.Router, am *middleware.AuthZ) {
subRouter := router.PathPrefix("/api/v5").Subrouter()
subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QuerierAPI.QueryRange)).Methods(http.MethodPost)
}
// todo(remove): Implemented at render package (github.com/SigNoz/signoz/pkg/http/render) with the new error structure
func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
writeHttpResponse(w, data)

View File

@@ -17,6 +17,7 @@ import (
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/prometheus"
querierAPI "github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
@@ -149,8 +150,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
JWT: serverOptions.Jwt,
AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),
LicensingAPI: nooplicensing.NewLicenseAPI(),
FieldsAPI: fields.NewAPI(serverOptions.SigNoz.TelemetryStore, serverOptions.SigNoz.Instrumentation.Logger()),
FieldsAPI: fields.NewAPI(serverOptions.SigNoz.Instrumentation.ToProviderSettings(), serverOptions.SigNoz.TelemetryStore),
Signoz: serverOptions.SigNoz,
QuerierAPI: querierAPI.NewAPI(serverOptions.SigNoz.Querier),
})
if err != nil {
return nil, err
@@ -266,6 +268,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
api.RegisterInfraMetricsRoutes(r, am)
api.RegisterWebSocketPaths(r, am)
api.RegisterQueryRangeV4Routes(r, am)
api.RegisterQueryRangeV5Routes(r, am)
api.RegisterMessagingQueuesRoutes(r, am)
api.RegisterThirdPartyApiRoutes(r, am)
api.MetricExplorerRoutes(r, am)

View File

@@ -135,9 +135,10 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
// warnings would be encountered as part of the main condition already
filterWhereClause, _, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fieldMapper,
ConditionBuilder: b.conditionBuilder,
FieldKeys: keys,
FieldMapper: b.fieldMapper,
ConditionBuilder: b.conditionBuilder,
FieldKeys: keys,
SkipFullTextFilter: true,
})
if err != nil {

View File

@@ -28,6 +28,7 @@ type filterExpressionVisitor struct {
jsonBodyPrefix string
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
skipResourceFilter bool
skipFullTextFilter bool
}
type FilterExprVisitorOpts struct {
@@ -39,6 +40,7 @@ type FilterExprVisitorOpts struct {
JsonBodyPrefix string
JsonKeyToKey qbtypes.JsonKeyToFieldFunc
SkipResourceFilter bool
SkipFullTextFilter bool
}
// newFilterExpressionVisitor creates a new filterExpressionVisitor
@@ -52,6 +54,7 @@ func newFilterExpressionVisitor(opts FilterExprVisitorOpts) *filterExpressionVis
jsonBodyPrefix: opts.JsonBodyPrefix,
jsonKeyToKey: opts.JsonKeyToKey,
skipResourceFilter: opts.SkipResourceFilter,
skipFullTextFilter: opts.SkipFullTextFilter,
}
}
@@ -230,6 +233,10 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
// Handle standalone key/value as a full text search term
if ctx.GetChildCount() == 1 {
if v.skipFullTextFilter {
return ""
}
if v.fullTextColumn == nil {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
@@ -451,6 +458,10 @@ func (v *filterExpressionVisitor) VisitValueList(ctx *grammar.ValueListContext)
// VisitFullText handles standalone quoted strings for full-text search
func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) any {
if v.skipFullTextFilter {
return ""
}
var text string
if ctx.QUOTED_TEXT() != nil {

View File

@@ -18,6 +18,7 @@ import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sqlmigration"
@@ -67,6 +68,9 @@ type Config struct {
// Alertmanager config
Alertmanager alertmanager.Config `mapstructure:"alertmanager" yaml:"alertmanager"`
// Querier config
Querier querier.Config `mapstructure:"querier"`
// Ruler config
Ruler ruler.Config `mapstructure:"ruler"`
@@ -102,6 +106,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
telemetrystore.NewConfigFactory(),
prometheus.NewConfigFactory(),
alertmanager.NewConfigFactory(),
querier.NewConfigFactory(),
ruler.NewConfigFactory(),
emailing.NewConfigFactory(),
sharder.NewConfigFactory(),

View File

@@ -17,6 +17,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/querier/signozquerier"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/ruler/signozruler"
"github.com/SigNoz/signoz/pkg/sharder"
@@ -156,3 +158,9 @@ func NewStatsReporterProviderFactories(telemetryStore telemetrystore.TelemetrySt
noopstatsreporter.NewFactory(),
)
}
func NewQuerierProviderFactories(telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, cache cache.Cache) factory.NamedMap[factory.ProviderFactory[querier.Querier, querier.Config]] {
return factory.MustNewNamedMap(
signozquerier.NewFactory(telemetryStore, prometheus, cache),
)
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sqlmigration"
@@ -35,6 +36,7 @@ type SigNoz struct {
TelemetryStore telemetrystore.TelemetryStore
Prometheus prometheus.Prometheus
Alertmanager alertmanager.Alertmanager
Querier querier.Querier
Rules ruler.Ruler
Zeus zeus.Zeus
Licensing licensing.Licensing
@@ -166,6 +168,18 @@ func New(
return nil, err
}
// Initialize querier from the available querier provider factories
querier, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.Querier,
NewQuerierProviderFactories(telemetrystore, prometheus, cache),
config.Querier.Provider(),
)
if err != nil {
return nil, err
}
// Run migrations on the sqlstore
sqlmigrations, err := sqlmigration.New(
ctx,
@@ -280,6 +294,7 @@ func New(
TelemetryStore: telemetrystore,
Prometheus: prometheus,
Alertmanager: alertmanager,
Querier: querier,
Zeus: zeus,
Licensing: licensing,
Emailing: emailing,

View File

@@ -1,6 +1,14 @@
package telemetrylogs
import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
var (
DefaultFullTextColumn = &telemetrytypes.TelemetryFieldKey{
Name: "body",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
}
BodyJSONStringSearchPrefix = `body.`
IntrinsicFields = []string{"timestamp", "body", "trace_id", "span_id", "trace_flags", "severity_text", "severity_number"}
)

View File

@@ -7,6 +7,7 @@ import (
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -24,25 +25,37 @@ type logQueryStatementBuilder struct {
cb qbtypes.ConditionBuilder
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
aggExprRewriter qbtypes.AggExprRewriter
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonBodyPrefix string
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
}
var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*logQueryStatementBuilder)(nil)
func NewLogQueryStatementBuilder(
logger *slog.Logger,
settings factory.ProviderSettings,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
aggExprRewriter qbtypes.AggExprRewriter,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonBodyPrefix string,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *logQueryStatementBuilder {
logsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrylogs")
return &logQueryStatementBuilder{
logger: logger,
logger: logsSettings.Logger(),
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
aggExprRewriter: aggExprRewriter,
fullTextColumn: fullTextColumn,
jsonBodyPrefix: jsonBodyPrefix,
jsonKeyToKey: jsonKeyToKey,
}
}
@@ -102,7 +115,7 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []
for idx := range query.Order {
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: query.Order[idx].Key.Name,
Signal: telemetrytypes.SignalTraces,
Signal: telemetrytypes.SignalLogs,
FieldContext: query.Order[idx].Key.FieldContext,
FieldDataType: query.Order[idx].Key.FieldDataType,
})
@@ -413,6 +426,9 @@ func (b *logQueryStatementBuilder) addFilterCondition(
ConditionBuilder: b.cb,
FieldKeys: keys,
SkipResourceFilter: true,
FullTextColumn: b.fullTextColumn,
JsonBodyPrefix: b.jsonBodyPrefix,
JsonKeyToKey: b.jsonKeyToKey,
})
if err != nil {
@@ -427,7 +443,7 @@ func (b *logQueryStatementBuilder) addFilterCondition(
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
endBucket := end / querybuilder.NsToSeconds
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.LE("timestamp", fmt.Sprintf("%d", end)), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket))
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.L("timestamp", fmt.Sprintf("%d", end)), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket))
return warnings, nil
}

View File

@@ -2,10 +2,10 @@ package telemetrylogs
import (
"context"
"log/slog"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -65,7 +65,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
Args: []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,
@@ -83,12 +83,15 @@ func TestStatementBuilder(t *testing.T) {
require.NoError(t, err)
statementBuilder := NewLogQueryStatementBuilder(
slog.Default(),
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
BodyJSONStringSearchPrefix,
GetBodyJSONKey,
)
for _, c := range cases {

View File

@@ -6,6 +6,7 @@ import (
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -40,7 +41,7 @@ type telemetryMetaStore struct {
}
func NewTelemetryMetaStore(
logger *slog.Logger,
settings factory.ProviderSettings,
telemetrystore telemetrystore.TelemetryStore,
tracesDBName string,
tracesFieldsTblName string,

View File

@@ -3,11 +3,10 @@ package telemetrymetadata
import (
"context"
"fmt"
"io"
"log/slog"
"regexp"
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
@@ -36,7 +35,7 @@ func TestGetKeys(t *testing.T) {
mock := mockTelemetryStore.Mock()
metadata := NewTelemetryMetaStore(
slog.New(slog.NewTextHandler(io.Discard, nil)),
instrumentationtest.New().ToProviderSettings(),
mockTelemetryStore,
telemetrytraces.DBName,
telemetrytraces.TagAttributesV2TableName,

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -18,28 +19,26 @@ const (
)
type metricQueryStatementBuilder struct {
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
aggExprRewriter qbtypes.AggExprRewriter
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
}
var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*metricQueryStatementBuilder)(nil)
func NewMetricQueryStatementBuilder(
logger *slog.Logger,
settings factory.ProviderSettings,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
aggExprRewriter qbtypes.AggExprRewriter,
) *metricQueryStatementBuilder {
metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetrics")
return &metricQueryStatementBuilder{
logger: logger,
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
aggExprRewriter: aggExprRewriter,
logger: metricsSettings.Logger(),
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
}
}

View File

@@ -2,11 +2,10 @@ package telemetrymetrics
import (
"context"
"log/slog"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -193,14 +192,11 @@ func TestStatementBuilder(t *testing.T) {
}
mockMetadataStore.KeysMap = keys
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
statementBuilder := NewMetricQueryStatementBuilder(
slog.Default(),
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
aggExprRewriter,
)
for _, c := range cases {

View File

@@ -7,6 +7,7 @@ import (
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -29,15 +30,16 @@ type traceQueryStatementBuilder struct {
var _ qbtypes.StatementBuilder[qbtypes.TraceAggregation] = (*traceQueryStatementBuilder)(nil)
func NewTraceQueryStatementBuilder(
logger *slog.Logger,
settings factory.ProviderSettings,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
aggExprRewriter qbtypes.AggExprRewriter,
) *traceQueryStatementBuilder {
tracesSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrytraces")
return &traceQueryStatementBuilder{
logger: logger,
logger: tracesSettings.Logger(),
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
@@ -455,7 +457,7 @@ func (b *traceQueryStatementBuilder) addFilterCondition(
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
endBucket := end / querybuilder.NsToSeconds
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.LE("timestamp", fmt.Sprintf("%d", end)), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket))
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.L("timestamp", fmt.Sprintf("%d", end)), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket))
return warnings, nil
}

View File

@@ -2,10 +2,10 @@ package telemetrytraces
import (
"context"
"log/slog"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -59,7 +59,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
Args: []any{"redis-manual", "%service.name%", "%service.name%redis-manual%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,
@@ -76,7 +76,7 @@ func TestStatementBuilder(t *testing.T) {
require.NoError(t, err)
statementBuilder := NewTraceQueryStatementBuilder(
slog.Default(),
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,

View File

@@ -1,9 +0,0 @@
package querybuildertypesv5
// BucketCache is the only thing orchestrator cares about.
type BucketCache interface {
// cached portion + list of gaps to fetch
GetMissRanges(q Query) (cached Result, missing []TimeRange)
// store fresh buckets for future hits
Put(q Query, fresh Result)
}

View File

@@ -1,7 +0,0 @@
package querybuildertypesv5
import "context"
type Querier interface {
QueryRange(ctx context.Context, req QueryRangeRequest) (QueryRangeResponse, error)
}

View File

@@ -8,8 +8,6 @@ import (
)
type QueryEnvelope struct {
// Name is the unique identifier for the query.
Name string `json:"name"`
// Type is the type of the query.
Type QueryType `json:"type"` // "builder_query" | "builder_formula" | "builder_sub_query" | "builder_join" | "promql" | "clickhouse_sql"
// Spec is the deferred decoding of the query if any.
@@ -27,7 +25,6 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid query envelope")
}
q.Name = shadow.Name
q.Type = shadow.Type
// 2. Decode the spec based on the Type.
@@ -116,4 +113,14 @@ type QueryRangeRequest struct {
CompositeQuery CompositeQuery `json:"compositeQuery"`
// Variables is the variables to use for the request.
Variables map[string]any `json:"variables,omitempty"`
// NoCache is a flag to disable caching for the request.
NoCache bool `json:"noCache,omitempty"`
FormatOptions *FormatOptions `json:"formatOptions,omitempty"`
}
type FormatOptions struct {
FillGaps bool `json:"fillGaps,omitempty"`
FormatTableResultForUI bool `json:"formatTableResultForUI,omitempty"`
}

View File

@@ -27,9 +27,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "time_series",
"compositeQuery": {
"queries": [{
"name": "A",
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"aggregations": [{
"expression": "count()",
@@ -65,9 +65,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{{
Name: "A",
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Aggregations: []TraceAggregation{{
Expression: "count()",
@@ -111,9 +111,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "raw",
"compositeQuery": {
"queries": [{
"name": "B",
"type": "builder_query",
"spec": {
"name": "B",
"signal": "logs",
"stepInterval": "30s",
"filter": {
@@ -136,9 +136,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
RequestType: RequestTypeRaw,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{{
Name: "B",
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[LogAggregation]{
Name: "B",
Signal: telemetrytypes.SignalLogs,
StepInterval: Step{Duration: 30 * time.Second},
Filter: &Filter{
@@ -165,9 +165,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "time_series",
"compositeQuery": {
"queries": [{
"name": "C",
"type": "builder_query",
"spec": {
"name": "C",
"signal": "metrics",
"aggregations": [{
"metricName": "http_requests_total",
@@ -191,9 +191,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{{
Name: "C",
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[MetricAggregation]{
Name: "C",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []MetricAggregation{{
MetricName: "http_requests_total",
@@ -223,7 +223,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "time_series",
"compositeQuery": {
"queries": [{
"name": "F1",
"type": "builder_formula",
"spec": {
"name": "error_rate",
@@ -243,7 +242,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{{
Name: "F1",
Type: QueryTypeFormula,
Spec: QueryBuilderFormula{
Name: "error_rate",
@@ -267,7 +265,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "time_series",
"compositeQuery": {
"queries": [{
"name": "F1",
"type": "builder_formula",
"spec": {
"name": "error_rate",
@@ -289,7 +286,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{{
Name: "F1",
Type: QueryTypeFormula,
Spec: QueryBuilderFormula{
Name: "error_rate",
@@ -314,7 +310,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "scalar",
"compositeQuery": {
"queries": [{
"name": "J1",
"type": "builder_join",
"spec": {
"name": "join_traces_logs",
@@ -335,7 +330,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
RequestType: RequestTypeScalar,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{{
Name: "J1",
Type: QueryTypeJoin,
Spec: QueryBuilderJoin{
Name: "join_traces_logs",
@@ -360,7 +354,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "time_series",
"compositeQuery": {
"queries": [{
"name": "P1",
"type": "promql",
"spec": {
"name": "cpu_usage",
@@ -377,7 +370,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{{
Name: "P1",
Type: QueryTypePromQL,
Spec: PromQuery{
Name: "cpu_usage",
@@ -398,7 +390,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "raw",
"compositeQuery": {
"queries": [{
"name": "CH1",
"type": "clickhouse_sql",
"spec": {
"name": "custom_query",
@@ -415,7 +406,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
RequestType: RequestTypeRaw,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{{
Name: "CH1",
Type: QueryTypeClickHouseSQL,
Spec: ClickHouseQuery{
Name: "custom_query",
@@ -437,9 +427,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"compositeQuery": {
"queries": [
{
"name": "A",
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"aggregations": [{"expression": "count()"}],
"disabled": false
@@ -464,16 +454,15 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{
{
Name: "A",
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Aggregations: []TraceAggregation{{Expression: "count()"}},
Disabled: false,
},
},
{
Name: "B",
Type: QueryTypeFormula,
Spec: QueryBuilderFormula{
Name: "rate",
@@ -494,9 +483,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "time_series",
"compositeQuery": {
"queries": [{
"name": "A",
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test"}],
"stepInterval": "5m"
@@ -511,9 +500,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{{
Name: "A",
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []MetricAggregation{{MetricName: "test"}},
StepInterval: Step{Duration: 5 * time.Minute},
@@ -607,7 +596,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
for i, expectedQuery := range tt.expected.CompositeQuery.Queries {
actualQuery := req.CompositeQuery.Queries[i]
assert.Equal(t, expectedQuery.Name, actualQuery.Name)
assert.Equal(t, expectedQuery.Type, actualQuery.Type)
switch expectedQuery.Type {
@@ -616,6 +604,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
case QueryBuilderQuery[TraceAggregation]:
actualSpec, ok := actualQuery.Spec.(QueryBuilderQuery[TraceAggregation])
require.True(t, ok, "Expected TraceBuilderQuery but got %T", actualQuery.Spec)
assert.Equal(t, expectedSpec.Name, actualSpec.Name)
assert.Equal(t, expectedSpec.Signal, actualSpec.Signal)
assert.Equal(t, expectedSpec.StepInterval, actualSpec.StepInterval)
assert.Equal(t, expectedSpec.Disabled, actualSpec.Disabled)
@@ -623,6 +612,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
case QueryBuilderQuery[LogAggregation]:
actualSpec, ok := actualQuery.Spec.(QueryBuilderQuery[LogAggregation])
require.True(t, ok, "Expected LogBuilderQuery but got %T", actualQuery.Spec)
assert.Equal(t, expectedSpec.Name, actualSpec.Name)
assert.Equal(t, expectedSpec.Signal, actualSpec.Signal)
assert.Equal(t, expectedSpec.StepInterval, actualSpec.StepInterval)
assert.Equal(t, expectedSpec.Disabled, actualSpec.Disabled)
@@ -630,6 +620,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
case QueryBuilderQuery[MetricAggregation]:
actualSpec, ok := actualQuery.Spec.(QueryBuilderQuery[MetricAggregation])
require.True(t, ok, "Expected MetricBuilderQuery but got %T", actualQuery.Spec)
assert.Equal(t, expectedSpec.Name, actualSpec.Name)
assert.Equal(t, expectedSpec.Signal, actualSpec.Signal)
assert.Equal(t, expectedSpec.StepInterval, actualSpec.StepInterval)
assert.Equal(t, expectedSpec.Disabled, actualSpec.Disabled)
@@ -647,6 +638,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
expectedSpec := expectedQuery.Spec.(QueryBuilderFormula)
actualSpec, ok := actualQuery.Spec.(QueryBuilderFormula)
require.True(t, ok, "Expected QueryBuilderFormula but got %T", actualQuery.Spec)
assert.Equal(t, expectedSpec.Name, actualSpec.Name)
assert.Equal(t, expectedSpec.Expression, actualSpec.Expression)
assert.Equal(t, expectedSpec.Name, actualSpec.Name)
case QueryTypeJoin:
@@ -654,21 +646,23 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
actualSpec, ok := actualQuery.Spec.(QueryBuilderJoin)
require.True(t, ok, "Expected QueryBuilderJoin but got %T", actualQuery.Spec)
assert.Equal(t, expectedSpec.Name, actualSpec.Name)
assert.Equal(t, expectedSpec.Left.Name, actualSpec.Left.Name)
assert.Equal(t, expectedSpec.Right.Name, actualSpec.Right.Name)
assert.Equal(t, expectedSpec.Type, actualSpec.Type)
assert.Equal(t, expectedSpec.On, actualSpec.On)
case QueryTypePromQL:
expectedSpec := expectedQuery.Spec.(PromQuery)
actualSpec, ok := actualQuery.Spec.(PromQuery)
require.True(t, ok, "Expected PromQuery but got %T", actualQuery.Spec)
assert.Equal(t, expectedSpec.Query, actualSpec.Query)
assert.Equal(t, expectedSpec.Name, actualSpec.Name)
assert.Equal(t, expectedSpec.Query, actualSpec.Query)
assert.Equal(t, expectedSpec.Disabled, actualSpec.Disabled)
case QueryTypeClickHouseSQL:
expectedSpec := expectedQuery.Spec.(ClickHouseQuery)
actualSpec, ok := actualQuery.Spec.(ClickHouseQuery)
require.True(t, ok, "Expected ClickHouseQuery but got %T", actualQuery.Spec)
assert.Equal(t, expectedSpec.Query, actualSpec.Query)
assert.Equal(t, expectedSpec.Name, actualSpec.Name)
assert.Equal(t, expectedSpec.Query, actualSpec.Query)
assert.Equal(t, expectedSpec.Disabled, actualSpec.Disabled)
}
}

View File

@@ -1,6 +1,9 @@
package querybuildertypesv5
import (
"fmt"
"slices"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -19,8 +22,11 @@ type TimeSeriesData struct {
}
type AggregationBucket struct {
Index int `json:"index"` // or string Alias
Alias string `json:"alias"`
Index int `json:"index"` // or string Alias
Alias string `json:"alias"`
Meta struct {
Unit string `json:"unit,omitempty"`
} `json:"meta,omitempty"`
Series []*TimeSeries `json:"series"` // no extra nesting
}
@@ -34,9 +40,64 @@ type Label struct {
Value any `json:"value"`
}
func GetUniqueSeriesKey(labels []*Label) string {
// Fast path for common cases
if len(labels) == 0 {
return ""
}
if len(labels) == 1 {
return fmt.Sprintf("%s=%v,", labels[0].Key.Name, labels[0].Value)
}
// Use a map to collect labels for consistent ordering without copying
labelMap := make(map[string]string, len(labels))
keys := make([]string, 0, len(labels))
// Estimate total size for string builder
estimatedSize := 0
for _, label := range labels {
if _, exists := labelMap[label.Key.Name]; !exists {
keys = append(keys, label.Key.Name)
estimatedSize += len(label.Key.Name) + 2 // key + '=' + ','
}
// get the value as string
value, ok := label.Value.(string)
if !ok {
value = fmt.Sprintf("%v", label.Value)
}
estimatedSize += len(value)
labelMap[label.Key.Name] = value
}
// Sort just the keys
slices.Sort(keys)
// Build the key using sorted keys with better size estimation
var key strings.Builder
key.Grow(estimatedSize)
for _, k := range keys {
key.WriteString(k)
key.WriteByte('=')
key.WriteString(labelMap[k])
key.WriteByte(',')
}
return key.String()
}
type TimeSeriesValue struct {
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
// true if the value is "partial", i.e doesn't cover the complete interval.
// for instance, if the query start time is 3:14:15 PM, and the step is 1 minute,
// the ts is rounded to 3:14 but the value only covers 3:14:15 PM to 3:15:00 PM
// this partial result cannot be cached and should be ignored.
// on the client side, these partial values are rendered differently.
Partial bool `json:"partial,omitempty"`
// for the heatmap type chart
Values []float64 `json:"values,omitempty"`
Bucket *Bucket `json:"bucket,omitempty"`
@@ -59,9 +120,12 @@ var (
type ColumnDescriptor struct {
telemetrytypes.TelemetryFieldKey
QueryName string `json:"queryName"`
AggregationIndex int64 `json:"aggregationIndex"`
Type ColumnType `json:"columnType"`
QueryName string `json:"queryName"`
AggregationIndex int64 `json:"aggregationIndex"`
Meta struct {
Unit string `json:"unit,omitempty"`
} `json:"meta,omitempty"`
Type ColumnType `json:"columnType"`
}
type ScalarData struct {