mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-24 05:10:27 +00:00
Compare commits
1 Commits
refactor/c
...
fix/query-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
63bbfc3b88 |
@@ -16,10 +16,11 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/thirdpartyapitypes"
|
||||
|
||||
"log/slog"
|
||||
|
||||
"github.com/SigNoz/govaluate"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
|
||||
queues2 "github.com/SigNoz/signoz/pkg/query-service/app/integrations/messagingQueues/queues"
|
||||
"log/slog"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
promModel "github.com/prometheus/common/model"
|
||||
@@ -961,6 +962,57 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
|
||||
}
|
||||
}
|
||||
|
||||
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
|
||||
if query.Disabled {
|
||||
continue
|
||||
}
|
||||
if query.Filters == nil {
|
||||
continue
|
||||
}
|
||||
for _, filter := range query.Filters.Items {
|
||||
if filter.Value == nil {
|
||||
query.Disabled = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Disable queries that reference disabled queries in their expressions
|
||||
// This needs to be done iteratively until no more queries are disabled
|
||||
// because disabling one query may cause another dependent query to need disabling
|
||||
for {
|
||||
changed := false
|
||||
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
|
||||
if query.Disabled {
|
||||
continue
|
||||
}
|
||||
// Skip non-formula queries (where expression equals query name)
|
||||
if query.QueryName == query.Expression {
|
||||
continue
|
||||
}
|
||||
|
||||
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, postprocess.EvalFuncs())
|
||||
if err != nil {
|
||||
// Expression parsing already validated earlier, skip gracefully
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if any variable in the expression references a disabled query
|
||||
for _, v := range expression.Vars() {
|
||||
if refQuery, ok := queryRangeParams.CompositeQuery.BuilderQueries[v]; ok {
|
||||
if refQuery.Disabled {
|
||||
query.Disabled = true
|
||||
changed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if !changed {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return queryRangeParams, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1574,3 +1574,215 @@ func TestParseQueryRangeParamsStepIntervalAdjustment(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseQueryRangeParams_DisablesQueriesWithEmptyFilterValues(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
filterValue interface{}
|
||||
expectDisabled bool
|
||||
}{
|
||||
{
|
||||
name: "nil filter value disables query",
|
||||
filterValue: nil,
|
||||
expectDisabled: true,
|
||||
},
|
||||
{
|
||||
name: "empty string filter value keeps query enabled",
|
||||
filterValue: "",
|
||||
expectDisabled: false,
|
||||
},
|
||||
{
|
||||
name: "non-empty string filter value keeps query enabled",
|
||||
filterValue: "some-value",
|
||||
expectDisabled: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
queryRangeParams := v3.QueryRangeParamsV3{
|
||||
Start: time.Now().Add(-1 * time.Hour).UnixMilli(),
|
||||
End: time.Now().UnixMilli(),
|
||||
Step: 60,
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
PanelType: v3.PanelTypeGraph,
|
||||
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||
"A": {
|
||||
QueryName: "A",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
AggregateOperator: v3.AggregateOperatorSum,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "test_metric", DataType: "float64"},
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{Key: "namespace"},
|
||||
Operator: v3.FilterOperatorEqual,
|
||||
Value: tc.filterValue,
|
||||
},
|
||||
},
|
||||
},
|
||||
Expression: "A",
|
||||
StepInterval: 60,
|
||||
},
|
||||
},
|
||||
},
|
||||
Variables: map[string]interface{}{},
|
||||
}
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
err := json.NewEncoder(body).Encode(queryRangeParams)
|
||||
require.NoError(t, err)
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body)
|
||||
|
||||
p, apiErr := ParseQueryRangeParams(req)
|
||||
require.Nil(t, apiErr)
|
||||
|
||||
assert.Equal(t, tc.expectDisabled, p.CompositeQuery.BuilderQueries["A"].Disabled,
|
||||
"Query disabled state mismatch for filter value: %v", tc.filterValue)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseQueryRangeParams_DisablesExpressionQueriesWithDisabledDependencies(t *testing.T) {
|
||||
// Test that expression queries are disabled when their dependencies are disabled
|
||||
queryRangeParams := v3.QueryRangeParamsV3{
|
||||
Start: time.Now().Add(-1 * time.Hour).UnixMilli(),
|
||||
End: time.Now().UnixMilli(),
|
||||
Step: 60,
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
PanelType: v3.PanelTypeGraph,
|
||||
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||
"A": {
|
||||
QueryName: "A",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
AggregateOperator: v3.AggregateOperatorSum,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "metric_a", DataType: "float64"},
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{Key: "namespace"},
|
||||
Operator: v3.FilterOperatorEqual,
|
||||
Value: "", // Empty value will disable this query
|
||||
},
|
||||
},
|
||||
},
|
||||
Expression: "A",
|
||||
StepInterval: 60,
|
||||
},
|
||||
"B": {
|
||||
QueryName: "B",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
AggregateOperator: v3.AggregateOperatorSum,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "metric_b", DataType: "float64"},
|
||||
Expression: "B",
|
||||
StepInterval: 60,
|
||||
},
|
||||
"F1": {
|
||||
QueryName: "F1",
|
||||
Expression: "A + B", // This depends on A, which is disabled
|
||||
},
|
||||
},
|
||||
},
|
||||
Variables: map[string]interface{}{},
|
||||
}
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
err := json.NewEncoder(body).Encode(queryRangeParams)
|
||||
require.NoError(t, err)
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body)
|
||||
|
||||
p, apiErr := ParseQueryRangeParams(req)
|
||||
require.Nil(t, apiErr)
|
||||
|
||||
// A should be disabled because of empty filter value
|
||||
assert.True(t, p.CompositeQuery.BuilderQueries["A"].Disabled, "Query A should be disabled due to empty filter value")
|
||||
|
||||
// B should not be disabled
|
||||
assert.False(t, p.CompositeQuery.BuilderQueries["B"].Disabled, "Query B should not be disabled")
|
||||
|
||||
// F1 should be disabled because it depends on disabled query A
|
||||
assert.True(t, p.CompositeQuery.BuilderQueries["F1"].Disabled, "Query F1 should be disabled because it depends on disabled query A")
|
||||
}
|
||||
|
||||
func TestParseQueryRangeParams_DisablesMultipleFormulaDependencies(t *testing.T) {
|
||||
// Test that multiple formula queries depending on a disabled query are all disabled
|
||||
queryRangeParams := v3.QueryRangeParamsV3{
|
||||
Start: time.Now().Add(-1 * time.Hour).UnixMilli(),
|
||||
End: time.Now().UnixMilli(),
|
||||
Step: 60,
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
PanelType: v3.PanelTypeGraph,
|
||||
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||
"A": {
|
||||
QueryName: "A",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
AggregateOperator: v3.AggregateOperatorSum,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "metric_a", DataType: "float64"},
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{Key: "namespace"},
|
||||
Operator: v3.FilterOperatorEqual,
|
||||
Value: nil, // nil value will disable this query
|
||||
},
|
||||
},
|
||||
},
|
||||
Expression: "A",
|
||||
StepInterval: 60,
|
||||
},
|
||||
"B": {
|
||||
QueryName: "B",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
AggregateOperator: v3.AggregateOperatorSum,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "metric_b", DataType: "float64"},
|
||||
Expression: "B",
|
||||
StepInterval: 60,
|
||||
},
|
||||
"C": {
|
||||
QueryName: "C",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
AggregateOperator: v3.AggregateOperatorSum,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "metric_c", DataType: "float64"},
|
||||
Expression: "C",
|
||||
StepInterval: 60,
|
||||
},
|
||||
"F1": {
|
||||
QueryName: "F1",
|
||||
Expression: "A + B", // Depends on A (disabled) and B
|
||||
},
|
||||
"F2": {
|
||||
QueryName: "F2",
|
||||
Expression: "B + C", // Depends on B and C (both enabled)
|
||||
},
|
||||
},
|
||||
},
|
||||
Variables: map[string]interface{}{},
|
||||
}
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
err := json.NewEncoder(body).Encode(queryRangeParams)
|
||||
require.NoError(t, err)
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body)
|
||||
|
||||
p, apiErr := ParseQueryRangeParams(req)
|
||||
require.Nil(t, apiErr)
|
||||
|
||||
// A should be disabled because of nil filter value
|
||||
assert.True(t, p.CompositeQuery.BuilderQueries["A"].Disabled, "Query A should be disabled")
|
||||
|
||||
// B and C should not be disabled
|
||||
assert.False(t, p.CompositeQuery.BuilderQueries["B"].Disabled, "Query B should not be disabled")
|
||||
assert.False(t, p.CompositeQuery.BuilderQueries["C"].Disabled, "Query C should not be disabled")
|
||||
|
||||
// F1 should be disabled because it depends on A
|
||||
assert.True(t, p.CompositeQuery.BuilderQueries["F1"].Disabled, "Query F1 should be disabled because it depends on A")
|
||||
|
||||
// F2 should NOT be disabled because both B and C are enabled
|
||||
assert.False(t, p.CompositeQuery.BuilderQueries["F2"].Disabled, "Query F2 should not be disabled because both B and C are enabled")
|
||||
}
|
||||
|
||||
@@ -164,6 +164,9 @@ func (q *querier) runBuilderQueries(ctx context.Context, orgID valuer.UUID, para
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
|
||||
if builderQuery.Disabled {
|
||||
continue
|
||||
}
|
||||
if queryName == builderQuery.Expression {
|
||||
wg.Add(1)
|
||||
go q.runBuilderQuery(ctx, orgID, builderQuery, params, cacheKeys, ch, &wg)
|
||||
|
||||
@@ -2271,3 +2271,79 @@ func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2DisabledQueriesAreNotExecuted(t *testing.T) {
|
||||
// Test that queries with Disabled: true are not executed
|
||||
params := &v3.QueryRangeParamsV3{
|
||||
Start: 1675115596722,
|
||||
End: 1675115596722 + 120*60*1000,
|
||||
Step: 60,
|
||||
Version: "v4",
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
PanelType: v3.PanelTypeGraph,
|
||||
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||
"A": {
|
||||
QueryName: "A",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
Temporality: v3.Delta,
|
||||
StepInterval: 60,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "test_metric", DataType: "float64", IsColumn: true},
|
||||
AggregateOperator: v3.AggregateOperatorSumRate,
|
||||
TimeAggregation: v3.TimeAggregationRate,
|
||||
SpaceAggregation: v3.SpaceAggregationSum,
|
||||
Expression: "A",
|
||||
Disabled: true, // This query is disabled
|
||||
},
|
||||
"B": {
|
||||
QueryName: "B",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
Temporality: v3.Delta,
|
||||
StepInterval: 60,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "other_metric", DataType: "float64", IsColumn: true},
|
||||
AggregateOperator: v3.AggregateOperatorSumRate,
|
||||
TimeAggregation: v3.TimeAggregationRate,
|
||||
SpaceAggregation: v3.SpaceAggregationSum,
|
||||
Expression: "B",
|
||||
Disabled: false, // This query is enabled
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
cacheOpts := cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
MaxCost: 1 << 26,
|
||||
}
|
||||
c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := QuerierOptions{
|
||||
Cache: c,
|
||||
Reader: nil,
|
||||
FluxInterval: 5 * time.Minute,
|
||||
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
||||
TestingMode: true,
|
||||
ReturnedSeries: []*v3.Series{
|
||||
{
|
||||
Labels: map[string]string{"__name__": "other_metric"},
|
||||
Points: []v3.Point{
|
||||
{Timestamp: 1675115596722, Value: 1},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
q := NewQuerier(opts)
|
||||
|
||||
_, errByName, err := q.QueryRange(context.Background(), valuer.GenerateUUID(), params)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, errByName)
|
||||
|
||||
// Verify only the enabled query (B) was executed, not the disabled one (A)
|
||||
executedQueries := q.QueriesExecuted()
|
||||
require.Len(t, executedQueries, 1, "Expected exactly 1 query to be executed (only B, not A)")
|
||||
|
||||
// The executed query should be for "other_metric" (query B), not "test_metric" (query A)
|
||||
require.Contains(t, executedQueries[0], "other_metric", "Expected query B to be executed")
|
||||
require.NotContains(t, executedQueries[0], "test_metric", "Expected query A (disabled) to NOT be executed")
|
||||
}
|
||||
|
||||
@@ -180,6 +180,9 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3) (map[strin
|
||||
if compositeQuery != nil {
|
||||
// Build queries for each builder query
|
||||
for queryName, query := range compositeQuery.BuilderQueries {
|
||||
if query.Disabled {
|
||||
continue
|
||||
}
|
||||
// making a local clone since we should not update the global params if there is sift by
|
||||
start := params.Start
|
||||
end := params.End
|
||||
@@ -246,6 +249,9 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3) (map[strin
|
||||
|
||||
// Build queries for each expression
|
||||
for _, query := range compositeQuery.BuilderQueries {
|
||||
if query.Disabled {
|
||||
continue
|
||||
}
|
||||
if query.Expression != query.QueryName {
|
||||
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs)
|
||||
|
||||
|
||||
@@ -255,7 +255,7 @@ func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) {
|
||||
},
|
||||
},
|
||||
Expression: "A",
|
||||
Disabled: true,
|
||||
Disabled: false,
|
||||
StepInterval: 60,
|
||||
OrderBy: []v3.OrderBy{
|
||||
{
|
||||
@@ -292,7 +292,7 @@ func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) {
|
||||
Items: []v3.FilterItem{},
|
||||
},
|
||||
Expression: "B",
|
||||
Disabled: true,
|
||||
Disabled: false,
|
||||
StepInterval: 60,
|
||||
OrderBy: []v3.OrderBy{
|
||||
{
|
||||
@@ -341,6 +341,62 @@ func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
})
|
||||
t.Run("TestFormulaWithDisabledDependencies", func(t *testing.T) {
|
||||
// Test that formula queries are skipped when their dependencies are disabled
|
||||
q := &v3.QueryRangeParamsV3{
|
||||
Start: 1735036101000,
|
||||
End: 1735637901000,
|
||||
Step: 60,
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
PanelType: v3.PanelTypeGraph,
|
||||
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||
"A": {
|
||||
QueryName: "A",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
AggregateOperator: v3.AggregateOperatorAvg,
|
||||
AggregateAttribute: v3.AttributeKey{
|
||||
Key: "system.memory.usage",
|
||||
DataType: v3.AttributeKeyDataTypeFloat64,
|
||||
},
|
||||
Expression: "A",
|
||||
Disabled: true, // A is disabled
|
||||
StepInterval: 60,
|
||||
},
|
||||
"B": {
|
||||
QueryName: "B",
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
AggregateOperator: v3.AggregateOperatorSum,
|
||||
AggregateAttribute: v3.AttributeKey{
|
||||
Key: "system.network.io",
|
||||
DataType: v3.AttributeKeyDataTypeFloat64,
|
||||
},
|
||||
Expression: "B",
|
||||
Disabled: true, // B is disabled
|
||||
StepInterval: 60,
|
||||
},
|
||||
"F1": {
|
||||
QueryName: "F1",
|
||||
Expression: "A + B",
|
||||
Disabled: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
qbOptions := QueryBuilderOptions{
|
||||
BuildMetricQuery: metricsv3.PrepareMetricQuery,
|
||||
}
|
||||
qb := NewQueryBuilder(qbOptions)
|
||||
|
||||
queries, err := qb.PrepareQueries(q)
|
||||
require.NoError(t, err)
|
||||
// A and B are disabled, so they should not be in the queries map
|
||||
require.NotContains(t, queries, "A")
|
||||
require.NotContains(t, queries, "B")
|
||||
// F1 depends on disabled queries, so it should produce invalid/empty SQL
|
||||
// The formula is still "built" but with empty subqueries since dependencies are missing
|
||||
// This is expected - the parser.go disables F1 before PrepareQueries is called
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeltaQueryBuilder(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user