mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-12 00:12:02 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ac5686651 | ||
|
|
1bdb8e6308 | ||
|
|
cf88b42702 | ||
|
|
45e69c07a1 | ||
|
|
4e6fa6286b | ||
|
|
b453010075 |
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
gomaps "maps"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -276,23 +277,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch temporality for all metrics at once
|
||||
var metricTemporality map[string]metrictypes.Temporality
|
||||
var metricTypes map[string]metrictypes.Type
|
||||
if len(metricNames) > 0 {
|
||||
var err error
|
||||
metricTemporality, metricTypes, err = q.metadataStore.FetchTemporalityAndTypeMulti(ctx, req.Start, req.End, metricNames...)
|
||||
if err != nil {
|
||||
q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
|
||||
// Continue without temporality - statement builder will handle unspecified
|
||||
metricTemporality = make(map[string]metrictypes.Temporality)
|
||||
metricTypes = make(map[string]metrictypes.Type)
|
||||
}
|
||||
q.logger.DebugContext(ctx, "fetched metric temporalities and types", "metric_temporality", metricTemporality, "metric_types", metricTypes)
|
||||
}
|
||||
|
||||
queries := make(map[string]qbtypes.Query)
|
||||
steps := make(map[string]qbtypes.Step)
|
||||
missingMetricQueries := make(map[string]struct{})
|
||||
|
||||
for _, query := range req.CompositeQuery.Queries {
|
||||
var queryName string
|
||||
@@ -374,23 +361,46 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
queries[spec.Name] = bq
|
||||
steps[spec.Name] = spec.StepInterval
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
|
||||
var metricTemporality map[string]metrictypes.Temporality
|
||||
var metricTypes map[string]metrictypes.Type
|
||||
if len(metricNames) > 0 {
|
||||
var err error
|
||||
metricTemporality, metricTypes, err = q.metadataStore.FetchTemporalityAndTypeMulti(ctx, req.Start, req.End, metricNames...)
|
||||
if err != nil {
|
||||
q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
|
||||
// Continue without temporality - statement builder will handle unspecified
|
||||
metricTemporality = make(map[string]metrictypes.Temporality)
|
||||
metricTypes = make(map[string]metrictypes.Type)
|
||||
}
|
||||
q.logger.DebugContext(ctx, "fetched metric temporalities and types", "metric_temporality", metricTemporality, "metric_types", metricTypes)
|
||||
}
|
||||
var presentAggregations []qbtypes.MetricAggregation
|
||||
for i := range spec.Aggregations {
|
||||
if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Temporality == metrictypes.Unknown {
|
||||
if temp, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && temp != metrictypes.Unknown {
|
||||
spec.Aggregations[i].Temporality = temp
|
||||
if spec.Aggregations[i].MetricName == "" {
|
||||
continue
|
||||
}
|
||||
if spec.Aggregations[i].Temporality == metrictypes.Unknown {
|
||||
if foundMetricTemporality, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && foundMetricTemporality != metrictypes.Unknown {
|
||||
spec.Aggregations[i].Temporality = foundMetricTemporality
|
||||
}
|
||||
}
|
||||
// TODO(srikanthccv): warn when the metric is missing
|
||||
if spec.Aggregations[i].Temporality == metrictypes.Unknown {
|
||||
spec.Aggregations[i].Temporality = metrictypes.Unspecified
|
||||
}
|
||||
|
||||
if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
|
||||
if spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
|
||||
if foundMetricType, ok := metricTypes[spec.Aggregations[i].MetricName]; ok && foundMetricType != metrictypes.UnspecifiedType {
|
||||
spec.Aggregations[i].Type = foundMetricType
|
||||
}
|
||||
}
|
||||
// If metric is still unknown after lookup, it was not found in the time range
|
||||
if spec.Aggregations[i].Temporality == metrictypes.Unknown && spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
|
||||
continue
|
||||
}
|
||||
presentAggregations = append(presentAggregations, spec.Aggregations[i])
|
||||
}
|
||||
if len(presentAggregations) == 0 {
|
||||
missingMetricQueries[spec.Name] = struct{}{}
|
||||
steps[spec.Name] = spec.StepInterval
|
||||
continue
|
||||
}
|
||||
spec.Aggregations = presentAggregations
|
||||
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
|
||||
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
|
||||
var bq *builderQuery[qbtypes.MetricAggregation]
|
||||
@@ -409,7 +419,18 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
}
|
||||
}
|
||||
}
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event)
|
||||
preseededResults := make(map[string]any)
|
||||
for name := range missingMetricQueries {
|
||||
switch req.RequestType {
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
preseededResults[name] = &qbtypes.TimeSeriesData{QueryName: name}
|
||||
case qbtypes.RequestTypeScalar:
|
||||
preseededResults[name] = &qbtypes.ScalarData{QueryName: name}
|
||||
case qbtypes.RequestTypeRaw:
|
||||
preseededResults[name] = &qbtypes.RawData{QueryName: name}
|
||||
}
|
||||
}
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event, preseededResults)
|
||||
if qbResp != nil {
|
||||
qbResp.QBEvent = event
|
||||
if len(intervalWarnings) != 0 && req.RequestType == qbtypes.RequestTypeTimeSeries {
|
||||
@@ -498,7 +519,7 @@ func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qb
|
||||
})
|
||||
queries[spec.Name] = bq
|
||||
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, nil, event)
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, nil, event, nil)
|
||||
if qbErr != nil {
|
||||
client.Error <- qbErr
|
||||
return
|
||||
@@ -527,6 +548,7 @@ func (q *querier) run(
|
||||
req *qbtypes.QueryRangeRequest,
|
||||
steps map[string]qbtypes.Step,
|
||||
qbEvent *qbtypes.QBEvent,
|
||||
preseededResults map[string]any,
|
||||
) (*qbtypes.QueryRangeResponse, error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.PanelType: qbEvent.PanelType,
|
||||
@@ -612,6 +634,8 @@ func (q *querier) run(
|
||||
}
|
||||
}
|
||||
|
||||
gomaps.Copy(results, preseededResults)
|
||||
|
||||
processedResults, err := q.postProcessResults(ctx, results, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -77,6 +77,21 @@ func TestManager_TestNotification_SendUnmatched_ThresholdRule(t *testing.T) {
|
||||
|
||||
mock := mockStore.Mock()
|
||||
|
||||
// Mock metadata queries for FetchTemporalityAndTypeMulti
|
||||
// First query: fetchMetricsTemporalityAndType (from signoz_metrics time series table)
|
||||
metadataCols := []cmock.ColumnType{
|
||||
{Name: "metric_name", Type: "String"},
|
||||
{Name: "temporality", Type: "String"},
|
||||
{Name: "type", Type: "String"},
|
||||
{Name: "is_monotonic", Type: "Bool"},
|
||||
}
|
||||
metadataRows := cmock.NewRows(metadataCols, [][]any{
|
||||
{"probe_success", metrictypes.Unspecified, metrictypes.GaugeType, false},
|
||||
})
|
||||
mock.ExpectQuery("*distributed_time_series_v4*").WithArgs(nil, nil, nil).WillReturnRows(metadataRows)
|
||||
// Second query: fetchMeterSourceMetricsTemporalityAndType (from signoz_meter table)
|
||||
emptyMetadataRows := cmock.NewRows(metadataCols, [][]any{})
|
||||
mock.ExpectQuery("*meter*").WithArgs(nil).WillReturnRows(emptyMetadataRows)
|
||||
// Generate query arguments for the metric query
|
||||
evalTime := time.Now().UTC()
|
||||
evalWindow := 5 * time.Minute
|
||||
|
||||
Reference in New Issue
Block a user