Compare commits

..

1 Commits

Author SHA1 Message Date
nikhilmantri0902
c7ff617904 chore: function refactor 2026-05-20 12:31:57 +05:30
11 changed files with 79 additions and 184 deletions

2
go.mod
View File

@@ -11,7 +11,6 @@ require (
github.com/SigNoz/signoz-otel-collector v0.144.3
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/bytedance/sonic v1.14.1
github.com/cespare/xxhash/v2 v2.3.0
github.com/coreos/go-oidc/v3 v3.17.0
github.com/dgraph-io/ristretto/v2 v2.3.0
@@ -113,6 +112,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 // indirect
github.com/aws/smithy-go v1.24.2 // indirect
github.com/bytedance/gopkg v0.1.3 // indirect
github.com/bytedance/sonic v1.14.1 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6 // indirect

View File

@@ -231,7 +231,7 @@ func (m *module) ListPods(ctx context.Context, orgID valuer.UUID, req *inframoni
return nil, err
}
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req, pageGroups)
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}
@@ -314,19 +314,12 @@ func (m *module) ListNodes(ctx context.Context, orgID valuer.UUID, req *inframon
return nil, err
}
nodeConditionCounts, err := m.getPerGroupNodeConditionCounts(ctx, req, pageGroups)
nodeConditionCounts, err := m.getPerGroupNodeConditionCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
// Start/End/Filter/GroupBy from PostablePods.
podPhaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
podPhaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}
@@ -409,14 +402,7 @@ func (m *module) ListNamespaces(ctx context.Context, orgID valuer.UUID, req *inf
return nil, err
}
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
// Start/End/Filter/GroupBy from PostablePods.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}
@@ -498,27 +484,14 @@ func (m *module) ListClusters(ctx context.Context, orgID valuer.UUID, req *infra
return nil, err
}
// Reuse the nodes condition-counts CTE function via a temp struct — it reads only
// Start/End/Filter/GroupBy from PostableNodes. With default groupBy
// [k8s.cluster.name], counts are bucketed per cluster; with a custom groupBy,
// they aggregate across clusters in that group.
nodeConditionCountsMap, err := m.getPerGroupNodeConditionCounts(ctx, &inframonitoringtypes.PostableNodes{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
// With default groupBy [k8s.cluster.name], counts are bucketed per cluster;
// with a custom groupBy, they aggregate across clusters in that group.
nodeConditionCountsMap, err := m.getPerGroupNodeConditionCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}
// Same pattern for pod phase counts via PostablePods shim.
podPhaseCountsMap, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
podPhaseCountsMap, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}
@@ -689,14 +662,7 @@ func (m *module) ListDeployments(ctx context.Context, orgID valuer.UUID, req *in
return nil, err
}
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
// Start/End/Filter/GroupBy from PostablePods.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}
@@ -784,16 +750,9 @@ func (m *module) ListStatefulSets(ctx context.Context, orgID valuer.UUID, req *i
return nil, err
}
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
// Start/End/Filter/GroupBy from PostablePods. Pods owned by a StatefulSet carry
// k8s.statefulset.name as a resource attribute, so default-groupBy gives
// per-statefulset phase counts automatically.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
// Pods owned by a StatefulSet carry k8s.statefulset.name as a resource attribute,
// so default-groupBy gives per-statefulset phase counts automatically.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}
@@ -881,16 +840,9 @@ func (m *module) ListJobs(ctx context.Context, orgID valuer.UUID, req *inframoni
return nil, err
}
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
// Start/End/Filter/GroupBy from PostablePods. Pods owned by a Job carry
// k8s.job.name as a resource attribute, so default-groupBy gives
// per-job phase counts automatically.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
// Pods owned by a Job carry k8s.job.name as a resource attribute, so default-groupBy
// gives per-job phase counts automatically.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}
@@ -978,16 +930,9 @@ func (m *module) ListDaemonSets(ctx context.Context, orgID valuer.UUID, req *inf
return nil, err
}
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
// Start/End/Filter/GroupBy from PostablePods. Pods owned by a DaemonSet carry
// k8s.daemonset.name as a resource attribute, so default-groupBy gives
// per-daemonset phase counts automatically.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
Start: req.Start,
End: req.End,
Filter: req.Filter,
GroupBy: req.GroupBy,
}, pageGroups)
// Pods owned by a DaemonSet carry k8s.daemonset.name as a resource attribute,
// so default-groupBy gives per-daemonset phase counts automatically.
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
if err != nil {
return nil, err
}

View File

@@ -170,27 +170,29 @@ func (m *module) getNodesTableMetadata(ctx context.Context, req *inframonitoring
// Groups absent from the result map have implicit zero counts (caller default).
func (m *module) getPerGroupNodeConditionCounts(
ctx context.Context,
req *inframonitoringtypes.PostableNodes,
start, end int64,
filter *qbtypes.Filter,
groupBy []qbtypes.GroupByKey,
pageGroups []map[string]string,
) (map[string]nodeConditionCounts, error) {
if len(pageGroups) == 0 || len(req.GroupBy) == 0 {
if len(pageGroups) == 0 || len(groupBy) == 0 {
return map[string]nodeConditionCounts{}, nil
}
// Merged filter expression (user filter + page-groups IN clauses).
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
// Merge user filter with page-groups IN clauses.
userFilterExpr := ""
if filter != nil {
userFilterExpr = filter.Expression
}
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
filterExpr := mergeFilterExpressions(reqFilterExpr, pageGroupsFilterExpr)
mergedFilterExpr := mergeFilterExpressions(userFilterExpr, pageGroupsFilterExpr)
// Resolve tables. Same convention as pods.
adjustedStart, adjustedEnd, _, localTimeSeriesTable := telemetrymetrics.WhichTSTableToUse(
uint64(req.Start), uint64(req.End), nil,
uint64(start), uint64(end), nil,
)
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
uint64(req.Start), uint64(req.End),
uint64(start), uint64(end),
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
)
valueCol := telemetrymetrics.ValueColumnForSamplesTable(samplesTable)
@@ -201,7 +203,7 @@ func (m *module) getPerGroupNodeConditionCounts(
"fingerprint",
fmt.Sprintf("JSONExtractString(labels, %s) AS node_name", timeSeriesFPs.Var(nodeNameAttrKey)),
}
for _, key := range req.GroupBy {
for _, key := range groupBy {
timeSeriesFPsSelectCols = append(timeSeriesFPsSelectCols,
fmt.Sprintf("JSONExtractString(labels, %s) AS %s", timeSeriesFPs.Var(key.Name), quoteIdentifier(key.Name)),
)
@@ -213,8 +215,8 @@ func (m *module) getPerGroupNodeConditionCounts(
timeSeriesFPs.GE("unix_milli", adjustedStart),
timeSeriesFPs.L("unix_milli", adjustedEnd),
)
if filterExpr != "" {
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: filterExpr}, req.Start, req.End)
if mergedFilterExpr != "" {
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: mergedFilterExpr}, start, end)
if err != nil {
return nil, err
}
@@ -223,7 +225,7 @@ func (m *module) getPerGroupNodeConditionCounts(
}
}
timeSeriesFPsGroupBy := []string{"fingerprint", "node_name"}
for _, key := range req.GroupBy {
for _, key := range groupBy {
timeSeriesFPsGroupBy = append(timeSeriesFPsGroupBy, quoteIdentifier(key.Name))
}
timeSeriesFPs.GroupBy(timeSeriesFPsGroupBy...)
@@ -233,7 +235,7 @@ func (m *module) getPerGroupNodeConditionCounts(
latestConditionPerNode := sqlbuilder.NewSelectBuilder()
latestConditionPerNodeSelectCols := []string{"tsfp.node_name AS node_name"}
latestConditionPerNodeGroupBy := []string{"node_name"}
for _, key := range req.GroupBy {
for _, key := range groupBy {
col := quoteIdentifier(key.Name)
latestConditionPerNodeSelectCols = append(latestConditionPerNodeSelectCols, fmt.Sprintf("tsfp.%s AS %s", col, col))
latestConditionPerNodeGroupBy = append(latestConditionPerNodeGroupBy, col)
@@ -248,17 +250,17 @@ func (m *module) getPerGroupNodeConditionCounts(
))
latestConditionPerNode.Where(
latestConditionPerNode.E("samples.metric_name", nodeConditionMetricName),
latestConditionPerNode.GE("samples.unix_milli", req.Start),
latestConditionPerNode.L("samples.unix_milli", req.End),
latestConditionPerNode.GE("samples.unix_milli", start),
latestConditionPerNode.L("samples.unix_milli", end),
"tsfp.node_name != ''",
)
latestConditionPerNode.GroupBy(latestConditionPerNodeGroupBy...)
latestConditionPerNodeSQL, latestConditionPerNodeArgs := latestConditionPerNode.BuildWithFlavor(sqlbuilder.ClickHouse)
// ----- countNodesPerCondition (outer SELECT) -----
countNodesPerConditionSelectCols := make([]string, 0, len(req.GroupBy)+2)
countNodesPerConditionGroupBy := make([]string, 0, len(req.GroupBy))
for _, key := range req.GroupBy {
countNodesPerConditionSelectCols := make([]string, 0, len(groupBy)+2)
countNodesPerConditionGroupBy := make([]string, 0, len(groupBy))
for _, key := range groupBy {
col := quoteIdentifier(key.Name)
countNodesPerConditionSelectCols = append(countNodesPerConditionSelectCols, col)
countNodesPerConditionGroupBy = append(countNodesPerConditionGroupBy, col)
@@ -289,8 +291,8 @@ func (m *module) getPerGroupNodeConditionCounts(
result := make(map[string]nodeConditionCounts)
for rows.Next() {
groupVals := make([]string, len(req.GroupBy))
scanPtrs := make([]any, 0, len(req.GroupBy)+2)
groupVals := make([]string, len(groupBy))
scanPtrs := make([]any, 0, len(groupBy)+2)
for i := range groupVals {
scanPtrs = append(scanPtrs, &groupVals[i])
}

View File

@@ -189,27 +189,29 @@ func (m *module) getPodsTableMetadata(ctx context.Context, req *inframonitoringt
// Groups absent from the result map have implicit zero counts (caller default).
func (m *module) getPerGroupPodPhaseCounts(
ctx context.Context,
req *inframonitoringtypes.PostablePods,
start, end int64,
filter *qbtypes.Filter,
groupBy []qbtypes.GroupByKey,
pageGroups []map[string]string,
) (map[string]podPhaseCounts, error) {
if len(pageGroups) == 0 || len(req.GroupBy) == 0 {
if len(pageGroups) == 0 || len(groupBy) == 0 {
return map[string]podPhaseCounts{}, nil
}
// Merged filter expression (user filter + page-groups IN clauses).
reqFilterExpr := ""
if req.Filter != nil {
reqFilterExpr = req.Filter.Expression
// Merge user filter with page-groups IN clauses.
userFilterExpr := ""
if filter != nil {
userFilterExpr = filter.Expression
}
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
filterExpr := mergeFilterExpressions(reqFilterExpr, pageGroupsFilterExpr)
mergedFilterExpr := mergeFilterExpressions(userFilterExpr, pageGroupsFilterExpr)
// Resolve tables. Same convention as hosts (distributed names from helpers).
adjustedStart, adjustedEnd, _, localTimeSeriesTable := telemetrymetrics.WhichTSTableToUse(
uint64(req.Start), uint64(req.End), nil,
uint64(start), uint64(end), nil,
)
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
uint64(req.Start), uint64(req.End),
uint64(start), uint64(end),
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
)
valueCol := telemetrymetrics.ValueColumnForSamplesTable(samplesTable)
@@ -220,7 +222,7 @@ func (m *module) getPerGroupPodPhaseCounts(
"fingerprint",
fmt.Sprintf("JSONExtractString(labels, %s) AS pod_uid", timeSeriesFPs.Var(podUIDAttrKey)),
}
for _, key := range req.GroupBy {
for _, key := range groupBy {
timeSeriesFPsSelectCols = append(timeSeriesFPsSelectCols,
fmt.Sprintf("JSONExtractString(labels, %s) AS %s", timeSeriesFPs.Var(key.Name), quoteIdentifier(key.Name)),
)
@@ -232,8 +234,8 @@ func (m *module) getPerGroupPodPhaseCounts(
timeSeriesFPs.GE("unix_milli", adjustedStart),
timeSeriesFPs.L("unix_milli", adjustedEnd),
)
if filterExpr != "" {
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: filterExpr}, req.Start, req.End)
if mergedFilterExpr != "" {
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: mergedFilterExpr}, start, end)
if err != nil {
return nil, err
}
@@ -242,7 +244,7 @@ func (m *module) getPerGroupPodPhaseCounts(
}
}
timeSeriesFPsGroupBy := []string{"fingerprint", "pod_uid"}
for _, key := range req.GroupBy {
for _, key := range groupBy {
timeSeriesFPsGroupBy = append(timeSeriesFPsGroupBy, quoteIdentifier(key.Name))
}
timeSeriesFPs.GroupBy(timeSeriesFPsGroupBy...)
@@ -251,7 +253,7 @@ func (m *module) getPerGroupPodPhaseCounts(
latestPhasePerPod := sqlbuilder.NewSelectBuilder()
latestPhasePerPodSelectCols := []string{"tsfp.pod_uid AS pod_uid"}
latestPhasePerPodGroupBy := []string{"pod_uid"}
for _, key := range req.GroupBy {
for _, key := range groupBy {
col := quoteIdentifier(key.Name)
latestPhasePerPodSelectCols = append(latestPhasePerPodSelectCols, fmt.Sprintf("tsfp.%s AS %s", col, col))
latestPhasePerPodGroupBy = append(latestPhasePerPodGroupBy, col)
@@ -266,17 +268,17 @@ func (m *module) getPerGroupPodPhaseCounts(
))
latestPhasePerPod.Where(
latestPhasePerPod.E("samples.metric_name", podPhaseMetricName),
latestPhasePerPod.GE("samples.unix_milli", req.Start),
latestPhasePerPod.L("samples.unix_milli", req.End),
latestPhasePerPod.GE("samples.unix_milli", start),
latestPhasePerPod.L("samples.unix_milli", end),
"tsfp.pod_uid != ''",
)
latestPhasePerPod.GroupBy(latestPhasePerPodGroupBy...)
latestPhasePerPodSQL, latestPhasePerPodArgs := latestPhasePerPod.BuildWithFlavor(sqlbuilder.ClickHouse)
// ----- countPodsPerPhase (outer SELECT) -----
countPodsPerPhaseSelectCols := make([]string, 0, len(req.GroupBy)+5)
countPodsPerPhaseGroupBy := make([]string, 0, len(req.GroupBy))
for _, key := range req.GroupBy {
countPodsPerPhaseSelectCols := make([]string, 0, len(groupBy)+5)
countPodsPerPhaseGroupBy := make([]string, 0, len(groupBy))
for _, key := range groupBy {
col := quoteIdentifier(key.Name)
countPodsPerPhaseSelectCols = append(countPodsPerPhaseSelectCols, col)
countPodsPerPhaseGroupBy = append(countPodsPerPhaseGroupBy, col)
@@ -310,8 +312,8 @@ func (m *module) getPerGroupPodPhaseCounts(
result := make(map[string]podPhaseCounts)
for rows.Next() {
groupVals := make([]string, len(req.GroupBy))
scanPtrs := make([]any, 0, len(req.GroupBy)+5)
groupVals := make([]string, len(groupBy))
scanPtrs := make([]any, 0, len(groupBy)+5)
for i := range groupVals {
scanPtrs = append(scanPtrs, &groupVals[i])
}

View File

@@ -12,10 +12,8 @@ import (
"time"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/bytedance/sonic"
)
var (
@@ -24,8 +22,6 @@ var (
// written clickhouse query. The column alias indcate which value is
// to be considered as final result (or target).
legacyReservedColumnTargetAliases = []string{"__result", "__value", "result", "res", "value"}
CodeFailUnmarshalJSONColumn = errors.MustNewCode("fail_unmarshal_json_column")
)
// consume reads every row and shapes it into the payload expected for the
@@ -397,16 +393,11 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
// de-reference the typed pointer to any
val := reflect.ValueOf(cellPtr).Elem().Interface()
// Post-process JSON columns: unmarshal bytes into map[string]any
// Post-process JSON columns: normalize into String value
if strings.HasPrefix(strings.ToUpper(colTypes[i].DatabaseTypeName()), "JSON") {
switch x := val.(type) {
case []byte:
var m map[string]any
err := sonic.Unmarshal(x, &m)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailUnmarshalJSONColumn, "failed to unmarshal JSON column %s", name)
}
val = m
val = string(x)
default:
// already a structured type (map[string]any, []any, etc.)
}

View File

@@ -12,12 +12,9 @@ import (
"github.com/SigNoz/govaluate"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// queryInfo holds common query properties.
@@ -53,7 +50,7 @@ func getQueryName(spec any) string {
return getqueryInfo(spec).Name
}
func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
// Convert results to typed format for processing
typedResults := make(map[string]*qbtypes.Result)
for name, result := range results {
@@ -72,7 +69,6 @@ func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, res
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if result, ok := typedResults[spec.Name]; ok {
result = postProcessBuilderQuery(q, result, spec, req)
result = q.postProcessLogBody(ctx, orgID, result, req)
typedResults[spec.Name] = result
}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
@@ -1050,33 +1046,3 @@ func (q *querier) calculateFormulaStep(expression string, req *qbtypes.QueryRang
return result
}
// postProcessLogBody removes the "message" key from the body map when it is empty.
// Only runs for raw list queries with the use_json_body feature enabled.
func (q *querier) postProcessLogBody(ctx context.Context, orgID valuer.UUID, result *qbtypes.Result, req *qbtypes.QueryRangeRequest) *qbtypes.Result {
if req.RequestType != qbtypes.RequestTypeRaw {
return result
}
if !q.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(orgID)) {
return result
}
rawData, ok := result.Value.(*qbtypes.RawData)
if !ok {
return result
}
for _, row := range rawData.Rows {
bodyMap, ok := row.Data["body"].(map[string]any)
if !ok {
continue
}
if msg, exists := bodyMap["message"]; exists {
switch v := msg.(type) {
case string:
if v == "" {
delete(bodyMap, "message")
}
}
}
}
return result
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/querybuilder"
@@ -36,7 +35,6 @@ var (
type querier struct {
logger *slog.Logger
fl flagger.Flagger
telemetryStore telemetrystore.TelemetryStore
metadataStore telemetrytypes.MetadataStore
promEngine prometheus.Prometheus
@@ -64,12 +62,10 @@ func New(
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder,
bucketCache BucketCache,
flagger flagger.Flagger,
) *querier {
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
return &querier{
logger: querierSettings.Logger(),
fl: flagger,
telemetryStore: telemetryStore,
metadataStore: metadataStore,
promEngine: promEngine,
@@ -688,7 +684,7 @@ func (q *querier) run(
}
gomaps.Copy(results, preseededResults)
processedResults, err := q.postProcessResults(ctx, orgID, results, req)
processedResults, err := q.postProcessResults(ctx, results, req)
if err != nil {
return nil, err
}

View File

@@ -7,7 +7,6 @@ import (
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
@@ -45,15 +44,14 @@ func TestQueryRange_MetricTypeMissing(t *testing.T) {
providerSettings,
nil, // telemetryStore
metadataStore,
nil, // prometheus
nil, // traceStmtBuilder
nil, // logStmtBuilder
nil, // auditStmtBuilder
nil, // metricStmtBuilder
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
flaggertest.New(t), // flagger
nil, // prometheus
nil, // traceStmtBuilder
nil, // logStmtBuilder
nil, // auditStmtBuilder
nil, // metricStmtBuilder
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
)
req := &qbtypes.QueryRangeRequest{
@@ -118,7 +116,6 @@ func TestQueryRange_MetricTypeFromStore(t *testing.T) {
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
flaggertest.New(t), // flagger
)
req := &qbtypes.QueryRangeRequest{

View File

@@ -186,6 +186,5 @@ func newProvider(
meterStmtBuilder,
traceOperatorStmtBuilder,
bucketCache,
flagger,
), nil
}

View File

@@ -53,7 +53,6 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
flagger,
), metadataStore
}
@@ -103,7 +102,6 @@ func prepareQuerierForLogs(t *testing.T, telemetryStore telemetrystore.Telemetry
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
fl,
)
}
@@ -148,6 +146,5 @@ func prepareQuerierForTraces(t *testing.T, telemetryStore telemetrystore.Telemet
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
fl,
)
}

View File

@@ -21,7 +21,7 @@ from fixtures.querier import (
def _get_bodies(response: requests.Response) -> list[dict[str, Any]]:
return [row["data"]["body"] for row in get_rows(response)]
return [json.loads(row["data"]["body"]) for row in get_rows(response)]
def _run_query_case(signoz: types.SigNoz, token: str, now: datetime, case: dict[str, Any]) -> None:
@@ -1188,7 +1188,7 @@ def test_message_searches(
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
def _body_messages(response: requests.Response) -> list[str]:
return [row["data"]["body"].get("message", "") for row in get_rows(response)]
return [json.loads(row["data"]["body"]).get("message", "") for row in get_rows(response)]
payment_messages = {
"Payment processed successfully",