|
|
|
|
@@ -92,11 +92,6 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
|
|
|
|
req.Start = querybuilder.ToMilliSecs(req.Start)
|
|
|
|
|
req.End = querybuilder.ToMilliSecs(req.End)
|
|
|
|
|
|
|
|
|
|
tmplVars := req.Variables
|
|
|
|
|
if tmplVars == nil {
|
|
|
|
|
tmplVars = make(map[string]qbtypes.VariableItem)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
event := &qbtypes.QBEvent{
|
|
|
|
|
Version: "v5",
|
|
|
|
|
NumberOfQueries: len(req.CompositeQuery.Queries),
|
|
|
|
|
@@ -116,9 +111,6 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
|
|
|
|
// We need to set if it is unspecified or adjust it if value is not within recommended range
|
|
|
|
|
intervalWarnings := q.adjustStepInterval(req.CompositeQuery.Queries, req.Start, req.End)
|
|
|
|
|
|
|
|
|
|
queries := make(map[string]qbtypes.Query)
|
|
|
|
|
steps := make(map[string]qbtypes.Step)
|
|
|
|
|
|
|
|
|
|
// Resolve metric metadata once per request: patches each metric-aggregation
|
|
|
|
|
// query's spec in place, returns the queries whose every aggregation was
|
|
|
|
|
// missing (used for preseeded empty results), and any dormant-metric
|
|
|
|
|
@@ -132,6 +124,62 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
|
|
|
|
missingMetricQuerySet[name] = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queries, steps, err := q.buildQueries(req, dependencyQueries, missingMetricQuerySet, event)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
preseededResults := make(map[string]any)
|
|
|
|
|
for _, name := range missingMetricQueries {
|
|
|
|
|
switch req.RequestType {
|
|
|
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
|
|
|
preseededResults[name] = &qbtypes.TimeSeriesData{QueryName: name}
|
|
|
|
|
case qbtypes.RequestTypeScalar:
|
|
|
|
|
preseededResults[name] = &qbtypes.ScalarData{QueryName: name}
|
|
|
|
|
case qbtypes.RequestTypeRaw:
|
|
|
|
|
preseededResults[name] = &qbtypes.RawData{QueryName: name}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event, preseededResults)
|
|
|
|
|
if qbResp != nil {
|
|
|
|
|
qbResp.QBEvent = event
|
|
|
|
|
if len(intervalWarnings) != 0 && req.RequestType == qbtypes.RequestTypeTimeSeries {
|
|
|
|
|
if qbResp.Warning == nil {
|
|
|
|
|
qbResp.Warning = &qbtypes.QueryWarnData{
|
|
|
|
|
Warnings: make([]qbtypes.QueryWarnDataAdditional, len(intervalWarnings)),
|
|
|
|
|
}
|
|
|
|
|
for idx := range intervalWarnings {
|
|
|
|
|
qbResp.Warning.Warnings[idx] = qbtypes.QueryWarnDataAdditional{Message: intervalWarnings[idx]}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if dormantMetricsWarningMsg != "" {
|
|
|
|
|
if qbResp.Warning == nil {
|
|
|
|
|
qbResp.Warning = &qbtypes.QueryWarnData{}
|
|
|
|
|
}
|
|
|
|
|
qbResp.Warning.Warnings = append(qbResp.Warning.Warnings, qbtypes.QueryWarnDataAdditional{
|
|
|
|
|
Message: dormantMetricsWarningMsg,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return qbResp, qbErr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (q *querier) buildQueries(
|
|
|
|
|
req *qbtypes.QueryRangeRequest,
|
|
|
|
|
dependencyQueries map[string]bool,
|
|
|
|
|
missingMetricQuerySet map[string]bool,
|
|
|
|
|
event *qbtypes.QBEvent,
|
|
|
|
|
) (map[string]qbtypes.Query, map[string]qbtypes.Step, error) {
|
|
|
|
|
|
|
|
|
|
tmplVars := req.Variables
|
|
|
|
|
if tmplVars == nil {
|
|
|
|
|
tmplVars = make(map[string]qbtypes.VariableItem)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queries := make(map[string]qbtypes.Query)
|
|
|
|
|
steps := make(map[string]qbtypes.Step)
|
|
|
|
|
|
|
|
|
|
for _, query := range req.CompositeQuery.Queries {
|
|
|
|
|
queryName := query.GetQueryName()
|
|
|
|
|
|
|
|
|
|
@@ -144,7 +192,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
|
|
|
|
case qbtypes.QueryTypePromQL:
|
|
|
|
|
promQuery, ok := query.Spec.(qbtypes.PromQuery)
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", query.Spec)
|
|
|
|
|
return nil, nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", query.Spec)
|
|
|
|
|
}
|
|
|
|
|
promqlQuery := newPromqlQuery(q.logger, q.promEngine, promQuery, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType, tmplVars)
|
|
|
|
|
queries[promQuery.Name] = promqlQuery
|
|
|
|
|
@@ -152,14 +200,14 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
|
|
|
|
case qbtypes.QueryTypeClickHouseSQL:
|
|
|
|
|
chQuery, ok := query.Spec.(qbtypes.ClickHouseQuery)
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid clickhouse query spec %T", query.Spec)
|
|
|
|
|
return nil, nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid clickhouse query spec %T", query.Spec)
|
|
|
|
|
}
|
|
|
|
|
chSQLQuery := newchSQLQuery(q.logger, q.telemetryStore, chQuery, nil, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType, tmplVars)
|
|
|
|
|
queries[chQuery.Name] = chSQLQuery
|
|
|
|
|
case qbtypes.QueryTypeTraceOperator:
|
|
|
|
|
traceOpQuery, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator)
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid trace operator query spec %T", query.Spec)
|
|
|
|
|
return nil, nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid trace operator query spec %T", query.Spec)
|
|
|
|
|
}
|
|
|
|
|
toq := &traceOperatorQuery{
|
|
|
|
|
telemetryStore: q.telemetryStore,
|
|
|
|
|
@@ -212,44 +260,12 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
|
|
|
|
queries[spec.Name] = bq
|
|
|
|
|
steps[spec.Name] = spec.StepInterval
|
|
|
|
|
default:
|
|
|
|
|
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
|
|
|
|
|
return nil, nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
preseededResults := make(map[string]any)
|
|
|
|
|
for _, name := range missingMetricQueries {
|
|
|
|
|
switch req.RequestType {
|
|
|
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
|
|
|
preseededResults[name] = &qbtypes.TimeSeriesData{QueryName: name}
|
|
|
|
|
case qbtypes.RequestTypeScalar:
|
|
|
|
|
preseededResults[name] = &qbtypes.ScalarData{QueryName: name}
|
|
|
|
|
case qbtypes.RequestTypeRaw:
|
|
|
|
|
preseededResults[name] = &qbtypes.RawData{QueryName: name}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event, preseededResults)
|
|
|
|
|
if qbResp != nil {
|
|
|
|
|
qbResp.QBEvent = event
|
|
|
|
|
if len(intervalWarnings) != 0 && req.RequestType == qbtypes.RequestTypeTimeSeries {
|
|
|
|
|
if qbResp.Warning == nil {
|
|
|
|
|
qbResp.Warning = &qbtypes.QueryWarnData{
|
|
|
|
|
Warnings: make([]qbtypes.QueryWarnDataAdditional, len(intervalWarnings)),
|
|
|
|
|
}
|
|
|
|
|
for idx := range intervalWarnings {
|
|
|
|
|
qbResp.Warning.Warnings[idx] = qbtypes.QueryWarnDataAdditional{Message: intervalWarnings[idx]}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if dormantMetricsWarningMsg != "" {
|
|
|
|
|
if qbResp.Warning == nil {
|
|
|
|
|
qbResp.Warning = &qbtypes.QueryWarnData{}
|
|
|
|
|
}
|
|
|
|
|
qbResp.Warning.Warnings = append(qbResp.Warning.Warnings, qbtypes.QueryWarnDataAdditional{
|
|
|
|
|
Message: dormantMetricsWarningMsg,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return qbResp, qbErr
|
|
|
|
|
|
|
|
|
|
return queries, steps, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (q *querier) populateQBEvent(event *qbtypes.QBEvent, queries []qbtypes.QueryEnvelope) {
|
|
|
|
|
|