Files
signoz/pkg/querier/api.go
Vikrant Gupta bad80399a6
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
feat(serviceaccount): integrate service account (#10681)
* feat(serviceaccount): integrate service account

* feat(serviceaccount): integrate service account with better types

* feat(serviceaccount): fix lint and testing changes

* feat(serviceaccount): update integration tests

* feat(serviceaccount): fix formatting

* feat(serviceaccount): fix openapi spec

* feat(serviceaccount): update txlock to immediate to avoid busy snapshot errors

* feat(serviceaccount): add restrictions for factor_api_key

* feat(serviceaccount): add restrictions for factor_api_key

* feat: enabled service account and deprecated API Keys (#10715)

* feat: enabled service account and deprecated API Keys

* feat: deprecated API Keys

* feat: service account spec updates and role management changes

* feat: updated the error component for roles management

* feat: updated test case

* feat: updated the error component and added retries

* feat: refactored code and added retry to happend 3 times total

* feat: fixed feedbacks and added test case

* feat: refactored code and removed retry

* feat: updated the test cases

---------

Co-authored-by: SagarRajput-7 <162284829+SagarRajput-7@users.noreply.github.com>
2026-04-01 07:20:59 +00:00

277 lines
7.7 KiB
Go

package querier
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"github.com/SigNoz/signoz/pkg/analytics"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/variables"
)
type handler struct {
set factory.ProviderSettings
analytics analytics.Analytics
querier Querier
}
func NewHandler(set factory.ProviderSettings, querier Querier, analytics analytics.Analytics) Handler {
return &handler{set: set, querier: querier, analytics: analytics}
}
func (handler *handler) QueryRange(rw http.ResponseWriter, req *http.Request) {
ctx := req.Context()
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "querier",
instrumentationtypes.CodeFunctionName: "QueryRange",
})
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
var queryRangeRequest qbtypes.QueryRangeRequest
if err := json.NewDecoder(req.Body).Decode(&queryRangeRequest); err != nil {
render.Error(rw, err)
return
}
// Validate the query request
if err := queryRangeRequest.Validate(); err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
queryRangeResponse, err := handler.querier.QueryRange(ctx, orgID, &queryRangeRequest)
if err != nil {
render.Error(rw, err)
return
}
handler.logEvent(req.Context(), req.Header.Get("Referer"), queryRangeResponse.QBEvent)
render.Success(rw, http.StatusOK, queryRangeResponse)
}
func (handler *handler) QueryRawStream(rw http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// get the param from url and add it to body
startParam := req.URL.Query().Get("start")
filterParam := req.URL.Query().Get("filter")
start, err := strconv.ParseUint(startParam, 10, 64)
if err != nil {
start = 0
}
// create the v5 request param
queryRangeRequest := qbtypes.QueryRangeRequest{
Start: start,
RequestType: qbtypes.RequestTypeRawStream,
CompositeQuery: qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Name: "raw_stream",
Filter: &qbtypes.Filter{
Expression: filterParam,
},
Limit: 500,
Order: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
Materialized: true,
},
},
},
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "id",
Materialized: true,
},
},
},
},
},
},
},
},
}
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
// Validate the query request
if err := queryRangeRequest.Validate(); err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Access-Control-Allow-Origin", "*")
rw.WriteHeader(200)
flusher, ok := rw.(http.Flusher)
if !ok {
render.Error(rw, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "streaming is not supported"))
return
}
flusher.Flush()
client := &qbtypes.RawStream{Name: req.RemoteAddr, Logs: make(chan *qbtypes.RawRow, 1000), Done: make(chan *bool), Error: make(chan error)}
go handler.querier.QueryRawStream(ctx, orgID, &queryRangeRequest, client)
for {
select {
case log := <-client.Logs:
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
err := enc.Encode(log)
if err != nil {
fmt.Fprintf(rw, "event: error\ndata: %v\n\n", err.Error())
flusher.Flush()
return
}
fmt.Fprintf(rw, "data: %v\n\n", buf.String())
flusher.Flush()
case <-client.Done:
return
case err := <-client.Error:
fmt.Fprintf(rw, "event: error\ndata: %v\n\n", err.Error())
flusher.Flush()
return
}
}
}
// TODO(srikanthccv): everything done here can be done on frontend as well
// For the time being I am adding a helper function.
func (handler *handler) ReplaceVariables(rw http.ResponseWriter, req *http.Request) {
var queryRangeRequest qbtypes.QueryRangeRequest
if err := json.NewDecoder(req.Body).Decode(&queryRangeRequest); err != nil {
render.Error(rw, err)
return
}
errs := []error{}
for idx, item := range queryRangeRequest.CompositeQuery.Queries {
if item.Type == qbtypes.QueryTypeBuilder {
switch spec := item.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if spec.Filter != nil && spec.Filter.Expression != "" {
replaced, err := variables.ReplaceVariablesInExpression(spec.Filter.Expression, queryRangeRequest.Variables)
if err != nil {
errs = append(errs, err)
}
spec.Filter.Expression = replaced
}
queryRangeRequest.CompositeQuery.Queries[idx].Spec = spec
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if spec.Filter != nil && spec.Filter.Expression != "" {
replaced, err := variables.ReplaceVariablesInExpression(spec.Filter.Expression, queryRangeRequest.Variables)
if err != nil {
errs = append(errs, err)
}
spec.Filter.Expression = replaced
}
queryRangeRequest.CompositeQuery.Queries[idx].Spec = spec
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if spec.Filter != nil && spec.Filter.Expression != "" {
replaced, err := variables.ReplaceVariablesInExpression(spec.Filter.Expression, queryRangeRequest.Variables)
if err != nil {
errs = append(errs, err)
}
spec.Filter.Expression = replaced
}
queryRangeRequest.CompositeQuery.Queries[idx].Spec = spec
}
}
}
if len(errs) != 0 {
render.Error(rw, errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, errors.Join(errs...).Error()))
return
}
render.Success(rw, http.StatusOK, queryRangeRequest)
}
func (handler *handler) logEvent(ctx context.Context, referrer string, event *qbtypes.QBEvent) {
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
return
}
if !event.LogsUsed && !event.MetricsUsed && !event.TracesUsed {
return
}
properties := map[string]any{
"version": event.Version,
"logs_used": event.LogsUsed,
"traces_used": event.TracesUsed,
"metrics_used": event.MetricsUsed,
"source": event.Source,
"filter_applied": event.FilterApplied,
"group_by_applied": event.GroupByApplied,
"query_type": event.QueryType,
"panel_type": event.PanelType,
"number_of_queries": event.NumberOfQueries,
}
if referrer == "" {
return
}
comments := ctxtypes.CommentFromContext(ctx).Map()
for key, value := range comments {
properties[key] = value
}
if !event.HasData {
handler.analytics.TrackUser(ctx, claims.OrgID, claims.IdentityID(), "Telemetry Query Returned Empty", properties)
return
}
handler.analytics.TrackUser(ctx, claims.OrgID, claims.IdentityID(), "Telemetry Query Returned Results", properties)
}