Compare commits


14 Commits

Author SHA1 Message Date
Prashant Shahi
3f7adeb040 Merge branch 'develop' into release/v0.58.x 2024-11-13 12:01:54 +05:30
Srikanth Chekuri
323da3494b chore: add experimental rate/increase calc (#6432) 2024-11-13 11:47:56 +05:30
Vikrant Gupta
01fda51959 chore: return proper http codes on unique constraint error (#6428) 2024-11-13 00:25:00 +05:30
Srikanth Chekuri
85ac21f253 fix: update request payload for span metrics queries (#6323) 2024-11-12 17:22:42 +00:00
Srikanth Chekuri
fd9e9f0fb3 chore: add k8s {deployment, daemonset, statefulset, job} resources (#6401) 2024-11-12 15:23:40 +00:00
Nityananda Gohain
d5523fc092 fix: ignore ts for panel type table (#6419) 2024-11-12 08:04:45 +00:00
Nityananda Gohain
2ec641b99e fix: add severity_text legend (#6415) 2024-11-12 05:54:22 +00:00
Ekansh Gupta
d1503f1418 feat: fixProducerAPI (#6422)
chore: bugfix
2024-11-12 05:30:36 +00:00
Prashant Shahi
48f3b9cacb chore(signoz): pin versions: SigNoz 0.58.0, SigNoz OtelCollector 0.111.8
Signed-off-by: Prashant Shahi <prashant@signoz.io>
2024-11-08 16:02:59 +05:30
Prashant Shahi
eaf8571fe9 Merge branch 'main' into release/v0.58.x 2024-11-08 15:49:41 +05:30
Prashant Shahi
b10c22223b Merge pull request #6300 from SigNoz/release/signoz-0.57.0
Release/signoz 0.57.0
2024-10-29 00:45:14 +05:30
Prashant Shahi
cdde369748 Merge branch 'develop' into release/signoz-0.57.0 2024-10-28 21:26:47 +05:30
Prashant Shahi
523cbcd6fc chore: go mod tidy
Signed-off-by: Prashant Shahi <prashant@signoz.io>
2024-10-28 19:40:23 +05:30
Prashant Shahi
eeadc021e1 chore(signoz): pin versions: SigNoz 0.57.0
Signed-off-by: Prashant Shahi <prashant@signoz.io>
2024-10-28 19:40:10 +05:30
27 changed files with 2492 additions and 35 deletions


@@ -146,7 +146,7 @@ services:
condition: on-failure
query-service:
- image: signoz/query-service:0.56.0
+ image: signoz/query-service:0.58.0
command:
[
"-config=/root/config/prometheus.yml",
@@ -186,7 +186,7 @@ services:
<<: *db-depend
frontend:
- image: signoz/frontend:0.56.0
+ image: signoz/frontend:0.58.0
deploy:
restart_policy:
condition: on-failure
@@ -199,7 +199,7 @@ services:
- ../common/nginx-config.conf:/etc/nginx/conf.d/default.conf
otel-collector:
- image: signoz/signoz-otel-collector:0.111.5
+ image: signoz/signoz-otel-collector:0.111.8
command:
[
"--config=/etc/otel-collector-config.yaml",
@@ -237,7 +237,7 @@ services:
- query-service
otel-collector-migrator:
- image: signoz/signoz-schema-migrator:0.111.5
+ image: signoz/signoz-schema-migrator:0.111.8
deploy:
restart_policy:
condition: on-failure


@@ -69,7 +69,7 @@ services:
- --storage.path=/data
otel-collector-migrator:
- image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.111.5}
+ image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.111.8}
container_name: otel-migrator
command:
- "--dsn=tcp://clickhouse:9000"
@@ -84,7 +84,7 @@ services:
# Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`
otel-collector:
container_name: signoz-otel-collector
- image: signoz/signoz-otel-collector:0.111.5
+ image: signoz/signoz-otel-collector:0.111.8
command:
[
"--config=/etc/otel-collector-config.yaml",


@@ -162,7 +162,7 @@ services:
# Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`
query-service:
- image: signoz/query-service:${DOCKER_TAG:-0.56.0}
+ image: signoz/query-service:${DOCKER_TAG:-0.58.0}
container_name: signoz-query-service
command:
[
@@ -201,7 +201,7 @@ services:
<<: *db-depend
frontend:
- image: signoz/frontend:${DOCKER_TAG:-0.56.0}
+ image: signoz/frontend:${DOCKER_TAG:-0.58.0}
container_name: signoz-frontend
restart: on-failure
depends_on:
@@ -213,7 +213,7 @@ services:
- ../common/nginx-config.conf:/etc/nginx/conf.d/default.conf
otel-collector-migrator-sync:
- image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.111.5}
+ image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.111.8}
container_name: otel-migrator-sync
command:
- "sync"
@@ -228,7 +228,7 @@ services:
# condition: service_healthy
otel-collector-migrator-async:
- image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.111.5}
+ image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.111.8}
container_name: otel-migrator-async
command:
- "async"
@@ -245,7 +245,7 @@ services:
# condition: service_healthy
otel-collector:
- image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.111.5}
+ image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.111.8}
container_name: signoz-otel-collector
command:
[


@@ -167,7 +167,7 @@ services:
# Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`
query-service:
- image: signoz/query-service:${DOCKER_TAG:-0.56.0}
+ image: signoz/query-service:${DOCKER_TAG:-0.58.0}
container_name: signoz-query-service
command:
[
@@ -208,7 +208,7 @@ services:
<<: *db-depend
frontend:
- image: signoz/frontend:${DOCKER_TAG:-0.56.0}
+ image: signoz/frontend:${DOCKER_TAG:-0.58.0}
container_name: signoz-frontend
restart: on-failure
depends_on:
@@ -220,7 +220,7 @@ services:
- ../common/nginx-config.conf:/etc/nginx/conf.d/default.conf
otel-collector-migrator:
- image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.111.5}
+ image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.111.8}
container_name: otel-migrator
command:
- "--dsn=tcp://clickhouse:9000"
@@ -234,7 +234,7 @@ services:
otel-collector:
- image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.111.5}
+ image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.111.8}
container_name: signoz-otel-collector
command:
[


@@ -8,6 +8,7 @@ import (
"time"
"github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3"
"go.signoz.io/signoz/ee/query-service/license/sqlite"
"go.signoz.io/signoz/ee/query-service/model"
@@ -274,14 +275,14 @@ func (r *Repo) InitFeatures(req basemodel.FeatureSet) error {
}
// InsertLicenseV3 inserts a new license v3 in db
- func (r *Repo) InsertLicenseV3(ctx context.Context, l *model.LicenseV3) error {
+ func (r *Repo) InsertLicenseV3(ctx context.Context, l *model.LicenseV3) *model.ApiError {
query := `INSERT INTO licenses_v3 (id, key, data) VALUES ($1, $2, $3)`
// license is the entity of zeus, so we put the entire license here without defining a schema
licenseData, err := json.Marshal(l.Data)
if err != nil {
return fmt.Errorf("insert license failed: license marshal error")
return &model.ApiError{Typ: basemodel.ErrorBadData, Err: err}
}
_, err = r.db.ExecContext(ctx,
@@ -292,8 +293,14 @@ func (r *Repo) InsertLicenseV3(ctx context.Context, l *model.LicenseV3) error {
)
if err != nil {
+ if sqliteErr, ok := err.(sqlite3.Error); ok {
+ if sqliteErr.ExtendedCode == sqlite3.ErrConstraintUnique {
+ zap.L().Error("error in inserting license data: ", zap.Error(sqliteErr))
+ return &model.ApiError{Typ: model.ErrorConflict, Err: sqliteErr}
+ }
+ }
zap.L().Error("error in inserting license data: ", zap.Error(err))
- return fmt.Errorf("failed to insert license in db: %v", err)
+ return &model.ApiError{Typ: basemodel.ErrorExec, Err: err}
}
return nil
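
As a side note, the unique-constraint detection above can be exercised in isolation. Below is a minimal, self-contained Go sketch against an in-memory SQLite database with mattn/go-sqlite3; the UNIQUE column and the inserted values are illustrative assumptions, not the real licenses_v3 schema.

package main

import (
	"database/sql"
	"fmt"

	"github.com/mattn/go-sqlite3" // importing also registers the "sqlite3" driver
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Illustrative schema: a UNIQUE column so a duplicate insert raises
	// SQLITE_CONSTRAINT_UNIQUE (the real licenses_v3 schema may differ).
	db.Exec(`CREATE TABLE licenses_v3 (id TEXT PRIMARY KEY, key TEXT UNIQUE, data TEXT)`)
	db.Exec(`INSERT INTO licenses_v3 (id, key, data) VALUES ('1', 'k', '{}')`)

	// A second row with the same key violates the unique constraint.
	_, err = db.Exec(`INSERT INTO licenses_v3 (id, key, data) VALUES ('2', 'k', '{}')`)
	if sqliteErr, ok := err.(sqlite3.Error); ok &&
		sqliteErr.ExtendedCode == sqlite3.ErrConstraintUnique {
		// This is the branch the patch maps to model.ErrorConflict (HTTP 409).
		fmt.Println("duplicate license:", sqliteErr)
	}
}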


@@ -463,7 +463,7 @@ func (lm *Manager) ActivateV3(ctx context.Context, licenseKey string) (licenseRe
err := lm.repo.InsertLicenseV3(ctx, license)
if err != nil {
zap.L().Error("failed to activate license", zap.Error(err))
- return nil, model.InternalError(err)
+ return nil, err
}
// license is valid, activate it


@@ -202,6 +202,7 @@ function LogsExplorerViews({
id: 'severity_text--string----true',
},
],
+ legend: '{{severity_text}}',
};
const modifiedQuery: Query = {


@@ -58,12 +58,17 @@ export const databaseCallsRPS = ({
const legends = [legend];
const dataSource = DataSource.METRICS;
+ const timeAggregateOperators = [MetricAggregateOperator.RATE];
+ const spaceAggregateOperators = [MetricAggregateOperator.SUM];
return getQueryBuilderQueries({
autocompleteData,
groupBy,
legends,
filterItems,
dataSource,
+ timeAggregateOperators,
+ spaceAggregateOperators,
});
};


@@ -213,12 +213,17 @@ export const externalCallRpsByAddress = ({
const legends = [legend];
const dataSource = DataSource.METRICS;
+ const timeAggregateOperators = [MetricAggregateOperator.RATE];
+ const spaceAggregateOperators = [MetricAggregateOperator.SUM];
return getQueryBuilderQueries({
autocompleteData,
groupBy,
legends,
filterItems,
dataSource,
+ timeAggregateOperators,
+ spaceAggregateOperators,
});
};


@@ -25,6 +25,8 @@ export const getQueryBuilderQueries = ({
aggregateOperator,
dataSource,
queryNameAndExpression,
+ timeAggregateOperators,
+ spaceAggregateOperators,
}: BuilderQueriesProps): QueryBuilderData => ({
queryFormulas: [],
queryData: autocompleteData.map((item, index) => {
@@ -50,6 +52,8 @@ export const getQueryBuilderQueries = ({
op: 'AND',
},
reduceTo: 'avg',
+ spaceAggregation: spaceAggregateOperators[index],
+ timeAggregation: timeAggregateOperators[index],
dataSource,
};


@@ -83,6 +83,17 @@ export const latency = ({
const dataSource = isSpanMetricEnable ? DataSource.METRICS : DataSource.TRACES;
const queryNameAndExpression = QUERYNAME_AND_EXPRESSION;
+ const timeAggregateOperators = [
+ MetricAggregateOperator.EMPTY,
+ MetricAggregateOperator.EMPTY,
+ MetricAggregateOperator.EMPTY,
+ ];
+ const spaceAggregateOperators = [
+ MetricAggregateOperator.P50,
+ MetricAggregateOperator.P90,
+ MetricAggregateOperator.P99,
+ ];
return getQueryBuilderQueries({
autocompleteData,
legends,
@@ -90,6 +101,8 @@ export const latency = ({
aggregateOperator,
dataSource,
queryNameAndExpression,
+ timeAggregateOperators,
+ spaceAggregateOperators,
});
};
@@ -510,11 +523,16 @@ export const operationPerSec = ({
const legends = OPERATION_LEGENDS;
const dataSource = DataSource.METRICS;
+ const timeAggregateOperators = [MetricAggregateOperator.RATE];
+ const spaceAggregateOperators = [MetricAggregateOperator.SUM];
return getQueryBuilderQueries({
autocompleteData,
legends,
filterItems,
dataSource,
+ timeAggregateOperators,
+ spaceAggregateOperators,
});
};


@@ -29,6 +29,8 @@ export interface BuilderQueriesProps {
aggregateOperator?: string[];
dataSource: DataSource;
queryNameAndExpression?: string[];
+ timeAggregateOperators: MetricAggregateOperator[];
+ spaceAggregateOperators: MetricAggregateOperator[];
}
export interface BuilderQuerieswithFormulaProps {


@@ -2,18 +2,27 @@
import { DownloadOptions } from 'container/Download/Download.types';
import { MenuItemKeys } from 'container/GridCardLayout/WidgetHeader/contants';
+ import {
+ MetricAggregateOperator,
+ TracesAggregatorOperator,
+ } from 'types/common/queryBuilder';
export const legend = {
address: '{{address}}',
};
export const QUERYNAME_AND_EXPRESSION = ['A', 'B', 'C'];
- export const LATENCY_AGGREGATEOPERATOR = ['p50', 'p90', 'p99'];
- export const LATENCY_AGGREGATEOPERATOR_SPAN_METRICS = [
- 'hist_quantile_50',
- 'hist_quantile_90',
- 'hist_quantile_99',
+ export const LATENCY_AGGREGATEOPERATOR = [
+ TracesAggregatorOperator.P50,
+ TracesAggregatorOperator.P90,
+ TracesAggregatorOperator.P99,
];
+ export const LATENCY_AGGREGATEOPERATOR_SPAN_METRICS = [
+ MetricAggregateOperator.P50,
+ MetricAggregateOperator.P90,
+ MetricAggregateOperator.P99,
+ ];
export const OPERATION_LEGENDS = ['Operations'];
export const MENU_ITEMS = [MenuItemKeys.View, MenuItemKeys.CreateAlerts];
@@ -21,8 +30,21 @@ export const MENU_ITEMS = [MenuItemKeys.View, MenuItemKeys.CreateAlerts];
export enum FORMULA {
ERROR_PERCENTAGE = 'A*100/B',
DATABASE_CALLS_AVG_DURATION = 'A/B',
+ // The apdex formula is (satisfied_count + 0.5 * tolerating_count + 0 * frustrating_count) / total_count
+ // The satisfied_count is B, tolerating_count is C, total_count is A
+ // But why do we have (B+C)/2 instead of B + C/2?
+ // The way we issue the query is latency <= threshold, which means we over-count, i.e.
+ // query B => durationNano <= 500ms
+ // query C => durationNano <= 2000ms
+ // Since <= 2000ms includes <= 500ms, we over-count; to correct, we subtract B/2,
+ // so the full expression would be (B + C/2) - B/2 = (B+C)/2
APDEX_TRACES = '((B + C)/2)/A',
- APDEX_DELTA_SPAN_METRICS = '((B + C)/2)/A',
+ // Does the same not apply for delta span metrics?
+ // No, because the delta metrics store the counts just for the current bucket,
+ // so we don't need to subtract anything
+ APDEX_DELTA_SPAN_METRICS = '(B + C)/A',
+ // Cumulative span metrics store the counts for all buckets,
+ // so we need to subtract B/2 to correct the over-counting
APDEX_CUMULATIVE_SPAN_METRICS = '((B + C)/2)/A',
}
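
To sanity-check the over-counting argument in the comments above, here is a tiny Go sketch with made-up counts (100 total spans, 60 under 500ms, 90 under 2000ms); the textbook apdex and the query-side formula agree.

package main

import "fmt"

func main() {
	a := 100.0 // A: total spans
	b := 60.0  // B: spans with durationNano <= 500ms (satisfied)
	c := 90.0  // C: spans with durationNano <= 2000ms (includes all of B)

	// Textbook apdex: (satisfied + 0.5*tolerating)/total, where the true
	// tolerating count is C-B because C also counts the satisfied spans.
	apdex := (b + 0.5*(c-b)) / a

	// Query-side formula with the correction folded in: (B + C/2) - B/2 = (B+C)/2.
	queryApdex := ((b + c) / 2) / a

	fmt.Println(apdex, queryApdex) // both print 0.75
}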

go.mod

@@ -8,7 +8,7 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.25.0
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
- github.com/SigNoz/signoz-otel-collector v0.111.5
+ github.com/SigNoz/signoz-otel-collector v0.111.8
github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974
github.com/antonmedv/expr v1.15.3

go.sum

@@ -70,8 +70,8 @@ github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkb
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
github.com/SigNoz/prometheus v1.12.0 h1:+BXeIHyMOOWWa+xjhJ+x80JFva7r1WzWIfIhQ5PUmIE=
github.com/SigNoz/prometheus v1.12.0/go.mod h1:EqNM27OwmPfqMUk+E+XG1L9rfDFcyXnzzDrg0EPOfxA=
- github.com/SigNoz/signoz-otel-collector v0.111.5 h1:kLpJSv9U46doA+89nfUvTLcNb6WbIxiMAtNlTNL88ZE=
- github.com/SigNoz/signoz-otel-collector v0.111.5/go.mod h1:/nyVFDiEz/QBfyqekB3zRwstZ/KSIB85qgV9NnzAtig=
+ github.com/SigNoz/signoz-otel-collector v0.111.8 h1:t3V3Ahue2ucryRdHvqz33zRCPGQ86xkAsx9J23ZNPk0=
+ github.com/SigNoz/signoz-otel-collector v0.111.8/go.mod h1:/nyVFDiEz/QBfyqekB3zRwstZ/KSIB85qgV9NnzAtig=
github.com/SigNoz/zap_otlp v0.1.0 h1:T7rRcFN87GavY8lDGZj0Z3Xv6OhJA6Pj3I9dNPmqvRc=
github.com/SigNoz/zap_otlp v0.1.0/go.mod h1:lcHvbDbRgvDnPxo9lDlaL1JK2PyOyouP/C3ynnYIvyo=
github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974 h1:PKVgdf83Yw+lZJbFtNGBgqXiXNf3+kOXW2qZ7Ms7OaY=


@@ -119,6 +119,11 @@ type APIHandler struct {
nodesRepo *inframetrics.NodesRepo
namespacesRepo *inframetrics.NamespacesRepo
clustersRepo *inframetrics.ClustersRepo
+ // workloads
+ deploymentsRepo *inframetrics.DeploymentsRepo
+ daemonsetsRepo *inframetrics.DaemonSetsRepo
+ statefulsetsRepo *inframetrics.StatefulSetsRepo
+ jobsRepo *inframetrics.JobsRepo
}
type APIHandlerOpts struct {
@@ -197,6 +202,10 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
nodesRepo := inframetrics.NewNodesRepo(opts.Reader, querierv2)
namespacesRepo := inframetrics.NewNamespacesRepo(opts.Reader, querierv2)
clustersRepo := inframetrics.NewClustersRepo(opts.Reader, querierv2)
+ deploymentsRepo := inframetrics.NewDeploymentsRepo(opts.Reader, querierv2)
+ daemonsetsRepo := inframetrics.NewDaemonSetsRepo(opts.Reader, querierv2)
+ statefulsetsRepo := inframetrics.NewStatefulSetsRepo(opts.Reader, querierv2)
+ jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2)
aH := &APIHandler{
reader: opts.Reader,
@@ -222,6 +231,10 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
nodesRepo: nodesRepo,
namespacesRepo: namespacesRepo,
clustersRepo: clustersRepo,
+ deploymentsRepo: deploymentsRepo,
+ daemonsetsRepo: daemonsetsRepo,
+ statefulsetsRepo: statefulsetsRepo,
+ jobsRepo: jobsRepo,
}
logsQueryBuilder := logsv3.PrepareLogsQuery
@@ -319,6 +332,8 @@ func RespondError(w http.ResponseWriter, apiErr model.BaseApiError, data interfa
code = http.StatusUnauthorized
case model.ErrorForbidden:
code = http.StatusForbidden
+ case model.ErrorConflict:
+ code = http.StatusConflict
default:
code = http.StatusInternalServerError
}
@@ -400,6 +415,26 @@ func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMid
clustersSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getClusterAttributeKeys)).Methods(http.MethodGet)
clustersSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getClusterAttributeValues)).Methods(http.MethodGet)
clustersSubRouter.HandleFunc("/list", am.ViewAccess(aH.getClusterList)).Methods(http.MethodPost)
+ deploymentsSubRouter := router.PathPrefix("/api/v1/deployments").Subrouter()
+ deploymentsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getDeploymentAttributeKeys)).Methods(http.MethodGet)
+ deploymentsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getDeploymentAttributeValues)).Methods(http.MethodGet)
+ deploymentsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getDeploymentList)).Methods(http.MethodPost)
+ daemonsetsSubRouter := router.PathPrefix("/api/v1/daemonsets").Subrouter()
+ daemonsetsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getDaemonSetAttributeKeys)).Methods(http.MethodGet)
+ daemonsetsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getDaemonSetAttributeValues)).Methods(http.MethodGet)
+ daemonsetsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getDaemonSetList)).Methods(http.MethodPost)
+ statefulsetsSubRouter := router.PathPrefix("/api/v1/statefulsets").Subrouter()
+ statefulsetsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getStatefulSetAttributeKeys)).Methods(http.MethodGet)
+ statefulsetsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getStatefulSetAttributeValues)).Methods(http.MethodGet)
+ statefulsetsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getStatefulSetList)).Methods(http.MethodPost)
+ jobsSubRouter := router.PathPrefix("/api/v1/jobs").Subrouter()
+ jobsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getJobAttributeKeys)).Methods(http.MethodGet)
+ jobsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getJobAttributeValues)).Methods(http.MethodGet)
+ jobsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getJobList)).Methods(http.MethodPost)
}
func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) {
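
For illustration, a hedged sketch of calling one of the new routes. The path /api/v1/deployments/list comes from the registration above; the JSON field names and the port are assumptions inferred from how the handlers decode model.DeploymentListRequest, so check the actual struct tags before relying on them.

package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	// Field names are assumed from the Go struct usage (Start, End, OrderBy, Limit).
	body := `{
		"start": 1731393000000,
		"end": 1731396600000,
		"orderBy": {"columnName": "cpu", "order": "desc"},
		"limit": 10
	}`
	// Assumed local query-service address.
	resp, err := http.Post("http://localhost:8080/api/v1/deployments/list",
		"application/json", strings.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(out))
}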


@@ -334,3 +334,213 @@ func (aH *APIHandler) getClusterList(w http.ResponseWriter, r *http.Request) {
aH.Respond(w, clusterList)
}
func (aH *APIHandler) getDeploymentAttributeKeys(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeKeyRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
keys, err := aH.deploymentsRepo.GetDeploymentAttributeKeys(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, keys)
}
func (aH *APIHandler) getDeploymentAttributeValues(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeValueRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
values, err := aH.deploymentsRepo.GetDeploymentAttributeValues(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, values)
}
func (aH *APIHandler) getDeploymentList(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req := model.DeploymentListRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
deploymentList, err := aH.deploymentsRepo.GetDeploymentList(ctx, req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, deploymentList)
}
func (aH *APIHandler) getDaemonSetAttributeKeys(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeKeyRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
keys, err := aH.daemonsetsRepo.GetDaemonSetAttributeKeys(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, keys)
}
func (aH *APIHandler) getDaemonSetAttributeValues(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeValueRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
values, err := aH.daemonsetsRepo.GetDaemonSetAttributeValues(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, values)
}
func (aH *APIHandler) getDaemonSetList(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req := model.DaemonSetListRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
daemonSetList, err := aH.daemonsetsRepo.GetDaemonSetList(ctx, req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, daemonSetList)
}
func (aH *APIHandler) getStatefulSetAttributeKeys(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeKeyRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
keys, err := aH.statefulsetsRepo.GetStatefulSetAttributeKeys(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, keys)
}
func (aH *APIHandler) getStatefulSetAttributeValues(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeValueRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
values, err := aH.statefulsetsRepo.GetStatefulSetAttributeValues(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, values)
}
func (aH *APIHandler) getStatefulSetList(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req := model.StatefulSetListRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
statefulSetList, err := aH.statefulsetsRepo.GetStatefulSetList(ctx, req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, statefulSetList)
}
func (aH *APIHandler) getJobAttributeKeys(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeKeyRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
keys, err := aH.jobsRepo.GetJobAttributeKeys(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, keys)
}
func (aH *APIHandler) getJobAttributeValues(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeValueRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
values, err := aH.jobsRepo.GetJobAttributeValues(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, values)
}
func (aH *APIHandler) getJobList(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req := model.JobListRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
jobList, err := aH.jobsRepo.GetJobList(ctx, req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, jobList)
}


@@ -73,6 +73,22 @@ func getParamsForTopClusters(req model.ClusterListRequest) (int64, string, strin
return getParamsForTopItems(req.Start, req.End)
}
func getParamsForTopDeployments(req model.DeploymentListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End)
}
func getParamsForTopDaemonSets(req model.DaemonSetListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End)
}
func getParamsForTopStatefulSets(req model.StatefulSetListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End)
}
func getParamsForTopJobs(req model.JobListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End)
}
// TODO(srikanthccv): remove this
// What is happening here?
// The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint


@@ -0,0 +1,444 @@
package inframetrics
import (
"context"
"math"
"sort"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"golang.org/x/exp/slices"
)
var (
metricToUseForDaemonSets = "k8s_pod_cpu_utilization"
k8sDaemonSetNameAttrKey = "k8s_daemonset_name"
metricNamesForDaemonSets = map[string]string{
"desired_nodes": "k8s_daemonset_desired_scheduled_nodes",
"available_nodes": "k8s_daemonset_current_scheduled_nodes",
}
daemonSetAttrsToEnrich = []string{
"k8s_daemonset_name",
"k8s_namespace_name",
"k8s_cluster_name",
}
queryNamesForDaemonSets = map[string][]string{
"cpu": {"A"},
"cpu_request": {"B", "A"},
"cpu_limit": {"C", "A"},
"memory": {"D"},
"memory_request": {"E", "D"},
"memory_limit": {"F", "D"},
"restarts": {"G", "A"},
"desired_nodes": {"H"},
"available_nodes": {"I"},
}
builderQueriesForDaemonSets = map[string]*v3.BuilderQuery{
// desired nodes
"H": {
QueryName: "H",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForDaemonSets["desired_nodes"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "H",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// available nodes
"I": {
QueryName: "I",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForDaemonSets["available_nodes"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "I",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
}
daemonSetQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"}
)
type DaemonSetsRepo struct {
reader interfaces.Reader
querierV2 interfaces.Querier
}
func NewDaemonSetsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *DaemonSetsRepo {
return &DaemonSetsRepo{reader: reader, querierV2: querierV2}
}
func (d *DaemonSetsRepo) GetDaemonSetAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
// TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForDaemonSets
if req.Limit == 0 {
req.Limit = 50
}
attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req)
if err != nil {
return nil, err
}
// TODO(srikanthccv): only return resource attributes when we have a way to
// distinguish between resource attributes and other attributes.
filteredKeys := []v3.AttributeKey{}
for _, key := range attributeKeysResponse.AttributeKeys {
if slices.Contains(pointAttrsToIgnore, key.Key) {
continue
}
filteredKeys = append(filteredKeys, key)
}
return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil
}
func (d *DaemonSetsRepo) GetDaemonSetAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForDaemonSets
if req.Limit == 0 {
req.Limit = 50
}
attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req)
if err != nil {
return nil, err
}
return attributeValuesResponse, nil
}
func (d *DaemonSetsRepo) getMetadataAttributes(ctx context.Context, req model.DaemonSetListRequest) (map[string]map[string]string, error) {
daemonSetAttrs := map[string]map[string]string{}
for _, key := range daemonSetAttrsToEnrich {
hasKey := false
for _, groupByKey := range req.GroupBy {
if groupByKey.Key == key {
hasKey = true
break
}
}
if !hasKey {
req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key})
}
}
mq := v3.BuilderQuery{
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricToUseForDaemonSets,
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
GroupBy: req.GroupBy,
}
query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
if err != nil {
return nil, err
}
query = localQueryToDistributedQuery(query)
attrsListResponse, err := d.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}
for _, row := range attrsListResponse {
stringData := map[string]string{}
for key, value := range row.Data {
if str, ok := value.(string); ok {
stringData[key] = str
} else if strPtr, ok := value.(*string); ok {
stringData[key] = *strPtr
}
}
daemonSetName := stringData[k8sDaemonSetNameAttrKey]
if _, ok := daemonSetAttrs[daemonSetName]; !ok {
daemonSetAttrs[daemonSetName] = map[string]string{}
}
for _, key := range req.GroupBy {
daemonSetAttrs[daemonSetName][key.Key] = stringData[key.Key]
}
}
return daemonSetAttrs, nil
}
func (d *DaemonSetsRepo) getTopDaemonSetGroups(ctx context.Context, req model.DaemonSetListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
step, timeSeriesTableName, samplesTableName := getParamsForTopDaemonSets(req)
queryNames := queryNamesForDaemonSets[req.OrderBy.ColumnName]
topDaemonSetGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
Start: req.Start,
End: req.End,
Step: step,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
},
}
for _, queryName := range queryNames {
query := q.CompositeQuery.BuilderQueries[queryName].Clone()
query.StepInterval = step
query.MetricTableHints = &v3.MetricTableHints{
TimeSeriesTableName: timeSeriesTableName,
SamplesTableName: samplesTableName,
}
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
topDaemonSetGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, topDaemonSetGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, topDaemonSetGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
return nil, nil, nil
}
if req.OrderBy.Order == v3.DirectionDesc {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value
})
} else {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value
})
}
limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopDaemonSetGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topDaemonSetGroups := []map[string]string{}
for _, series := range paginatedTopDaemonSetGroupsSeries {
topDaemonSetGroups = append(topDaemonSetGroups, series.Labels)
}
allDaemonSetGroups := []map[string]string{}
for _, series := range formattedResponse[0].Series {
allDaemonSetGroups = append(allDaemonSetGroups, series.Labels)
}
return topDaemonSetGroups, allDaemonSetGroups, nil
}
func (d *DaemonSetsRepo) GetDaemonSetList(ctx context.Context, req model.DaemonSetListRequest) (model.DaemonSetListResponse, error) {
resp := model.DaemonSetListResponse{}
if req.Limit == 0 {
req.Limit = 10
}
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
}
if req.GroupBy == nil {
req.GroupBy = []v3.AttributeKey{{Key: k8sDaemonSetNameAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := WorkloadTableListQuery.Clone()
query.Start = req.Start
query.End = req.End
query.Step = step
// add additional queries for daemon sets
for _, daemonSetQuery := range builderQueriesForDaemonSets {
query.CompositeQuery.BuilderQueries[daemonSetQuery.QueryName] = daemonSetQuery
}
for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
query.GroupBy = req.GroupBy
// make sure we only get records for daemon sets
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: k8sDaemonSetNameAttrKey},
Operator: v3.FilterOperatorExists,
})
}
daemonSetAttrs, err := d.getMetadataAttributes(ctx, req)
if err != nil {
return resp, err
}
topDaemonSetGroups, allDaemonSetGroups, err := d.getTopDaemonSetGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topDaemonSetGroup := range topDaemonSetGroups {
for k, v := range topDaemonSetGroup {
groupFilters[k] = append(groupFilters[k], v)
}
}
for groupKey, groupValues := range groupFilters {
hasGroupFilter := false
if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, query)
if err != nil {
return resp, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil {
return resp, err
}
records := []model.DaemonSetListRecord{}
for _, result := range formattedResponse {
for _, row := range result.Table.Rows {
record := model.DaemonSetListRecord{
DaemonSetName: "",
CPUUsage: -1,
CPURequest: -1,
CPULimit: -1,
MemoryUsage: -1,
MemoryRequest: -1,
MemoryLimit: -1,
DesiredNodes: -1,
AvailableNodes: -1,
}
if daemonSetName, ok := row.Data[k8sDaemonSetNameAttrKey].(string); ok {
record.DaemonSetName = daemonSetName
}
if cpu, ok := row.Data["A"].(float64); ok {
record.CPUUsage = cpu
}
if cpuRequest, ok := row.Data["B"].(float64); ok {
record.CPURequest = cpuRequest
}
if cpuLimit, ok := row.Data["C"].(float64); ok {
record.CPULimit = cpuLimit
}
if memory, ok := row.Data["D"].(float64); ok {
record.MemoryUsage = memory
}
if memoryRequest, ok := row.Data["E"].(float64); ok {
record.MemoryRequest = memoryRequest
}
if memoryLimit, ok := row.Data["F"].(float64); ok {
record.MemoryLimit = memoryLimit
}
if restarts, ok := row.Data["G"].(float64); ok {
record.Restarts = int(restarts)
}
if desiredNodes, ok := row.Data["H"].(float64); ok {
record.DesiredNodes = int(desiredNodes)
}
if availableNodes, ok := row.Data["I"].(float64); ok {
record.AvailableNodes = int(availableNodes)
}
record.Meta = map[string]string{}
if _, ok := daemonSetAttrs[record.DaemonSetName]; ok {
record.Meta = daemonSetAttrs[record.DaemonSetName]
}
for k, v := range row.Data {
if slices.Contains(daemonSetQueryNames, k) {
continue
}
if labelValue, ok := v.(string); ok {
record.Meta[k] = labelValue
}
}
records = append(records, record)
}
}
resp.Total = len(allDaemonSetGroups)
resp.Records = records
return resp, nil
}


@@ -0,0 +1,444 @@
package inframetrics
import (
"context"
"math"
"sort"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"golang.org/x/exp/slices"
)
var (
metricToUseForDeployments = "k8s_pod_cpu_utilization"
k8sDeploymentNameAttrKey = "k8s_deployment_name"
metricNamesForDeployments = map[string]string{
"desired_pods": "k8s_deployment_desired",
"available_pods": "k8s_deployment_available",
}
deploymentAttrsToEnrich = []string{
"k8s_deployment_name",
"k8s_namespace_name",
"k8s_cluster_name",
}
queryNamesForDeployments = map[string][]string{
"cpu": {"A"},
"cpu_request": {"B", "A"},
"cpu_limit": {"C", "A"},
"memory": {"D"},
"memory_request": {"E", "D"},
"memory_limit": {"F", "D"},
"restarts": {"G", "A"},
"desired_pods": {"H"},
"available_pods": {"I"},
}
builderQueriesForDeployments = map[string]*v3.BuilderQuery{
// desired pods
"H": {
QueryName: "H",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForDeployments["desired_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "H",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// available pods
"I": {
QueryName: "I",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForDeployments["available_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "I",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
}
deploymentQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"}
)
type DeploymentsRepo struct {
reader interfaces.Reader
querierV2 interfaces.Querier
}
func NewDeploymentsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *DeploymentsRepo {
return &DeploymentsRepo{reader: reader, querierV2: querierV2}
}
func (d *DeploymentsRepo) GetDeploymentAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
// TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForDeployments
if req.Limit == 0 {
req.Limit = 50
}
attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req)
if err != nil {
return nil, err
}
// TODO(srikanthccv): only return resource attributes when we have a way to
// distinguish between resource attributes and other attributes.
filteredKeys := []v3.AttributeKey{}
for _, key := range attributeKeysResponse.AttributeKeys {
if slices.Contains(pointAttrsToIgnore, key.Key) {
continue
}
filteredKeys = append(filteredKeys, key)
}
return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil
}
func (d *DeploymentsRepo) GetDeploymentAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForDeployments
if req.Limit == 0 {
req.Limit = 50
}
attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req)
if err != nil {
return nil, err
}
return attributeValuesResponse, nil
}
func (d *DeploymentsRepo) getMetadataAttributes(ctx context.Context, req model.DeploymentListRequest) (map[string]map[string]string, error) {
deploymentAttrs := map[string]map[string]string{}
for _, key := range deploymentAttrsToEnrich {
hasKey := false
for _, groupByKey := range req.GroupBy {
if groupByKey.Key == key {
hasKey = true
break
}
}
if !hasKey {
req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key})
}
}
mq := v3.BuilderQuery{
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricToUseForDeployments,
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
GroupBy: req.GroupBy,
}
query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
if err != nil {
return nil, err
}
query = localQueryToDistributedQuery(query)
attrsListResponse, err := d.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}
for _, row := range attrsListResponse {
stringData := map[string]string{}
for key, value := range row.Data {
if str, ok := value.(string); ok {
stringData[key] = str
} else if strPtr, ok := value.(*string); ok {
stringData[key] = *strPtr
}
}
deploymentName := stringData[k8sDeploymentNameAttrKey]
if _, ok := deploymentAttrs[deploymentName]; !ok {
deploymentAttrs[deploymentName] = map[string]string{}
}
for _, key := range req.GroupBy {
deploymentAttrs[deploymentName][key.Key] = stringData[key.Key]
}
}
return deploymentAttrs, nil
}
func (d *DeploymentsRepo) getTopDeploymentGroups(ctx context.Context, req model.DeploymentListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
step, timeSeriesTableName, samplesTableName := getParamsForTopDeployments(req)
queryNames := queryNamesForDeployments[req.OrderBy.ColumnName]
topDeploymentGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
Start: req.Start,
End: req.End,
Step: step,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
},
}
for _, queryName := range queryNames {
query := q.CompositeQuery.BuilderQueries[queryName].Clone()
query.StepInterval = step
query.MetricTableHints = &v3.MetricTableHints{
TimeSeriesTableName: timeSeriesTableName,
SamplesTableName: samplesTableName,
}
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
topDeploymentGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, topDeploymentGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, topDeploymentGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
return nil, nil, nil
}
if req.OrderBy.Order == v3.DirectionDesc {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value
})
} else {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value
})
}
limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopDeploymentGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topDeploymentGroups := []map[string]string{}
for _, series := range paginatedTopDeploymentGroupsSeries {
topDeploymentGroups = append(topDeploymentGroups, series.Labels)
}
allDeploymentGroups := []map[string]string{}
for _, series := range formattedResponse[0].Series {
allDeploymentGroups = append(allDeploymentGroups, series.Labels)
}
return topDeploymentGroups, allDeploymentGroups, nil
}
func (d *DeploymentsRepo) GetDeploymentList(ctx context.Context, req model.DeploymentListRequest) (model.DeploymentListResponse, error) {
resp := model.DeploymentListResponse{}
if req.Limit == 0 {
req.Limit = 10
}
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
}
if req.GroupBy == nil {
req.GroupBy = []v3.AttributeKey{{Key: k8sDeploymentNameAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := WorkloadTableListQuery.Clone()
query.Start = req.Start
query.End = req.End
query.Step = step
// add additional queries for deployments
for _, deploymentQuery := range builderQueriesForDeployments {
query.CompositeQuery.BuilderQueries[deploymentQuery.QueryName] = deploymentQuery
}
for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
query.GroupBy = req.GroupBy
// make sure we only get records for deployments
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: k8sDeploymentNameAttrKey},
Operator: v3.FilterOperatorExists,
})
}
deploymentAttrs, err := d.getMetadataAttributes(ctx, req)
if err != nil {
return resp, err
}
topDeploymentGroups, allDeploymentGroups, err := d.getTopDeploymentGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topDeploymentGroup := range topDeploymentGroups {
for k, v := range topDeploymentGroup {
groupFilters[k] = append(groupFilters[k], v)
}
}
for groupKey, groupValues := range groupFilters {
hasGroupFilter := false
if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, query)
if err != nil {
return resp, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil {
return resp, err
}
records := []model.DeploymentListRecord{}
for _, result := range formattedResponse {
for _, row := range result.Table.Rows {
record := model.DeploymentListRecord{
DeploymentName: "",
CPUUsage: -1,
CPURequest: -1,
CPULimit: -1,
MemoryUsage: -1,
MemoryRequest: -1,
MemoryLimit: -1,
DesiredPods: -1,
AvailablePods: -1,
}
if deploymentName, ok := row.Data[k8sDeploymentNameAttrKey].(string); ok {
record.DeploymentName = deploymentName
}
if cpu, ok := row.Data["A"].(float64); ok {
record.CPUUsage = cpu
}
if cpuRequest, ok := row.Data["B"].(float64); ok {
record.CPURequest = cpuRequest
}
if cpuLimit, ok := row.Data["C"].(float64); ok {
record.CPULimit = cpuLimit
}
if memory, ok := row.Data["D"].(float64); ok {
record.MemoryUsage = memory
}
if memoryRequest, ok := row.Data["E"].(float64); ok {
record.MemoryRequest = memoryRequest
}
if memoryLimit, ok := row.Data["F"].(float64); ok {
record.MemoryLimit = memoryLimit
}
if restarts, ok := row.Data["G"].(float64); ok {
record.Restarts = int(restarts)
}
if desiredPods, ok := row.Data["H"].(float64); ok {
record.DesiredPods = int(desiredPods)
}
if availablePods, ok := row.Data["I"].(float64); ok {
record.AvailablePods = int(availablePods)
}
record.Meta = map[string]string{}
if _, ok := deploymentAttrs[record.DeploymentName]; ok {
record.Meta = deploymentAttrs[record.DeploymentName]
}
for k, v := range row.Data {
if slices.Contains(deploymentQueryNames, k) {
continue
}
if labelValue, ok := v.(string); ok {
record.Meta[k] = labelValue
}
}
records = append(records, record)
}
}
resp.Total = len(allDeploymentGroups)
resp.Records = records
return resp, nil
}


@@ -0,0 +1,498 @@
package inframetrics
import (
"context"
"math"
"sort"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"golang.org/x/exp/slices"
)
var (
metricToUseForJobs = "k8s_pod_cpu_utilization"
k8sJobNameAttrKey = "k8s_job_name"
metricNamesForJobs = map[string]string{
"desired_successful_pods": "k8s_job_desired_successful_pods",
"active_pods": "k8s_job_active_pods",
"failed_pods": "k8s_job_failed_pods",
"successful_pods": "k8s_job_successful_pods",
}
jobAttrsToEnrich = []string{
"k8s_job_name",
"k8s_namespace_name",
"k8s_cluster_name",
}
queryNamesForJobs = map[string][]string{
"cpu": {"A"},
"cpu_request": {"B", "A"},
"cpu_limit": {"C", "A"},
"memory": {"D"},
"memory_request": {"E", "D"},
"memory_limit": {"F", "D"},
"restarts": {"G", "A"},
"desired_pods": {"H"},
"active_pods": {"I"},
"failed_pods": {"J"},
"successful_pods": {"K"},
}
builderQueriesForJobs = map[string]*v3.BuilderQuery{
// desired successful pods
"H": {
QueryName: "H",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForJobs["desired_successful_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "H",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// active pods
"I": {
QueryName: "I",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForJobs["active_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "I",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// failed pods
"J": {
QueryName: "J",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForJobs["failed_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "J",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// successful pods
"K": {
QueryName: "K",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForJobs["successful_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "K",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
}
jobQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"}
)
type JobsRepo struct {
reader interfaces.Reader
querierV2 interfaces.Querier
}
func NewJobsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *JobsRepo {
return &JobsRepo{reader: reader, querierV2: querierV2}
}
func (d *JobsRepo) GetJobAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
// TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForJobs
if req.Limit == 0 {
req.Limit = 50
}
attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req)
if err != nil {
return nil, err
}
// TODO(srikanthccv): only return resource attributes when we have a way to
// distinguish between resource attributes and other attributes.
filteredKeys := []v3.AttributeKey{}
for _, key := range attributeKeysResponse.AttributeKeys {
if slices.Contains(pointAttrsToIgnore, key.Key) {
continue
}
filteredKeys = append(filteredKeys, key)
}
return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil
}
func (d *JobsRepo) GetJobAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForJobs
if req.Limit == 0 {
req.Limit = 50
}
attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req)
if err != nil {
return nil, err
}
return attributeValuesResponse, nil
}
func (d *JobsRepo) getMetadataAttributes(ctx context.Context, req model.JobListRequest) (map[string]map[string]string, error) {
jobAttrs := map[string]map[string]string{}
for _, key := range jobAttrsToEnrich {
hasKey := false
for _, groupByKey := range req.GroupBy {
if groupByKey.Key == key {
hasKey = true
break
}
}
if !hasKey {
req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key})
}
}
mq := v3.BuilderQuery{
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricToUseForJobs,
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
GroupBy: req.GroupBy,
}
query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
if err != nil {
return nil, err
}
query = localQueryToDistributedQuery(query)
attrsListResponse, err := d.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}
for _, row := range attrsListResponse {
stringData := map[string]string{}
for key, value := range row.Data {
if str, ok := value.(string); ok {
stringData[key] = str
} else if strPtr, ok := value.(*string); ok {
stringData[key] = *strPtr
}
}
jobName := stringData[k8sJobNameAttrKey]
if _, ok := jobAttrs[jobName]; !ok {
jobAttrs[jobName] = map[string]string{}
}
for _, key := range req.GroupBy {
jobAttrs[jobName][key.Key] = stringData[key.Key]
}
}
return jobAttrs, nil
}
func (d *JobsRepo) getTopJobGroups(ctx context.Context, req model.JobListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
step, timeSeriesTableName, samplesTableName := getParamsForTopJobs(req)
queryNames := queryNamesForJobs[req.OrderBy.ColumnName]
topJobGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
Start: req.Start,
End: req.End,
Step: step,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
},
}
for _, queryName := range queryNames {
query := q.CompositeQuery.BuilderQueries[queryName].Clone()
query.StepInterval = step
query.MetricTableHints = &v3.MetricTableHints{
TimeSeriesTableName: timeSeriesTableName,
SamplesTableName: samplesTableName,
}
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
topJobGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, topJobGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, topJobGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
return nil, nil, nil
}
if req.OrderBy.Order == v3.DirectionDesc {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value
})
} else {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value
})
}
limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopJobGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topJobGroups := []map[string]string{}
for _, series := range paginatedTopJobGroupsSeries {
topJobGroups = append(topJobGroups, series.Labels)
}
allJobGroups := []map[string]string{}
for _, series := range formattedResponse[0].Series {
allJobGroups = append(allJobGroups, series.Labels)
}
return topJobGroups, allJobGroups, nil
}
func (d *JobsRepo) GetJobList(ctx context.Context, req model.JobListRequest) (model.JobListResponse, error) {
resp := model.JobListResponse{}
if req.Limit == 0 {
req.Limit = 10
}
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "desired_pods", Order: v3.DirectionDesc}
}
if req.GroupBy == nil {
req.GroupBy = []v3.AttributeKey{{Key: k8sJobNameAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := WorkloadTableListQuery.Clone()
query.Start = req.Start
query.End = req.End
query.Step = step
// add additional queries for jobs
for _, jobQuery := range builderQueriesForJobs {
query.CompositeQuery.BuilderQueries[jobQuery.QueryName] = jobQuery
}
for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
query.GroupBy = req.GroupBy
// make sure we only get records for jobs
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: k8sJobNameAttrKey},
Operator: v3.FilterOperatorExists,
})
}
jobAttrs, err := d.getMetadataAttributes(ctx, req)
if err != nil {
return resp, err
}
topJobGroups, allJobGroups, err := d.getTopJobGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topJobGroup := range topJobGroups {
for k, v := range topJobGroup {
groupFilters[k] = append(groupFilters[k], v)
}
}
for groupKey, groupValues := range groupFilters {
hasGroupFilter := false
if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, query)
if err != nil {
return resp, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil {
return resp, err
}
records := []model.JobListRecord{}
for _, result := range formattedResponse {
for _, row := range result.Table.Rows {
record := model.JobListRecord{
JobName: "",
CPUUsage: -1,
CPURequest: -1,
CPULimit: -1,
MemoryUsage: -1,
MemoryRequest: -1,
MemoryLimit: -1,
DesiredSuccessfulPods: -1,
ActivePods: -1,
FailedPods: -1,
SuccessfulPods: -1,
}
if jobName, ok := row.Data[k8sJobNameAttrKey].(string); ok {
record.JobName = jobName
}
if cpu, ok := row.Data["A"].(float64); ok {
record.CPUUsage = cpu
}
if cpuRequest, ok := row.Data["B"].(float64); ok {
record.CPURequest = cpuRequest
}
if cpuLimit, ok := row.Data["C"].(float64); ok {
record.CPULimit = cpuLimit
}
if memory, ok := row.Data["D"].(float64); ok {
record.MemoryUsage = memory
}
if memoryRequest, ok := row.Data["E"].(float64); ok {
record.MemoryRequest = memoryRequest
}
if memoryLimit, ok := row.Data["F"].(float64); ok {
record.MemoryLimit = memoryLimit
}
if restarts, ok := row.Data["G"].(float64); ok {
record.Restarts = int(restarts)
}
if desiredSuccessfulPods, ok := row.Data["H"].(float64); ok {
record.DesiredSuccessfulPods = int(desiredSuccessfulPods)
}
if activePods, ok := row.Data["I"].(float64); ok {
record.ActivePods = int(activePods)
}
if failedPods, ok := row.Data["J"].(float64); ok {
record.FailedPods = int(failedPods)
}
if successfulPods, ok := row.Data["K"].(float64); ok {
record.SuccessfulPods = int(successfulPods)
}
record.Meta = map[string]string{}
if _, ok := jobAttrs[record.JobName]; ok {
record.Meta = jobAttrs[record.JobName]
}
for k, v := range row.Data {
if slices.Contains(jobQueryNames, k) {
continue
}
if labelValue, ok := v.(string); ok {
record.Meta[k] = labelValue
}
}
records = append(records, record)
}
}
resp.Total = len(allJobGroups)
resp.Records = records
return resp, nil
}
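For orientation, a hypothetical caller of GetJobList could look like the sketch below; the constructor name NewJobsRepo is assumed by analogy with NewStatefulSetsRepo further down, and the import paths are inferred from the package names in this diff.

package main

import (
	"context"
	"fmt"
	"time"

	"go.signoz.io/signoz/pkg/query-service/app/inframetrics"
	"go.signoz.io/signoz/pkg/query-service/interfaces"
	"go.signoz.io/signoz/pkg/query-service/model"
)

func listJobs(ctx context.Context, reader interfaces.Reader, querier interfaces.Querier) error {
	repo := inframetrics.NewJobsRepo(reader, querier) // assumed constructor, mirroring NewStatefulSetsRepo
	resp, err := repo.GetJobList(ctx, model.JobListRequest{
		Start: time.Now().Add(-time.Hour).UnixMilli(), // epoch ms, per JobListRequest
		End:   time.Now().UnixMilli(),
		// Limit, OrderBy, and GroupBy are optional; GetJobList fills in the defaults above.
	})
	if err != nil {
		return err
	}
	for _, rec := range resp.Records {
		fmt.Println(rec.JobName, rec.ActivePods, rec.SuccessfulPods)
	}
	return nil
}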


@@ -0,0 +1,444 @@
package inframetrics
import (
"context"
"math"
"sort"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"golang.org/x/exp/slices"
)
var (
metricToUseForStatefulSets = "k8s_pod_cpu_utilization"
k8sStatefulSetNameAttrKey = "k8s_statefulset_name"
metricNamesForStatefulSets = map[string]string{
"desired_pods": "k8s_statefulset_desired_pods",
"available_pods": "k8s_statefulset_current_pods",
}
statefulSetAttrsToEnrich = []string{
"k8s_statefulset_name",
"k8s_namespace_name",
"k8s_cluster_name",
}
queryNamesForStatefulSets = map[string][]string{
"cpu": {"A"},
"cpu_request": {"B", "A"},
"cpu_limit": {"C", "A"},
"memory": {"D"},
"memory_request": {"E", "D"},
"memory_limit": {"F", "D"},
"restarts": {"G", "A"},
"desired_pods": {"H"},
"available_pods": {"I"},
}
builderQueriesForStatefulSets = map[string]*v3.BuilderQuery{
// desired pods
"H": {
QueryName: "H",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForStatefulSets["desired_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "H",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// available pods
"I": {
QueryName: "I",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForStatefulSets["available_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "I",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
}
statefulSetQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"}
)
type StatefulSetsRepo struct {
reader interfaces.Reader
querierV2 interfaces.Querier
}
func NewStatefulSetsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *StatefulSetsRepo {
return &StatefulSetsRepo{reader: reader, querierV2: querierV2}
}
func (d *StatefulSetsRepo) GetStatefulSetAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
// TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForStatefulSets
if req.Limit == 0 {
req.Limit = 50
}
attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req)
if err != nil {
return nil, err
}
// TODO(srikanthccv): only return resource attributes when we have a way to
// distinguish between resource attributes and other attributes.
filteredKeys := []v3.AttributeKey{}
for _, key := range attributeKeysResponse.AttributeKeys {
if slices.Contains(pointAttrsToIgnore, key.Key) {
continue
}
filteredKeys = append(filteredKeys, key)
}
return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil
}
func (d *StatefulSetsRepo) GetStatefulSetAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForStatefulSets
if req.Limit == 0 {
req.Limit = 50
}
attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req)
if err != nil {
return nil, err
}
return attributeValuesResponse, nil
}
func (d *StatefulSetsRepo) getMetadataAttributes(ctx context.Context, req model.StatefulSetListRequest) (map[string]map[string]string, error) {
statefulSetAttrs := map[string]map[string]string{}
for _, key := range statefulSetAttrsToEnrich {
hasKey := false
for _, groupByKey := range req.GroupBy {
if groupByKey.Key == key {
hasKey = true
break
}
}
if !hasKey {
req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key})
}
}
mq := v3.BuilderQuery{
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricToUseForStatefulSets,
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
GroupBy: req.GroupBy,
}
query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
if err != nil {
return nil, err
}
query = localQueryToDistributedQuery(query)
attrsListResponse, err := d.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}
for _, row := range attrsListResponse {
stringData := map[string]string{}
for key, value := range row.Data {
if str, ok := value.(string); ok {
stringData[key] = str
} else if strPtr, ok := value.(*string); ok {
stringData[key] = *strPtr
}
}
statefulSetName := stringData[k8sStatefulSetNameAttrKey]
if _, ok := statefulSetAttrs[statefulSetName]; !ok {
statefulSetAttrs[statefulSetName] = map[string]string{}
}
for _, key := range req.GroupBy {
statefulSetAttrs[statefulSetName][key.Key] = stringData[key.Key]
}
}
return statefulSetAttrs, nil
}
func (d *StatefulSetsRepo) getTopStatefulSetGroups(ctx context.Context, req model.StatefulSetListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
step, timeSeriesTableName, samplesTableName := getParamsForTopStatefulSets(req)
queryNames := queryNamesForStatefulSets[req.OrderBy.ColumnName]
topStatefulSetGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
Start: req.Start,
End: req.End,
Step: step,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
},
}
for _, queryName := range queryNames {
query := q.CompositeQuery.BuilderQueries[queryName].Clone()
query.StepInterval = step
query.MetricTableHints = &v3.MetricTableHints{
TimeSeriesTableName: timeSeriesTableName,
SamplesTableName: samplesTableName,
}
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
topStatefulSetGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, topStatefulSetGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, topStatefulSetGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
return nil, nil, nil
}
if req.OrderBy.Order == v3.DirectionDesc {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value
})
} else {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value
})
}
limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopStatefulSetGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topStatefulSetGroups := []map[string]string{}
for _, series := range paginatedTopStatefulSetGroupsSeries {
topStatefulSetGroups = append(topStatefulSetGroups, series.Labels)
}
allStatefulSetGroups := []map[string]string{}
for _, series := range formattedResponse[0].Series {
allStatefulSetGroups = append(allStatefulSetGroups, series.Labels)
}
return topStatefulSetGroups, allStatefulSetGroups, nil
}
func (d *StatefulSetsRepo) GetStatefulSetList(ctx context.Context, req model.StatefulSetListRequest) (model.StatefulSetListResponse, error) {
resp := model.StatefulSetListResponse{}
if req.Limit == 0 {
req.Limit = 10
}
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
}
if req.GroupBy == nil {
req.GroupBy = []v3.AttributeKey{{Key: k8sStatefulSetNameAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := WorkloadTableListQuery.Clone()
query.Start = req.Start
query.End = req.End
query.Step = step
// add additional queries for stateful sets
for _, statefulSetQuery := range builderQueriesForStatefulSets {
query.CompositeQuery.BuilderQueries[statefulSetQuery.QueryName] = statefulSetQuery
}
for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
query.GroupBy = req.GroupBy
// make sure we only get records for stateful sets
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: k8sStatefulSetNameAttrKey},
Operator: v3.FilterOperatorExists,
})
}
statefulSetAttrs, err := d.getMetadataAttributes(ctx, req)
if err != nil {
return resp, err
}
topStatefulSetGroups, allStatefulSetGroups, err := d.getTopStatefulSetGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topStatefulSetGroup := range topStatefulSetGroups {
for k, v := range topStatefulSetGroup {
groupFilters[k] = append(groupFilters[k], v)
}
}
for groupKey, groupValues := range groupFilters {
hasGroupFilter := false
if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, query)
if err != nil {
return resp, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil {
return resp, err
}
records := []model.StatefulSetListRecord{}
for _, result := range formattedResponse {
for _, row := range result.Table.Rows {
record := model.StatefulSetListRecord{
StatefulSetName: "",
CPUUsage: -1,
CPURequest: -1,
CPULimit: -1,
MemoryUsage: -1,
MemoryRequest: -1,
MemoryLimit: -1,
DesiredPods: -1,
AvailablePods: -1,
}
if statefulSetName, ok := row.Data[k8sStatefulSetNameAttrKey].(string); ok {
record.StatefulSetName = statefulSetName
}
if cpu, ok := row.Data["A"].(float64); ok {
record.CPUUsage = cpu
}
if cpuRequest, ok := row.Data["B"].(float64); ok {
record.CPURequest = cpuRequest
}
if cpuLimit, ok := row.Data["C"].(float64); ok {
record.CPULimit = cpuLimit
}
if memory, ok := row.Data["D"].(float64); ok {
record.MemoryUsage = memory
}
if memoryRequest, ok := row.Data["E"].(float64); ok {
record.MemoryRequest = memoryRequest
}
if memoryLimit, ok := row.Data["F"].(float64); ok {
record.MemoryLimit = memoryLimit
}
if restarts, ok := row.Data["G"].(float64); ok {
record.Restarts = int(restarts)
}
if desiredPods, ok := row.Data["H"].(float64); ok {
record.DesiredPods = int(desiredPods)
}
if availablePods, ok := row.Data["I"].(float64); ok {
record.AvailablePods = int(availablePods)
}
record.Meta = map[string]string{}
if _, ok := statefulSetAttrs[record.StatefulSetName]; ok {
record.Meta = statefulSetAttrs[record.StatefulSetName]
}
for k, v := range row.Data {
if slices.Contains(statefulSetQueryNames, k) {
continue
}
if labelValue, ok := v.(string); ok {
record.Meta[k] = labelValue
}
}
records = append(records, record)
}
}
resp.Total = len(allStatefulSetGroups)
resp.Records = records
return resp, nil
}


@@ -0,0 +1,166 @@
package inframetrics
import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
var (
metricNamesForWorkloads = map[string]string{
"cpu": "k8s_pod_cpu_utilization",
"cpu_req": "k8s_pod_cpu_request_utilization",
"cpu_limit": "k8s_pod_cpu_limit_utilization",
"memory": "k8s_pod_memory_usage",
"memory_req": "k8s_pod_memory_request_utilization",
"memory_limit": "k8s_pod_memory_limit_utilization",
"restarts": "k8s_container_restarts",
}
)
var WorkloadTableListQuery = v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
// pod cpu utilization
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["cpu"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "A",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod cpu request utilization
"B": {
QueryName: "B",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["cpu_request"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "B",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod cpu limit utilization
"C": {
QueryName: "C",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["cpu_limit"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "C",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod memory utilization
"D": {
QueryName: "D",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["memory"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "D",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod memory request utilization
"E": {
QueryName: "E",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["memory_request"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "E",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod memory limit utilization
"F": {
QueryName: "F",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["memory_limit"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "F",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
"G": {
QueryName: "G",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["restarts"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "G",
ReduceTo: v3.ReduceToOperatorSum,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationMax,
Functions: []v3.Function{{Name: v3.FunctionNameRunningDiff}},
Disabled: false,
},
},
PanelType: v3.PanelTypeTable,
QueryType: v3.QueryTypeBuilder,
},
Version: "v4",
FormatForWeb: true,
}
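The restarts query ("G") reads a cumulative counter (anyLast over time, max across the group) and then applies the running_diff function, so the table reports restarts within the selected window rather than lifetime totals. A minimal sketch of what a running diff does to such a series (illustrative, not the actual postprocess implementation):

package main

import "fmt"

// runningDiff replaces each point with its delta from the previous one,
// dropping the first point, which has nothing to diff against.
func runningDiff(points []float64) []float64 {
	out := make([]float64, 0, len(points))
	for i := 1; i < len(points); i++ {
		out = append(out, points[i]-points[i-1])
	}
	return out
}

func main() {
	restarts := []float64{0, 0, 2, 2, 5} // cumulative container restarts
	fmt.Println(runningDiff(restarts))   // [0 2 0 3]
}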


@@ -2,6 +2,7 @@ package cumulative
import (
"fmt"
"os"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/constants"
@@ -40,6 +41,9 @@ import (
const (
rateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window)))`
increaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window)))`
experimentalRateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
experimentalIncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window))`
)
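The experimental variants change how counter resets are handled: the standard expressions yield NaN whenever the current cumulative value is below the previous one (a reset) or the gap between points is a day or more, while the experimental ones drop the staleness guard and return the post-reset value itself, so freshly restarted counters still contribute. A minimal Go sketch of the guard these SQL expressions encode (function names illustrative):

package main

import (
	"fmt"
	"math"
)

// rate mirrors rateWithoutNegative: NaN on a reset or on gaps >= 1 day.
func rate(prev, cur, prevTS, curTS float64) float64 {
	if cur-prev < 0 || curTS-prevTS >= 86400 {
		return math.NaN()
	}
	return (cur - prev) / (curTS - prevTS)
}

// experimentalRate mirrors experimentalRateWithoutNegative: on a reset, the
// current value is returned as-is instead of NaN.
func experimentalRate(prev, cur, prevTS, curTS float64) float64 {
	if cur-prev < 0 {
		return cur // counter restarted, assume it began again from zero
	}
	return (cur - prev) / (curTS - prevTS)
}

func main() {
	fmt.Println(rate(100, 40, 0, 60))             // NaN: the reset sample is dropped
	fmt.Println(experimentalRate(100, 40, 0, 60)) // 40: the reset sample is kept
}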
// prepareTimeAggregationSubQueryTimeSeries prepares the sub-query to be used for temporal aggregation
@@ -151,14 +155,22 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationRate:
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
rateExp := rateWithoutNegative
if _, ok := os.LookupEnv("EXPERIMENTAL_RATE_WITHOUT_NEGATIVE"); ok {
rateExp = fmt.Sprintf(experimentalRateWithoutNegative, start)
}
rateQueryTmpl :=
"SELECT %s ts, " + rateWithoutNegative +
"SELECT %s ts, " + rateExp +
" as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)"
subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery)
case v3.TimeAggregationIncrease:
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
increaseExp := increaseWithoutNegative
if _, ok := os.LookupEnv("EXPERIMENTAL_INCREASE_WITHOUT_NEGATIVE"); ok {
increaseExp = experimentalIncreaseWithoutNegative
}
rateQueryTmpl :=
"SELECT %s ts, " + increaseWithoutNegative +
"SELECT %s ts, " + increaseExp +
" as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)"
subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery)
}
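Both experimental expressions are opt-in through environment variables, and only the variable's presence is checked, not its value. A sketch of enabling them (illustrative; in a deployment they would be set on the query-service process environment):

package main

import "os"

func main() {
	// Any value works; the code above only calls os.LookupEnv.
	os.Setenv("EXPERIMENTAL_RATE_WITHOUT_NEGATIVE", "1")
	os.Setenv("EXPERIMENTAL_INCREASE_WITHOUT_NEGATIVE", "1")
}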


@@ -116,7 +116,9 @@ func expressionToQuery(
for _, tag := range qp.CompositeQuery.BuilderQueries[variable].GroupBy {
groupTags = append(groupTags, tag.Key)
}
- groupTags = append(groupTags, "ts")
+ if qp.CompositeQuery.PanelType != v3.PanelTypeTable {
+ 	groupTags = append(groupTags, "ts")
+ }
if joinUsing == "" {
for _, tag := range groupTags {
joinUsing += fmt.Sprintf("%s.`%s` as `%s`, ", variable, tag, tag)


@@ -498,11 +498,11 @@ var testLogsWithFormula = []struct {
},
},
},
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value + B.value as value FROM (SELECT now() as ts, attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] as `key1.1`, " +
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.value + B.value as value FROM (SELECT now() as ts, attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] as `key1.1`, " +
"toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] = true AND " +
"has(attributes_bool_key, 'key1.1') group by `key1.1` order by value DESC) as A INNER JOIN (SELECT now() as ts, attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] as `key1.1`, " +
"toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key1.2')] = true AND " +
"has(attributes_bool_key, 'key1.1') group by `key1.1` order by value DESC) as B ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`",
"has(attributes_bool_key, 'key1.1') group by `key1.1` order by value DESC) as B ON A.`key1.1` = B.`key1.1`",
},
{
Name: "test formula with dot in filter and group by materialized attribute",
@@ -707,12 +707,12 @@ var testLogsWithFormulaV2 = []struct {
},
},
},
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value + B.value as value FROM (SELECT attributes_bool['key1.1'] as `key1.1`, " +
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.value + B.value as value FROM (SELECT attributes_bool['key1.1'] as `key1.1`, " +
"toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND (ts_bucket_start >= 1702977256 AND ts_bucket_start <= 1702982656) " +
"AND attributes_bool['key1.1'] = true AND mapContains(attributes_bool, 'key1.1') AND mapContains(attributes_bool, 'key1.1') group by `key1.1` order by value DESC) as A INNER JOIN (SELECT " +
"attributes_bool['key1.1'] as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) " +
"AND (ts_bucket_start >= 1702977256 AND ts_bucket_start <= 1702982656) AND attributes_bool['key1.2'] = true AND mapContains(attributes_bool, 'key1.2') AND " +
"mapContains(attributes_bool, 'key1.1') group by `key1.1` order by value DESC) as B ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`",
"mapContains(attributes_bool, 'key1.1') group by `key1.1` order by value DESC) as B ON A.`key1.1` = B.`key1.1`",
},
{
Name: "test formula with dot in filter and group by materialized attribute",


@@ -173,3 +173,125 @@ type ClusterListRecord struct {
MemoryAllocatable float64 `json:"memoryAllocatable"`
Meta map[string]string `json:"meta"`
}
type DeploymentListRequest struct {
Start int64 `json:"start"` // epoch time in ms
End int64 `json:"end"` // epoch time in ms
Filters *v3.FilterSet `json:"filters"`
GroupBy []v3.AttributeKey `json:"groupBy"`
OrderBy *v3.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
type DeploymentListResponse struct {
Type ResponseType `json:"type"`
Records []DeploymentListRecord `json:"records"`
Total int `json:"total"`
}
type DeploymentListRecord struct {
DeploymentName string `json:"deploymentName"`
CPUUsage float64 `json:"cpuUsage"`
MemoryUsage float64 `json:"memoryUsage"`
DesiredPods int `json:"desiredPods"`
AvailablePods int `json:"availablePods"`
CPURequest float64 `json:"cpuRequest"`
MemoryRequest float64 `json:"memoryRequest"`
CPULimit float64 `json:"cpuLimit"`
MemoryLimit float64 `json:"memoryLimit"`
Restarts int `json:"restarts"`
Meta map[string]string `json:"meta"`
}
type DaemonSetListRequest struct {
Start int64 `json:"start"` // epoch time in ms
End int64 `json:"end"` // epoch time in ms
Filters *v3.FilterSet `json:"filters"`
GroupBy []v3.AttributeKey `json:"groupBy"`
OrderBy *v3.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
type DaemonSetListResponse struct {
Type ResponseType `json:"type"`
Records []DaemonSetListRecord `json:"records"`
Total int `json:"total"`
}
type DaemonSetListRecord struct {
DaemonSetName string `json:"daemonSetName"`
CPUUsage float64 `json:"cpuUsage"`
MemoryUsage float64 `json:"memoryUsage"`
CPURequest float64 `json:"cpuRequest"`
MemoryRequest float64 `json:"memoryRequest"`
CPULimit float64 `json:"cpuLimit"`
MemoryLimit float64 `json:"memoryLimit"`
Restarts int `json:"restarts"`
DesiredNodes int `json:"desiredNodes"`
AvailableNodes int `json:"availableNodes"`
Meta map[string]string `json:"meta"`
}
type StatefulSetListRequest struct {
Start int64 `json:"start"` // epoch time in ms
End int64 `json:"end"` // epoch time in ms
Filters *v3.FilterSet `json:"filters"`
GroupBy []v3.AttributeKey `json:"groupBy"`
OrderBy *v3.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
type StatefulSetListResponse struct {
Type ResponseType `json:"type"`
Records []StatefulSetListRecord `json:"records"`
Total int `json:"total"`
}
type StatefulSetListRecord struct {
StatefulSetName string `json:"statefulSetName"`
CPUUsage float64 `json:"cpuUsage"`
MemoryUsage float64 `json:"memoryUsage"`
CPURequest float64 `json:"cpuRequest"`
MemoryRequest float64 `json:"memoryRequest"`
CPULimit float64 `json:"cpuLimit"`
MemoryLimit float64 `json:"memoryLimit"`
Restarts int `json:"restarts"`
DesiredPods int `json:"desiredPods"`
AvailablePods int `json:"availablePods"`
Meta map[string]string `json:"meta"`
}
type JobListRequest struct {
Start int64 `json:"start"` // epoch time in ms
End int64 `json:"end"` // epoch time in ms
Filters *v3.FilterSet `json:"filters"`
GroupBy []v3.AttributeKey `json:"groupBy"`
OrderBy *v3.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
type JobListResponse struct {
Type ResponseType `json:"type"`
Records []JobListRecord `json:"records"`
Total int `json:"total"`
}
type JobListRecord struct {
JobName string `json:"jobName"`
CPUUsage float64 `json:"cpuUsage"`
MemoryUsage float64 `json:"memoryUsage"`
CPURequest float64 `json:"cpuRequest"`
MemoryRequest float64 `json:"memoryRequest"`
CPULimit float64 `json:"cpuLimit"`
MemoryLimit float64 `json:"memoryLimit"`
Restarts int `json:"restarts"`
DesiredSuccessfulPods int `json:"desiredSuccessfulPods"`
ActivePods int `json:"activePods"`
FailedPods int `json:"failedPods"`
SuccessfulPods int `json:"successfulPods"`
Meta map[string]string `json:"meta"`
}
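These json tags define the wire format of the new list endpoints. A small sketch of a request body a client might send (values illustrative; the orderBy field names assume v3.OrderBy's usual columnName/order tags):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	payload := map[string]any{
		"start":   1731400000000, // epoch ms, per the comments above
		"end":     1731403600000,
		"orderBy": map[string]any{"columnName": "desired_pods", "order": "desc"},
		"offset":  0,
		"limit":   10,
	}
	b, _ := json.MarshalIndent(payload, "", "  ")
	fmt.Println(string(b))
}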