Files
signoz/pkg/querier/api.go
Pandey b811991f9d feat(middleware): add panic recovery middleware (#10666)
* feat(middleware): add panic recovery middleware with TypeFatal error type

Add a global HTTP recovery middleware that catches panics, logs them
with OTel exception semantic conventions via errors.Attr, and returns
a safe user-facing error response. Introduce TypeFatal/CodeFatal for
unrecoverable failures and WithStacktrace to attach pre-formatted
stack traces to errors. Remove redundant per-handler panic recovery
blocks in querier APIs.

* style(errors): keep WithStacktrace call on same line in test

* fix(middleware): replace fmt.Errorf with errors.New in recovery test

* feat(middleware): add request context to panic recovery logs

Capture request body before handler runs and include method, path, and
body in panic recovery logs using OTel semconv attributes. Improve error
message to direct users to GitHub issues or support.
2026-03-23 06:25:26 +00:00

277 lines
7.7 KiB
Go

package querier
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"github.com/SigNoz/signoz/pkg/analytics"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/variables"
)
type handler struct {
set factory.ProviderSettings
analytics analytics.Analytics
querier Querier
}
func NewHandler(set factory.ProviderSettings, querier Querier, analytics analytics.Analytics) Handler {
return &handler{set: set, querier: querier, analytics: analytics}
}
func (handler *handler) QueryRange(rw http.ResponseWriter, req *http.Request) {
ctx := req.Context()
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "querier",
instrumentationtypes.CodeFunctionName: "QueryRange",
})
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
var queryRangeRequest qbtypes.QueryRangeRequest
if err := json.NewDecoder(req.Body).Decode(&queryRangeRequest); err != nil {
render.Error(rw, err)
return
}
// Validate the query request
if err := queryRangeRequest.Validate(); err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
queryRangeResponse, err := handler.querier.QueryRange(ctx, orgID, &queryRangeRequest)
if err != nil {
render.Error(rw, err)
return
}
handler.logEvent(req.Context(), req.Header.Get("Referer"), queryRangeResponse.QBEvent)
render.Success(rw, http.StatusOK, queryRangeResponse)
}
func (handler *handler) QueryRawStream(rw http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// get the param from url and add it to body
startParam := req.URL.Query().Get("start")
filterParam := req.URL.Query().Get("filter")
start, err := strconv.ParseUint(startParam, 10, 64)
if err != nil {
start = 0
}
// create the v5 request param
queryRangeRequest := qbtypes.QueryRangeRequest{
Start: start,
RequestType: qbtypes.RequestTypeRawStream,
CompositeQuery: qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Name: "raw_stream",
Filter: &qbtypes.Filter{
Expression: filterParam,
},
Limit: 500,
Order: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
Materialized: true,
},
},
},
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "id",
Materialized: true,
},
},
},
},
},
},
},
},
}
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
// Validate the query request
if err := queryRangeRequest.Validate(); err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Access-Control-Allow-Origin", "*")
rw.WriteHeader(200)
flusher, ok := rw.(http.Flusher)
if !ok {
render.Error(rw, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "streaming is not supported"))
return
}
flusher.Flush()
client := &qbtypes.RawStream{Name: req.RemoteAddr, Logs: make(chan *qbtypes.RawRow, 1000), Done: make(chan *bool), Error: make(chan error)}
go handler.querier.QueryRawStream(ctx, orgID, &queryRangeRequest, client)
for {
select {
case log := <-client.Logs:
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
err := enc.Encode(log)
if err != nil {
fmt.Fprintf(rw, "event: error\ndata: %v\n\n", err.Error())
flusher.Flush()
return
}
fmt.Fprintf(rw, "data: %v\n\n", buf.String())
flusher.Flush()
case <-client.Done:
return
case err := <-client.Error:
fmt.Fprintf(rw, "event: error\ndata: %v\n\n", err.Error())
flusher.Flush()
return
}
}
}
// TODO(srikanthccv): everything done here can be done on frontend as well
// For the time being I am adding a helper function
func (handler *handler) ReplaceVariables(rw http.ResponseWriter, req *http.Request) {
var queryRangeRequest qbtypes.QueryRangeRequest
if err := json.NewDecoder(req.Body).Decode(&queryRangeRequest); err != nil {
render.Error(rw, err)
return
}
errs := []error{}
for idx, item := range queryRangeRequest.CompositeQuery.Queries {
if item.Type == qbtypes.QueryTypeBuilder {
switch spec := item.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if spec.Filter != nil && spec.Filter.Expression != "" {
replaced, err := variables.ReplaceVariablesInExpression(spec.Filter.Expression, queryRangeRequest.Variables)
if err != nil {
errs = append(errs, err)
}
spec.Filter.Expression = replaced
}
queryRangeRequest.CompositeQuery.Queries[idx].Spec = spec
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if spec.Filter != nil && spec.Filter.Expression != "" {
replaced, err := variables.ReplaceVariablesInExpression(spec.Filter.Expression, queryRangeRequest.Variables)
if err != nil {
errs = append(errs, err)
}
spec.Filter.Expression = replaced
}
queryRangeRequest.CompositeQuery.Queries[idx].Spec = spec
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if spec.Filter != nil && spec.Filter.Expression != "" {
replaced, err := variables.ReplaceVariablesInExpression(spec.Filter.Expression, queryRangeRequest.Variables)
if err != nil {
errs = append(errs, err)
}
spec.Filter.Expression = replaced
}
queryRangeRequest.CompositeQuery.Queries[idx].Spec = spec
}
}
}
if len(errs) != 0 {
render.Error(rw, errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, errors.Join(errs...).Error()))
return
}
render.Success(rw, http.StatusOK, queryRangeRequest)
}
func (handler *handler) logEvent(ctx context.Context, referrer string, event *qbtypes.QBEvent) {
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
return
}
if !(event.LogsUsed || event.MetricsUsed || event.TracesUsed) {
return
}
properties := map[string]any{
"version": event.Version,
"logs_used": event.LogsUsed,
"traces_used": event.TracesUsed,
"metrics_used": event.MetricsUsed,
"source": event.Source,
"filter_applied": event.FilterApplied,
"group_by_applied": event.GroupByApplied,
"query_type": event.QueryType,
"panel_type": event.PanelType,
"number_of_queries": event.NumberOfQueries,
}
if referrer == "" {
return
}
comments := ctxtypes.CommentFromContext(ctx).Map()
for key, value := range comments {
properties[key] = value
}
if !event.HasData {
handler.analytics.TrackUser(ctx, claims.OrgID, claims.UserID, "Telemetry Query Returned Empty", properties)
return
}
handler.analytics.TrackUser(ctx, claims.OrgID, claims.UserID, "Telemetry Query Returned Results", properties)
}