mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-20 08:50:29 +01:00
Compare commits
1 Commits
postproces
...
infraM/imp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c7ff617904 |
2
go.mod
2
go.mod
@@ -11,7 +11,6 @@ require (
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.3
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1
|
||||
github.com/antonmedv/expr v1.15.3
|
||||
github.com/bytedance/sonic v1.14.1
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/coreos/go-oidc/v3 v3.17.0
|
||||
github.com/dgraph-io/ristretto/v2 v2.3.0
|
||||
@@ -113,6 +112,7 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 // indirect
|
||||
github.com/aws/smithy-go v1.24.2 // indirect
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/sonic v1.14.1 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6 // indirect
|
||||
|
||||
@@ -231,7 +231,7 @@ func (m *module) ListPods(ctx context.Context, orgID valuer.UUID, req *inframoni
|
||||
return nil, err
|
||||
}
|
||||
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req, pageGroups)
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -314,19 +314,12 @@ func (m *module) ListNodes(ctx context.Context, orgID valuer.UUID, req *inframon
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeConditionCounts, err := m.getPerGroupNodeConditionCounts(ctx, req, pageGroups)
|
||||
nodeConditionCounts, err := m.getPerGroupNodeConditionCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
|
||||
// Start/End/Filter/GroupBy from PostablePods.
|
||||
podPhaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
|
||||
Start: req.Start,
|
||||
End: req.End,
|
||||
Filter: req.Filter,
|
||||
GroupBy: req.GroupBy,
|
||||
}, pageGroups)
|
||||
podPhaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -409,14 +402,7 @@ func (m *module) ListNamespaces(ctx context.Context, orgID valuer.UUID, req *inf
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
|
||||
// Start/End/Filter/GroupBy from PostablePods.
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
|
||||
Start: req.Start,
|
||||
End: req.End,
|
||||
Filter: req.Filter,
|
||||
GroupBy: req.GroupBy,
|
||||
}, pageGroups)
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -498,27 +484,14 @@ func (m *module) ListClusters(ctx context.Context, orgID valuer.UUID, req *infra
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reuse the nodes condition-counts CTE function via a temp struct — it reads only
|
||||
// Start/End/Filter/GroupBy from PostableNodes. With default groupBy
|
||||
// [k8s.cluster.name], counts are bucketed per cluster; with a custom groupBy,
|
||||
// they aggregate across clusters in that group.
|
||||
nodeConditionCountsMap, err := m.getPerGroupNodeConditionCounts(ctx, &inframonitoringtypes.PostableNodes{
|
||||
Start: req.Start,
|
||||
End: req.End,
|
||||
Filter: req.Filter,
|
||||
GroupBy: req.GroupBy,
|
||||
}, pageGroups)
|
||||
// With default groupBy [k8s.cluster.name], counts are bucketed per cluster;
|
||||
// with a custom groupBy, they aggregate across clusters in that group.
|
||||
nodeConditionCountsMap, err := m.getPerGroupNodeConditionCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Same pattern for pod phase counts via PostablePods shim.
|
||||
podPhaseCountsMap, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
|
||||
Start: req.Start,
|
||||
End: req.End,
|
||||
Filter: req.Filter,
|
||||
GroupBy: req.GroupBy,
|
||||
}, pageGroups)
|
||||
podPhaseCountsMap, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -689,14 +662,7 @@ func (m *module) ListDeployments(ctx context.Context, orgID valuer.UUID, req *in
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
|
||||
// Start/End/Filter/GroupBy from PostablePods.
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
|
||||
Start: req.Start,
|
||||
End: req.End,
|
||||
Filter: req.Filter,
|
||||
GroupBy: req.GroupBy,
|
||||
}, pageGroups)
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -784,16 +750,9 @@ func (m *module) ListStatefulSets(ctx context.Context, orgID valuer.UUID, req *i
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
|
||||
// Start/End/Filter/GroupBy from PostablePods. Pods owned by a StatefulSet carry
|
||||
// k8s.statefulset.name as a resource attribute, so default-groupBy gives
|
||||
// per-statefulset phase counts automatically.
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
|
||||
Start: req.Start,
|
||||
End: req.End,
|
||||
Filter: req.Filter,
|
||||
GroupBy: req.GroupBy,
|
||||
}, pageGroups)
|
||||
// Pods owned by a StatefulSet carry k8s.statefulset.name as a resource attribute,
|
||||
// so default-groupBy gives per-statefulset phase counts automatically.
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -881,16 +840,9 @@ func (m *module) ListJobs(ctx context.Context, orgID valuer.UUID, req *inframoni
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
|
||||
// Start/End/Filter/GroupBy from PostablePods. Pods owned by a Job carry
|
||||
// k8s.job.name as a resource attribute, so default-groupBy gives
|
||||
// per-job phase counts automatically.
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
|
||||
Start: req.Start,
|
||||
End: req.End,
|
||||
Filter: req.Filter,
|
||||
GroupBy: req.GroupBy,
|
||||
}, pageGroups)
|
||||
// Pods owned by a Job carry k8s.job.name as a resource attribute, so default-groupBy
|
||||
// gives per-job phase counts automatically.
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -978,16 +930,9 @@ func (m *module) ListDaemonSets(ctx context.Context, orgID valuer.UUID, req *inf
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reuse the pods phase-counts CTE function via a temp struct — it reads only
|
||||
// Start/End/Filter/GroupBy from PostablePods. Pods owned by a DaemonSet carry
|
||||
// k8s.daemonset.name as a resource attribute, so default-groupBy gives
|
||||
// per-daemonset phase counts automatically.
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, &inframonitoringtypes.PostablePods{
|
||||
Start: req.Start,
|
||||
End: req.End,
|
||||
Filter: req.Filter,
|
||||
GroupBy: req.GroupBy,
|
||||
}, pageGroups)
|
||||
// Pods owned by a DaemonSet carry k8s.daemonset.name as a resource attribute,
|
||||
// so default-groupBy gives per-daemonset phase counts automatically.
|
||||
phaseCounts, err := m.getPerGroupPodPhaseCounts(ctx, req.Start, req.End, req.Filter, req.GroupBy, pageGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -170,27 +170,29 @@ func (m *module) getNodesTableMetadata(ctx context.Context, req *inframonitoring
|
||||
// Groups absent from the result map have implicit zero counts (caller default).
|
||||
func (m *module) getPerGroupNodeConditionCounts(
|
||||
ctx context.Context,
|
||||
req *inframonitoringtypes.PostableNodes,
|
||||
start, end int64,
|
||||
filter *qbtypes.Filter,
|
||||
groupBy []qbtypes.GroupByKey,
|
||||
pageGroups []map[string]string,
|
||||
) (map[string]nodeConditionCounts, error) {
|
||||
if len(pageGroups) == 0 || len(req.GroupBy) == 0 {
|
||||
if len(pageGroups) == 0 || len(groupBy) == 0 {
|
||||
return map[string]nodeConditionCounts{}, nil
|
||||
}
|
||||
|
||||
// Merged filter expression (user filter + page-groups IN clauses).
|
||||
reqFilterExpr := ""
|
||||
if req.Filter != nil {
|
||||
reqFilterExpr = req.Filter.Expression
|
||||
// Merge user filter with page-groups IN clauses.
|
||||
userFilterExpr := ""
|
||||
if filter != nil {
|
||||
userFilterExpr = filter.Expression
|
||||
}
|
||||
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
|
||||
filterExpr := mergeFilterExpressions(reqFilterExpr, pageGroupsFilterExpr)
|
||||
mergedFilterExpr := mergeFilterExpressions(userFilterExpr, pageGroupsFilterExpr)
|
||||
|
||||
// Resolve tables. Same convention as pods.
|
||||
adjustedStart, adjustedEnd, _, localTimeSeriesTable := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(req.Start), uint64(req.End), nil,
|
||||
uint64(start), uint64(end), nil,
|
||||
)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
|
||||
uint64(req.Start), uint64(req.End),
|
||||
uint64(start), uint64(end),
|
||||
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
|
||||
)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(samplesTable)
|
||||
@@ -201,7 +203,7 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
"fingerprint",
|
||||
fmt.Sprintf("JSONExtractString(labels, %s) AS node_name", timeSeriesFPs.Var(nodeNameAttrKey)),
|
||||
}
|
||||
for _, key := range req.GroupBy {
|
||||
for _, key := range groupBy {
|
||||
timeSeriesFPsSelectCols = append(timeSeriesFPsSelectCols,
|
||||
fmt.Sprintf("JSONExtractString(labels, %s) AS %s", timeSeriesFPs.Var(key.Name), quoteIdentifier(key.Name)),
|
||||
)
|
||||
@@ -213,8 +215,8 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
timeSeriesFPs.GE("unix_milli", adjustedStart),
|
||||
timeSeriesFPs.L("unix_milli", adjustedEnd),
|
||||
)
|
||||
if filterExpr != "" {
|
||||
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: filterExpr}, req.Start, req.End)
|
||||
if mergedFilterExpr != "" {
|
||||
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: mergedFilterExpr}, start, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -223,7 +225,7 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
}
|
||||
}
|
||||
timeSeriesFPsGroupBy := []string{"fingerprint", "node_name"}
|
||||
for _, key := range req.GroupBy {
|
||||
for _, key := range groupBy {
|
||||
timeSeriesFPsGroupBy = append(timeSeriesFPsGroupBy, quoteIdentifier(key.Name))
|
||||
}
|
||||
timeSeriesFPs.GroupBy(timeSeriesFPsGroupBy...)
|
||||
@@ -233,7 +235,7 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
latestConditionPerNode := sqlbuilder.NewSelectBuilder()
|
||||
latestConditionPerNodeSelectCols := []string{"tsfp.node_name AS node_name"}
|
||||
latestConditionPerNodeGroupBy := []string{"node_name"}
|
||||
for _, key := range req.GroupBy {
|
||||
for _, key := range groupBy {
|
||||
col := quoteIdentifier(key.Name)
|
||||
latestConditionPerNodeSelectCols = append(latestConditionPerNodeSelectCols, fmt.Sprintf("tsfp.%s AS %s", col, col))
|
||||
latestConditionPerNodeGroupBy = append(latestConditionPerNodeGroupBy, col)
|
||||
@@ -248,17 +250,17 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
))
|
||||
latestConditionPerNode.Where(
|
||||
latestConditionPerNode.E("samples.metric_name", nodeConditionMetricName),
|
||||
latestConditionPerNode.GE("samples.unix_milli", req.Start),
|
||||
latestConditionPerNode.L("samples.unix_milli", req.End),
|
||||
latestConditionPerNode.GE("samples.unix_milli", start),
|
||||
latestConditionPerNode.L("samples.unix_milli", end),
|
||||
"tsfp.node_name != ''",
|
||||
)
|
||||
latestConditionPerNode.GroupBy(latestConditionPerNodeGroupBy...)
|
||||
latestConditionPerNodeSQL, latestConditionPerNodeArgs := latestConditionPerNode.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
// ----- countNodesPerCondition (outer SELECT) -----
|
||||
countNodesPerConditionSelectCols := make([]string, 0, len(req.GroupBy)+2)
|
||||
countNodesPerConditionGroupBy := make([]string, 0, len(req.GroupBy))
|
||||
for _, key := range req.GroupBy {
|
||||
countNodesPerConditionSelectCols := make([]string, 0, len(groupBy)+2)
|
||||
countNodesPerConditionGroupBy := make([]string, 0, len(groupBy))
|
||||
for _, key := range groupBy {
|
||||
col := quoteIdentifier(key.Name)
|
||||
countNodesPerConditionSelectCols = append(countNodesPerConditionSelectCols, col)
|
||||
countNodesPerConditionGroupBy = append(countNodesPerConditionGroupBy, col)
|
||||
@@ -289,8 +291,8 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
|
||||
result := make(map[string]nodeConditionCounts)
|
||||
for rows.Next() {
|
||||
groupVals := make([]string, len(req.GroupBy))
|
||||
scanPtrs := make([]any, 0, len(req.GroupBy)+2)
|
||||
groupVals := make([]string, len(groupBy))
|
||||
scanPtrs := make([]any, 0, len(groupBy)+2)
|
||||
for i := range groupVals {
|
||||
scanPtrs = append(scanPtrs, &groupVals[i])
|
||||
}
|
||||
|
||||
@@ -189,27 +189,29 @@ func (m *module) getPodsTableMetadata(ctx context.Context, req *inframonitoringt
|
||||
// Groups absent from the result map have implicit zero counts (caller default).
|
||||
func (m *module) getPerGroupPodPhaseCounts(
|
||||
ctx context.Context,
|
||||
req *inframonitoringtypes.PostablePods,
|
||||
start, end int64,
|
||||
filter *qbtypes.Filter,
|
||||
groupBy []qbtypes.GroupByKey,
|
||||
pageGroups []map[string]string,
|
||||
) (map[string]podPhaseCounts, error) {
|
||||
if len(pageGroups) == 0 || len(req.GroupBy) == 0 {
|
||||
if len(pageGroups) == 0 || len(groupBy) == 0 {
|
||||
return map[string]podPhaseCounts{}, nil
|
||||
}
|
||||
|
||||
// Merged filter expression (user filter + page-groups IN clauses).
|
||||
reqFilterExpr := ""
|
||||
if req.Filter != nil {
|
||||
reqFilterExpr = req.Filter.Expression
|
||||
// Merge user filter with page-groups IN clauses.
|
||||
userFilterExpr := ""
|
||||
if filter != nil {
|
||||
userFilterExpr = filter.Expression
|
||||
}
|
||||
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
|
||||
filterExpr := mergeFilterExpressions(reqFilterExpr, pageGroupsFilterExpr)
|
||||
mergedFilterExpr := mergeFilterExpressions(userFilterExpr, pageGroupsFilterExpr)
|
||||
|
||||
// Resolve tables. Same convention as hosts (distributed names from helpers).
|
||||
adjustedStart, adjustedEnd, _, localTimeSeriesTable := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(req.Start), uint64(req.End), nil,
|
||||
uint64(start), uint64(end), nil,
|
||||
)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
|
||||
uint64(req.Start), uint64(req.End),
|
||||
uint64(start), uint64(end),
|
||||
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
|
||||
)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(samplesTable)
|
||||
@@ -220,7 +222,7 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
"fingerprint",
|
||||
fmt.Sprintf("JSONExtractString(labels, %s) AS pod_uid", timeSeriesFPs.Var(podUIDAttrKey)),
|
||||
}
|
||||
for _, key := range req.GroupBy {
|
||||
for _, key := range groupBy {
|
||||
timeSeriesFPsSelectCols = append(timeSeriesFPsSelectCols,
|
||||
fmt.Sprintf("JSONExtractString(labels, %s) AS %s", timeSeriesFPs.Var(key.Name), quoteIdentifier(key.Name)),
|
||||
)
|
||||
@@ -232,8 +234,8 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
timeSeriesFPs.GE("unix_milli", adjustedStart),
|
||||
timeSeriesFPs.L("unix_milli", adjustedEnd),
|
||||
)
|
||||
if filterExpr != "" {
|
||||
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: filterExpr}, req.Start, req.End)
|
||||
if mergedFilterExpr != "" {
|
||||
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: mergedFilterExpr}, start, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -242,7 +244,7 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
}
|
||||
}
|
||||
timeSeriesFPsGroupBy := []string{"fingerprint", "pod_uid"}
|
||||
for _, key := range req.GroupBy {
|
||||
for _, key := range groupBy {
|
||||
timeSeriesFPsGroupBy = append(timeSeriesFPsGroupBy, quoteIdentifier(key.Name))
|
||||
}
|
||||
timeSeriesFPs.GroupBy(timeSeriesFPsGroupBy...)
|
||||
@@ -251,7 +253,7 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
latestPhasePerPod := sqlbuilder.NewSelectBuilder()
|
||||
latestPhasePerPodSelectCols := []string{"tsfp.pod_uid AS pod_uid"}
|
||||
latestPhasePerPodGroupBy := []string{"pod_uid"}
|
||||
for _, key := range req.GroupBy {
|
||||
for _, key := range groupBy {
|
||||
col := quoteIdentifier(key.Name)
|
||||
latestPhasePerPodSelectCols = append(latestPhasePerPodSelectCols, fmt.Sprintf("tsfp.%s AS %s", col, col))
|
||||
latestPhasePerPodGroupBy = append(latestPhasePerPodGroupBy, col)
|
||||
@@ -266,17 +268,17 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
))
|
||||
latestPhasePerPod.Where(
|
||||
latestPhasePerPod.E("samples.metric_name", podPhaseMetricName),
|
||||
latestPhasePerPod.GE("samples.unix_milli", req.Start),
|
||||
latestPhasePerPod.L("samples.unix_milli", req.End),
|
||||
latestPhasePerPod.GE("samples.unix_milli", start),
|
||||
latestPhasePerPod.L("samples.unix_milli", end),
|
||||
"tsfp.pod_uid != ''",
|
||||
)
|
||||
latestPhasePerPod.GroupBy(latestPhasePerPodGroupBy...)
|
||||
latestPhasePerPodSQL, latestPhasePerPodArgs := latestPhasePerPod.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
// ----- countPodsPerPhase (outer SELECT) -----
|
||||
countPodsPerPhaseSelectCols := make([]string, 0, len(req.GroupBy)+5)
|
||||
countPodsPerPhaseGroupBy := make([]string, 0, len(req.GroupBy))
|
||||
for _, key := range req.GroupBy {
|
||||
countPodsPerPhaseSelectCols := make([]string, 0, len(groupBy)+5)
|
||||
countPodsPerPhaseGroupBy := make([]string, 0, len(groupBy))
|
||||
for _, key := range groupBy {
|
||||
col := quoteIdentifier(key.Name)
|
||||
countPodsPerPhaseSelectCols = append(countPodsPerPhaseSelectCols, col)
|
||||
countPodsPerPhaseGroupBy = append(countPodsPerPhaseGroupBy, col)
|
||||
@@ -310,8 +312,8 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
|
||||
result := make(map[string]podPhaseCounts)
|
||||
for rows.Next() {
|
||||
groupVals := make([]string, len(req.GroupBy))
|
||||
scanPtrs := make([]any, 0, len(req.GroupBy)+5)
|
||||
groupVals := make([]string, len(groupBy))
|
||||
scanPtrs := make([]any, 0, len(groupBy)+5)
|
||||
for i := range groupVals {
|
||||
scanPtrs = append(scanPtrs, &groupVals[i])
|
||||
}
|
||||
|
||||
@@ -12,10 +12,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/bytedance/sonic"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -24,8 +22,6 @@ var (
|
||||
// written clickhouse query. The column alias indcate which value is
|
||||
// to be considered as final result (or target).
|
||||
legacyReservedColumnTargetAliases = []string{"__result", "__value", "result", "res", "value"}
|
||||
|
||||
CodeFailUnmarshalJSONColumn = errors.MustNewCode("fail_unmarshal_json_column")
|
||||
)
|
||||
|
||||
// consume reads every row and shapes it into the payload expected for the
|
||||
@@ -397,16 +393,11 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
|
||||
|
||||
// de-reference the typed pointer to any
|
||||
val := reflect.ValueOf(cellPtr).Elem().Interface()
|
||||
// Post-process JSON columns: unmarshal bytes into map[string]any
|
||||
// Post-process JSON columns: normalize into String value
|
||||
if strings.HasPrefix(strings.ToUpper(colTypes[i].DatabaseTypeName()), "JSON") {
|
||||
switch x := val.(type) {
|
||||
case []byte:
|
||||
var m map[string]any
|
||||
err := sonic.Unmarshal(x, &m)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailUnmarshalJSONColumn, "failed to unmarshal JSON column %s", name)
|
||||
}
|
||||
val = m
|
||||
val = string(x)
|
||||
default:
|
||||
// already a structured type (map[string]any, []any, etc.)
|
||||
}
|
||||
|
||||
@@ -12,12 +12,9 @@ import (
|
||||
"github.com/SigNoz/govaluate"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/types/featuretypes"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// queryInfo holds common query properties.
|
||||
@@ -53,7 +50,7 @@ func getQueryName(spec any) string {
|
||||
return getqueryInfo(spec).Name
|
||||
}
|
||||
|
||||
func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
|
||||
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
|
||||
// Convert results to typed format for processing
|
||||
typedResults := make(map[string]*qbtypes.Result)
|
||||
for name, result := range results {
|
||||
@@ -72,7 +69,6 @@ func (q *querier) postProcessResults(ctx context.Context, orgID valuer.UUID, res
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
|
||||
if result, ok := typedResults[spec.Name]; ok {
|
||||
result = postProcessBuilderQuery(q, result, spec, req)
|
||||
result = q.postProcessLogBody(ctx, orgID, result, req)
|
||||
typedResults[spec.Name] = result
|
||||
}
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
|
||||
@@ -1050,33 +1046,3 @@ func (q *querier) calculateFormulaStep(expression string, req *qbtypes.QueryRang
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// postProcessLogBody removes the "message" key from the body map when it is empty.
|
||||
// Only runs for raw list queries with the use_json_body feature enabled.
|
||||
func (q *querier) postProcessLogBody(ctx context.Context, orgID valuer.UUID, result *qbtypes.Result, req *qbtypes.QueryRangeRequest) *qbtypes.Result {
|
||||
if req.RequestType != qbtypes.RequestTypeRaw {
|
||||
return result
|
||||
}
|
||||
if !q.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(orgID)) {
|
||||
return result
|
||||
}
|
||||
rawData, ok := result.Value.(*qbtypes.RawData)
|
||||
if !ok {
|
||||
return result
|
||||
}
|
||||
for _, row := range rawData.Rows {
|
||||
bodyMap, ok := row.Data["body"].(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if msg, exists := bodyMap["message"]; exists {
|
||||
switch v := msg.(type) {
|
||||
case string:
|
||||
if v == "" {
|
||||
delete(bodyMap, "message")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
@@ -36,7 +35,6 @@ var (
|
||||
|
||||
type querier struct {
|
||||
logger *slog.Logger
|
||||
fl flagger.Flagger
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
metadataStore telemetrytypes.MetadataStore
|
||||
promEngine prometheus.Prometheus
|
||||
@@ -64,12 +62,10 @@ func New(
|
||||
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
|
||||
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder,
|
||||
bucketCache BucketCache,
|
||||
flagger flagger.Flagger,
|
||||
) *querier {
|
||||
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
|
||||
return &querier{
|
||||
logger: querierSettings.Logger(),
|
||||
fl: flagger,
|
||||
telemetryStore: telemetryStore,
|
||||
metadataStore: metadataStore,
|
||||
promEngine: promEngine,
|
||||
@@ -688,7 +684,7 @@ func (q *querier) run(
|
||||
}
|
||||
|
||||
gomaps.Copy(results, preseededResults)
|
||||
processedResults, err := q.postProcessResults(ctx, orgID, results, req)
|
||||
processedResults, err := q.postProcessResults(ctx, results, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
@@ -45,15 +44,14 @@ func TestQueryRange_MetricTypeMissing(t *testing.T) {
|
||||
providerSettings,
|
||||
nil, // telemetryStore
|
||||
metadataStore,
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
@@ -118,7 +116,6 @@ func TestQueryRange_MetricTypeFromStore(t *testing.T) {
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
|
||||
@@ -186,6 +186,5 @@ func newProvider(
|
||||
meterStmtBuilder,
|
||||
traceOperatorStmtBuilder,
|
||||
bucketCache,
|
||||
flagger,
|
||||
), nil
|
||||
}
|
||||
|
||||
@@ -53,7 +53,6 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flagger,
|
||||
), metadataStore
|
||||
}
|
||||
|
||||
@@ -103,7 +102,6 @@ func prepareQuerierForLogs(t *testing.T, telemetryStore telemetrystore.Telemetry
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -148,6 +146,5 @@ func prepareQuerierForTraces(t *testing.T, telemetryStore telemetrystore.Telemet
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ from fixtures.querier import (
|
||||
|
||||
|
||||
def _get_bodies(response: requests.Response) -> list[dict[str, Any]]:
|
||||
return [row["data"]["body"] for row in get_rows(response)]
|
||||
return [json.loads(row["data"]["body"]) for row in get_rows(response)]
|
||||
|
||||
|
||||
def _run_query_case(signoz: types.SigNoz, token: str, now: datetime, case: dict[str, Any]) -> None:
|
||||
@@ -1188,7 +1188,7 @@ def test_message_searches(
|
||||
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
|
||||
|
||||
def _body_messages(response: requests.Response) -> list[str]:
|
||||
return [row["data"]["body"].get("message", "") for row in get_rows(response)]
|
||||
return [json.loads(row["data"]["body"]).get("message", "") for row in get_rows(response)]
|
||||
|
||||
payment_messages = {
|
||||
"Payment processed successfully",
|
||||
|
||||
Reference in New Issue
Block a user