mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-03 08:33:26 +00:00
chore: metrics explorer summary v2 APIs (#9579)
This commit is contained in:
139
pkg/modules/metricsexplorer/implmetricsexplorer/handler.go
Normal file
139
pkg/modules/metricsexplorer/implmetricsexplorer/handler.go
Normal file
@@ -0,0 +1,139 @@
|
||||
package implmetricsexplorer
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/http/binding"
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metricsexplorertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
module metricsexplorer.Module
|
||||
}
|
||||
|
||||
// NewHandler returns a metricsexplorer.Handler implementation.
|
||||
func NewHandler(m metricsexplorer.Module) metricsexplorer.Handler {
|
||||
return &handler{
|
||||
module: m,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) GetStats(rw http.ResponseWriter, req *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(req.Context())
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
var in metricsexplorertypes.StatsRequest
|
||||
if err := binding.JSON.BindBody(req.Body, &in); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
out, err := h.module.GetStats(req.Context(), orgID, &in)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, out)
|
||||
}
|
||||
|
||||
func (h *handler) GetTreemap(rw http.ResponseWriter, req *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(req.Context())
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
var in metricsexplorertypes.TreemapRequest
|
||||
if err := binding.JSON.BindBody(req.Body, &in); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
out, err := h.module.GetTreemap(req.Context(), orgID, &in)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, out)
|
||||
}
|
||||
|
||||
func (h *handler) UpdateMetricMetadata(rw http.ResponseWriter, req *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(req.Context())
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Extract metric_name from URL path
|
||||
vars := mux.Vars(req)
|
||||
metricName := vars["metric_name"]
|
||||
if metricName == "" {
|
||||
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "metric_name is required in URL path"))
|
||||
return
|
||||
}
|
||||
|
||||
var in metricsexplorertypes.UpdateMetricMetadataRequest
|
||||
if err := binding.JSON.BindBody(req.Body, &in); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Set metric name from URL path
|
||||
in.MetricName = metricName
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
err = h.module.UpdateMetricMetadata(req.Context(), orgID, &in)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, nil)
|
||||
}
|
||||
|
||||
func (h *handler) GetMetricMetadata(rw http.ResponseWriter, req *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(req.Context())
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
metricName := strings.TrimSpace(req.URL.Query().Get("metricName"))
|
||||
if metricName == "" {
|
||||
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "metricName query parameter is required"))
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
metadataMap, err := h.module.GetMetricMetadataMulti(req.Context(), orgID, []string{metricName})
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
metadata, ok := metadataMap[metricName]
|
||||
if !ok || metadata == nil {
|
||||
render.Error(rw, errors.NewNotFoundf(errors.CodeNotFound, "metadata not found for metric %q", metricName))
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, metadata)
|
||||
}
|
||||
73
pkg/modules/metricsexplorer/implmetricsexplorer/helpers.go
Normal file
73
pkg/modules/metricsexplorer/implmetricsexplorer/helpers.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package implmetricsexplorer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/metricsexplorertypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
)
|
||||
|
||||
// used for mapping the sqlColumns via orderBy
|
||||
const (
|
||||
sqlColumnTimeSeries = "timeseries"
|
||||
sqlColumnSamples = "samples"
|
||||
)
|
||||
|
||||
func generateMetricMetadataCacheKey(metricName string) string {
|
||||
return fmt.Sprintf("metrics::metadata::%s", metricName)
|
||||
}
|
||||
|
||||
func getStatsOrderByColumn(order *qbtypes.OrderBy) (string, string, error) {
|
||||
if order == nil {
|
||||
return sqlColumnTimeSeries, qbtypes.OrderDirectionDesc.StringValue(), nil
|
||||
}
|
||||
|
||||
var columnName string
|
||||
switch strings.ToLower(order.Key.Name) {
|
||||
case metricsexplorertypes.OrderByTimeSeries.StringValue():
|
||||
columnName = sqlColumnTimeSeries
|
||||
case metricsexplorertypes.OrderBySamples.StringValue():
|
||||
columnName = sqlColumnSamples
|
||||
default:
|
||||
return "", "", errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"unsupported order column %q: supported columns are %q or %q",
|
||||
order.Key.Name,
|
||||
metricsexplorertypes.OrderByTimeSeries,
|
||||
metricsexplorertypes.OrderBySamples,
|
||||
)
|
||||
}
|
||||
|
||||
// Extract direction from OrderDirection and convert to SQL format (uppercase)
|
||||
var direction qbtypes.OrderDirection
|
||||
var ok bool
|
||||
// Validate direction using OrderDirectionMap
|
||||
if direction, ok = qbtypes.OrderDirectionMap[strings.ToLower(order.Direction.StringValue())]; !ok {
|
||||
return "", "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported order direction %q, should be one of %s, %s", direction, qbtypes.OrderDirectionAsc, qbtypes.OrderDirectionDesc)
|
||||
}
|
||||
|
||||
return columnName, direction.StringValue(), nil
|
||||
}
|
||||
|
||||
func extractMissingMetricNamesInMap(metricNames []string, metricMetadataMap map[string]*metricsexplorertypes.MetricMetadata) []string {
|
||||
misses := make([]string, 0)
|
||||
for _, name := range metricNames {
|
||||
if _, ok := metricMetadataMap[name]; !ok {
|
||||
misses = append(misses, name)
|
||||
}
|
||||
}
|
||||
return misses
|
||||
}
|
||||
|
||||
// enrichStatsWithMetadata enriches metric stats with metadata from the provided metadata map.
|
||||
func enrichStatsWithMetadata(metricStats []metricsexplorertypes.Stat, metadata map[string]*metricsexplorertypes.MetricMetadata) {
|
||||
for i := range metricStats {
|
||||
if meta, ok := metadata[metricStats[i].MetricName]; ok {
|
||||
metricStats[i].Description = meta.Description
|
||||
metricStats[i].MetricType = meta.MetricType
|
||||
metricStats[i].MetricUnit = meta.MetricUnit
|
||||
}
|
||||
}
|
||||
}
|
||||
773
pkg/modules/metricsexplorer/implmetricsexplorer/module.go
Normal file
773
pkg/modules/metricsexplorer/implmetricsexplorer/module.go
Normal file
@@ -0,0 +1,773 @@
|
||||
package implmetricsexplorer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/metricsexplorertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
sqlbuilder "github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
telemetryMetadataStore telemetrytypes.MetadataStore
|
||||
fieldMapper qbtypes.FieldMapper
|
||||
condBuilder qbtypes.ConditionBuilder
|
||||
logger *slog.Logger
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
// NewModule constructs the metrics module with the provided dependencies.
|
||||
func NewModule(ts telemetrystore.TelemetryStore, telemetryMetadataStore telemetrytypes.MetadataStore, cache cache.Cache, providerSettings factory.ProviderSettings) metricsexplorer.Module {
|
||||
fieldMapper := telemetrymetrics.NewFieldMapper()
|
||||
condBuilder := telemetrymetrics.NewConditionBuilder(fieldMapper)
|
||||
return &module{
|
||||
telemetryStore: ts,
|
||||
fieldMapper: fieldMapper,
|
||||
condBuilder: condBuilder,
|
||||
logger: providerSettings.Logger,
|
||||
telemetryMetadataStore: telemetryMetadataStore,
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *module) GetStats(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.StatsRequest) (*metricsexplorertypes.StatsResponse, error) {
|
||||
if err := req.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filterWhereClause, err := m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Single query to get stats with samples, timeseries counts in required sorting order
|
||||
metricStats, total, err := m.fetchMetricsStatsWithSamples(
|
||||
ctx,
|
||||
req,
|
||||
filterWhereClause,
|
||||
false,
|
||||
req.OrderBy,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(metricStats) == 0 {
|
||||
return &metricsexplorertypes.StatsResponse{
|
||||
Metrics: []metricsexplorertypes.Stat{},
|
||||
Total: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Get metadata for all metrics
|
||||
metricNames := make([]string, len(metricStats))
|
||||
for i := range metricStats {
|
||||
metricNames[i] = metricStats[i].MetricName
|
||||
}
|
||||
|
||||
metadata, err := m.GetMetricMetadataMulti(ctx, orgID, metricNames)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Enrich stats with metadata
|
||||
enrichStatsWithMetadata(metricStats, metadata)
|
||||
|
||||
return &metricsexplorertypes.StatsResponse{
|
||||
Metrics: metricStats,
|
||||
Total: total,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetTreemap will return metrics treemap information once implemented.
|
||||
func (m *module) GetTreemap(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.TreemapRequest) (*metricsexplorertypes.TreemapResponse, error) {
|
||||
if err := req.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filterWhereClause, err := m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp := &metricsexplorertypes.TreemapResponse{}
|
||||
switch req.Treemap {
|
||||
case metricsexplorertypes.TreemapModeSamples:
|
||||
entries, err := m.computeSamplesTreemap(ctx, req, filterWhereClause)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp.Samples = entries
|
||||
default: // TreemapModeTimeSeries
|
||||
entries, err := m.computeTimeseriesTreemap(ctx, req, filterWhereClause)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp.TimeSeries = entries
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *module) GetMetricMetadataMulti(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]*metricsexplorertypes.MetricMetadata, error) {
|
||||
if len(metricNames) == 0 {
|
||||
return map[string]*metricsexplorertypes.MetricMetadata{}, nil
|
||||
}
|
||||
|
||||
metadata := make(map[string]*metricsexplorertypes.MetricMetadata)
|
||||
cacheHits, cacheMisses := m.fetchMetadataFromCache(ctx, orgID, metricNames)
|
||||
for name, meta := range cacheHits {
|
||||
metadata[name] = meta
|
||||
}
|
||||
|
||||
if len(cacheMisses) == 0 {
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
updatedMetadata, err := m.fetchUpdatedMetadata(ctx, orgID, cacheMisses)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for name, meta := range updatedMetadata {
|
||||
metadata[name] = meta
|
||||
}
|
||||
|
||||
remainingMisses := extractMissingMetricNamesInMap(cacheMisses, updatedMetadata)
|
||||
if len(remainingMisses) == 0 {
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
timeseriesMetadata, err := m.fetchTimeseriesMetadata(ctx, orgID, remainingMisses)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for name, meta := range timeseriesMetadata {
|
||||
metadata[name] = meta
|
||||
}
|
||||
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
func (m *module) UpdateMetricMetadata(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.UpdateMetricMetadataRequest) error {
|
||||
if req == nil {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
|
||||
}
|
||||
|
||||
if req.MetricName == "" {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "metric name is required")
|
||||
}
|
||||
|
||||
// Validate and normalize metric type and temporality
|
||||
if err := m.validateAndNormalizeMetricType(req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate labels for histogram and summary types
|
||||
if err := m.validateMetricLabels(ctx, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Insert new metadata (keeping history of all updates)
|
||||
if err := m.insertMetricsMetadata(ctx, orgID, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) fetchMetadataFromCache(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]*metricsexplorertypes.MetricMetadata, []string) {
|
||||
hits := make(map[string]*metricsexplorertypes.MetricMetadata)
|
||||
misses := make([]string, 0)
|
||||
for _, metricName := range metricNames {
|
||||
cacheKey := generateMetricMetadataCacheKey(metricName)
|
||||
var cachedMetadata metricsexplorertypes.MetricMetadata
|
||||
if err := m.cache.Get(ctx, orgID, cacheKey, &cachedMetadata); err == nil {
|
||||
hits[metricName] = &cachedMetadata
|
||||
} else {
|
||||
m.logger.WarnContext(ctx, "cache miss for metric metadata", "metric_name", metricName, "error", err)
|
||||
misses = append(misses, metricName)
|
||||
}
|
||||
}
|
||||
return hits, misses
|
||||
}
|
||||
|
||||
func (m *module) fetchUpdatedMetadata(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]*metricsexplorertypes.MetricMetadata, error) {
|
||||
if len(metricNames) == 0 {
|
||||
return map[string]*metricsexplorertypes.MetricMetadata{}, nil
|
||||
}
|
||||
|
||||
args := make([]any, len(metricNames))
|
||||
for i := range metricNames {
|
||||
args[i] = metricNames[i]
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(
|
||||
"metric_name",
|
||||
"argMax(description, created_at) AS description",
|
||||
"argMax(type, created_at) AS type",
|
||||
"argMax(unit, created_at) AS unit",
|
||||
"argMax(temporality, created_at) AS temporality",
|
||||
"argMax(is_monotonic, created_at) AS is_monotonic",
|
||||
)
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.UpdatedMetadataTableName))
|
||||
sb.Where(sb.In("metric_name", args...))
|
||||
sb.GroupBy("metric_name")
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
rows, err := db.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to fetch updated metrics metadata")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := make(map[string]*metricsexplorertypes.MetricMetadata)
|
||||
for rows.Next() {
|
||||
var (
|
||||
metricMetadata metricsexplorertypes.MetricMetadata
|
||||
metricName string
|
||||
)
|
||||
|
||||
if err := rows.Scan(&metricName, &metricMetadata.Description, &metricMetadata.MetricType, &metricMetadata.MetricUnit, &metricMetadata.Temporality, &metricMetadata.IsMonotonic); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan updated metrics metadata")
|
||||
}
|
||||
result[metricName] = &metricMetadata
|
||||
|
||||
cacheKey := generateMetricMetadataCacheKey(metricName)
|
||||
if err := m.cache.Set(ctx, orgID, cacheKey, &metricMetadata, 0); err != nil {
|
||||
m.logger.WarnContext(ctx, "failed to set metric metadata in cache", "metric_name", metricName, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating updated metrics metadata rows")
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *module) fetchTimeseriesMetadata(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]*metricsexplorertypes.MetricMetadata, error) {
|
||||
if len(metricNames) == 0 {
|
||||
return map[string]*metricsexplorertypes.MetricMetadata{}, nil
|
||||
}
|
||||
|
||||
args := make([]any, len(metricNames))
|
||||
for i := range metricNames {
|
||||
args[i] = metricNames[i]
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(
|
||||
"metric_name",
|
||||
"ANY_VALUE(description) AS description",
|
||||
"ANY_VALUE(type) AS metric_type",
|
||||
"ANY_VALUE(unit) AS metric_unit",
|
||||
"ANY_VALUE(temporality) AS temporality",
|
||||
"ANY_VALUE(is_monotonic) AS is_monotonic",
|
||||
)
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.TimeseriesV4TableName))
|
||||
sb.Where(sb.In("metric_name", args...))
|
||||
sb.GroupBy("metric_name")
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
rows, err := db.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to fetch metrics metadata from timeseries table")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := make(map[string]*metricsexplorertypes.MetricMetadata)
|
||||
for rows.Next() {
|
||||
var (
|
||||
metricMetadata metricsexplorertypes.MetricMetadata
|
||||
metricName string
|
||||
)
|
||||
|
||||
if err := rows.Scan(&metricName, &metricMetadata.Description, &metricMetadata.MetricType, &metricMetadata.MetricUnit, &metricMetadata.Temporality, &metricMetadata.IsMonotonic); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan timeseries metadata")
|
||||
}
|
||||
result[metricName] = &metricMetadata
|
||||
|
||||
cacheKey := generateMetricMetadataCacheKey(metricName)
|
||||
if err := m.cache.Set(ctx, orgID, cacheKey, &metricMetadata, 0); err != nil {
|
||||
m.logger.WarnContext(ctx, "failed to set metric metadata in cache", "metric_name", metricName, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating timeseries metadata rows")
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *module) validateAndNormalizeMetricType(req *metricsexplorertypes.UpdateMetricMetadataRequest) error {
|
||||
switch req.Type {
|
||||
case metrictypes.SumType:
|
||||
if req.Temporality.IsZero() {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "temporality is required when metric type is Sum")
|
||||
}
|
||||
if req.Temporality != metrictypes.Delta && req.Temporality != metrictypes.Cumulative {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid value for temporality")
|
||||
}
|
||||
// Special case: if Sum is not monotonic and cumulative, convert to Gauge
|
||||
if !req.IsMonotonic && req.Temporality == metrictypes.Cumulative {
|
||||
req.Type = metrictypes.GaugeType
|
||||
req.Temporality = metrictypes.Unspecified
|
||||
}
|
||||
|
||||
case metrictypes.HistogramType:
|
||||
if req.Temporality.IsZero() {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "temporality is required when metric type is Histogram")
|
||||
}
|
||||
if req.Temporality != metrictypes.Delta && req.Temporality != metrictypes.Cumulative {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid value for temporality")
|
||||
}
|
||||
|
||||
case metrictypes.ExpHistogramType:
|
||||
if req.Temporality.IsZero() {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "temporality is required when metric type is exponential histogram")
|
||||
}
|
||||
if req.Temporality != metrictypes.Delta && req.Temporality != metrictypes.Cumulative {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid value for temporality")
|
||||
}
|
||||
|
||||
case metrictypes.GaugeType:
|
||||
// Gauge always has unspecified temporality
|
||||
req.Temporality = metrictypes.Unspecified
|
||||
|
||||
case metrictypes.SummaryType:
|
||||
// Summary always has cumulative temporality
|
||||
req.Temporality = metrictypes.Cumulative
|
||||
|
||||
default:
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid metric type")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) validateMetricLabels(ctx context.Context, req *metricsexplorertypes.UpdateMetricMetadataRequest) error {
|
||||
if req.Type == metrictypes.HistogramType {
|
||||
hasLabel, err := m.checkForLabelInMetric(ctx, req.MetricName, "le")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !hasLabel {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "metric '%s' cannot be set as histogram type", req.MetricName)
|
||||
}
|
||||
}
|
||||
|
||||
if req.Type == metrictypes.SummaryType {
|
||||
hasLabel, err := m.checkForLabelInMetric(ctx, req.MetricName, "quantile")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !hasLabel {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "metric '%s' cannot be set as summary type", req.MetricName)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) checkForLabelInMetric(ctx context.Context, metricName string, label string) (bool, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("count(*) > 0 AS has_label")
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName))
|
||||
sb.Where(sb.E("metric_name", metricName))
|
||||
sb.Where(sb.E("attr_name", label))
|
||||
sb.Limit(1)
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var hasLabel bool
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
err := db.QueryRow(ctx, query, args...).Scan(&hasLabel)
|
||||
if err != nil {
|
||||
return false, errors.WrapInternalf(err, errors.CodeInternal, "error checking metric label %q", label)
|
||||
}
|
||||
|
||||
return hasLabel, nil
|
||||
}
|
||||
|
||||
func (m *module) insertMetricsMetadata(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.UpdateMetricMetadataRequest) error {
|
||||
createdAt := time.Now().UnixMilli()
|
||||
|
||||
ib := sqlbuilder.NewInsertBuilder()
|
||||
ib.InsertInto(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.UpdatedMetadataTableName))
|
||||
ib.Cols("metric_name", "temporality", "is_monotonic", "type", "description", "unit", "created_at")
|
||||
ib.Values(
|
||||
req.MetricName,
|
||||
req.Temporality,
|
||||
req.IsMonotonic,
|
||||
req.Type,
|
||||
req.Description,
|
||||
req.Unit,
|
||||
createdAt,
|
||||
)
|
||||
|
||||
query, args := ib.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
if err := db.Exec(ctx, query, args...); err != nil {
|
||||
return errors.WrapInternalf(err, errors.CodeInternal, "failed to insert metrics metadata")
|
||||
}
|
||||
|
||||
// Set in cache after successful DB insert
|
||||
metricMetadata := &metricsexplorertypes.MetricMetadata{
|
||||
Description: req.Description,
|
||||
MetricType: req.Type,
|
||||
MetricUnit: req.Unit,
|
||||
Temporality: req.Temporality,
|
||||
IsMonotonic: req.IsMonotonic,
|
||||
}
|
||||
cacheKey := generateMetricMetadataCacheKey(req.MetricName)
|
||||
if err := m.cache.Set(ctx, orgID, cacheKey, metricMetadata, 0); err != nil {
|
||||
m.logger.WarnContext(ctx, "failed to set metric metadata in cache after insert", "metric_name", req.MetricName, "error", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) buildFilterClause(ctx context.Context, filter *qbtypes.Filter, startMillis, endMillis int64) (*sqlbuilder.WhereClause, error) {
|
||||
expression := ""
|
||||
if filter != nil {
|
||||
expression = strings.TrimSpace(filter.Expression)
|
||||
}
|
||||
if expression == "" {
|
||||
return sqlbuilder.NewWhereClause(), nil
|
||||
}
|
||||
|
||||
// TODO(nikhilmantri0902, srikanthccv): if this is the right way of dealing with whereClauseSelectors
|
||||
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(expression)
|
||||
for idx := range whereClauseSelectors {
|
||||
whereClauseSelectors[idx].Signal = telemetrytypes.SignalMetrics
|
||||
whereClauseSelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
|
||||
// whereClauseSelectors[idx].MetricContext = &telemetrytypes.MetricContext{
|
||||
// MetricName: query.Aggregations[0].MetricName,
|
||||
// }
|
||||
// whereClauseSelectors[idx].Source = query.Source
|
||||
}
|
||||
|
||||
keys, _, err := m.telemetryMetadataStore.GetKeysMulti(ctx, whereClauseSelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts := querybuilder.FilterExprVisitorOpts{
|
||||
Logger: m.logger,
|
||||
FieldMapper: m.fieldMapper,
|
||||
ConditionBuilder: m.condBuilder,
|
||||
FullTextColumn: &telemetrytypes.TelemetryFieldKey{
|
||||
Name: "labels"},
|
||||
FieldKeys: keys,
|
||||
}
|
||||
|
||||
startNs := uint64(startMillis * 1_000_000)
|
||||
endNs := uint64(endMillis * 1_000_000)
|
||||
|
||||
whereClause, err := querybuilder.PrepareWhereClause(expression, opts, startNs, endNs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if whereClause == nil || whereClause.WhereClause == nil {
|
||||
return sqlbuilder.NewWhereClause(), nil
|
||||
}
|
||||
|
||||
return whereClause.WhereClause, nil
|
||||
}
|
||||
|
||||
func (m *module) fetchMetricsStatsWithSamples(
|
||||
ctx context.Context,
|
||||
req *metricsexplorertypes.StatsRequest,
|
||||
filterWhereClause *sqlbuilder.WhereClause,
|
||||
normalized bool,
|
||||
orderBy *qbtypes.OrderBy,
|
||||
) ([]metricsexplorertypes.Stat, uint64, error) {
|
||||
|
||||
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
|
||||
// Timeseries counts per metric
|
||||
tsSB := sqlbuilder.NewSelectBuilder()
|
||||
tsSB.Select(
|
||||
"metric_name",
|
||||
"uniq(fingerprint) AS timeseries",
|
||||
)
|
||||
tsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
|
||||
tsSB.Where(tsSB.Between("unix_milli", start, end))
|
||||
tsSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
tsSB.Where(tsSB.E("__normalized", normalized))
|
||||
if filterWhereClause != nil {
|
||||
tsSB.AddWhereClause(sqlbuilder.CopyWhereClause(filterWhereClause))
|
||||
}
|
||||
tsSB.GroupBy("metric_name")
|
||||
|
||||
// Samples counts per metric
|
||||
samplesSB := sqlbuilder.NewSelectBuilder()
|
||||
samplesSB.Select(
|
||||
"metric_name",
|
||||
fmt.Sprintf("%s AS samples", countExp),
|
||||
)
|
||||
samplesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
samplesSB.Where(samplesSB.Between("unix_milli", req.Start, req.End))
|
||||
samplesSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
|
||||
ctes := []*sqlbuilder.CTEQueryBuilder{
|
||||
sqlbuilder.CTEQuery("__time_series_counts").As(tsSB),
|
||||
}
|
||||
|
||||
if filterWhereClause != nil {
|
||||
fingerprintSB := sqlbuilder.NewSelectBuilder()
|
||||
fingerprintSB.Select("fingerprint")
|
||||
fingerprintSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTsTable))
|
||||
fingerprintSB.Where(fingerprintSB.Between("unix_milli", start, end))
|
||||
fingerprintSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
fingerprintSB.Where(fingerprintSB.E("__normalized", normalized))
|
||||
fingerprintSB.AddWhereClause(sqlbuilder.CopyWhereClause(filterWhereClause))
|
||||
fingerprintSB.GroupBy("fingerprint")
|
||||
|
||||
ctes = append(ctes, sqlbuilder.CTEQuery("__filtered_fingerprints").As(fingerprintSB))
|
||||
samplesSB.Where("fingerprint IN (SELECT fingerprint FROM __filtered_fingerprints)")
|
||||
}
|
||||
samplesSB.GroupBy("metric_name")
|
||||
|
||||
ctes = append(ctes, sqlbuilder.CTEQuery("__sample_counts").As(samplesSB))
|
||||
cteBuilder := sqlbuilder.With(ctes...)
|
||||
|
||||
finalSB := cteBuilder.Select(
|
||||
"COALESCE(ts.metric_name, s.metric_name) AS metric_name",
|
||||
"COALESCE(ts.timeseries, 0) AS timeseries",
|
||||
"COALESCE(s.samples, 0) AS samples",
|
||||
"COUNT(*) OVER() AS total",
|
||||
)
|
||||
finalSB.From("__time_series_counts ts")
|
||||
finalSB.JoinWithOption(sqlbuilder.FullOuterJoin, "__sample_counts s", "ts.metric_name = s.metric_name")
|
||||
finalSB.Where("(COALESCE(ts.timeseries, 0) > 0 OR COALESCE(s.samples, 0) > 0)")
|
||||
|
||||
orderByColumn, orderDirection, err := getStatsOrderByColumn(orderBy)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
finalSB.OrderBy(
|
||||
fmt.Sprintf("%s %s", orderByColumn, strings.ToUpper(orderDirection)),
|
||||
"metric_name ASC",
|
||||
)
|
||||
finalSB.Limit(req.Limit)
|
||||
finalSB.Offset(req.Offset)
|
||||
|
||||
query, args := finalSB.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
rows, err := db.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to execute metrics stats with samples query")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
metricStats := make([]metricsexplorertypes.Stat, 0)
|
||||
var total uint64
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
metricStat metricsexplorertypes.Stat
|
||||
rowTotal uint64
|
||||
)
|
||||
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal); err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metrics stats row")
|
||||
}
|
||||
metricStats = append(metricStats, metricStat)
|
||||
total = rowTotal
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metrics stats rows")
|
||||
}
|
||||
|
||||
return metricStats, total, nil
|
||||
}
|
||||
|
||||
func (m *module) computeTimeseriesTreemap(ctx context.Context, req *metricsexplorertypes.TreemapRequest, filterWhereClause *sqlbuilder.WhereClause) ([]metricsexplorertypes.TreemapEntry, error) {
|
||||
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
|
||||
|
||||
totalTSBuilder := sqlbuilder.NewSelectBuilder()
|
||||
totalTSBuilder.Select("uniq(fingerprint) AS total_time_series")
|
||||
totalTSBuilder.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
|
||||
totalTSBuilder.Where(totalTSBuilder.Between("unix_milli", start, end))
|
||||
totalTSBuilder.Where(totalTSBuilder.E("__normalized", false))
|
||||
|
||||
metricsSB := sqlbuilder.NewSelectBuilder()
|
||||
metricsSB.Select(
|
||||
"metric_name",
|
||||
"uniq(fingerprint) AS total_value",
|
||||
)
|
||||
metricsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
|
||||
metricsSB.Where(metricsSB.Between("unix_milli", start, end))
|
||||
metricsSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
metricsSB.Where(metricsSB.E("__normalized", false))
|
||||
if filterWhereClause != nil {
|
||||
metricsSB.WhereClause.AddWhereClause(sqlbuilder.CopyWhereClause(filterWhereClause))
|
||||
}
|
||||
metricsSB.GroupBy("metric_name")
|
||||
|
||||
cteBuilder := sqlbuilder.With(
|
||||
sqlbuilder.CTEQuery("__total_time_series").As(totalTSBuilder),
|
||||
sqlbuilder.CTEQuery("__metric_totals").As(metricsSB),
|
||||
)
|
||||
|
||||
finalSB := cteBuilder.Select(
|
||||
"mt.metric_name",
|
||||
"mt.total_value",
|
||||
"CASE WHEN tts.total_time_series = 0 THEN 0 ELSE (mt.total_value * 100.0 / tts.total_time_series) END AS percentage",
|
||||
)
|
||||
finalSB.From("__metric_totals mt")
|
||||
finalSB.Join("__total_time_series tts", "1=1")
|
||||
finalSB.OrderBy("percentage").Desc()
|
||||
finalSB.Limit(req.Limit)
|
||||
|
||||
query, args := finalSB.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
rows, err := db.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to execute timeseries treemap query")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
entries := make([]metricsexplorertypes.TreemapEntry, 0)
|
||||
for rows.Next() {
|
||||
var treemapEntry metricsexplorertypes.TreemapEntry
|
||||
if err := rows.Scan(&treemapEntry.MetricName, &treemapEntry.TotalValue, &treemapEntry.Percentage); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan timeseries treemap row")
|
||||
}
|
||||
entries = append(entries, treemapEntry)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating timeseries treemap rows")
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
func (m *module) computeSamplesTreemap(ctx context.Context, req *metricsexplorertypes.TreemapRequest, filterWhereClause *sqlbuilder.WhereClause) ([]metricsexplorertypes.TreemapEntry, error) {
|
||||
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
|
||||
candidateLimit := req.Limit + 50
|
||||
|
||||
metricCandidatesSB := sqlbuilder.NewSelectBuilder()
|
||||
metricCandidatesSB.Select("metric_name")
|
||||
metricCandidatesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
|
||||
metricCandidatesSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
metricCandidatesSB.Where(metricCandidatesSB.E("__normalized", false))
|
||||
metricCandidatesSB.Where(metricCandidatesSB.Between("unix_milli", start, end))
|
||||
if filterWhereClause != nil {
|
||||
metricCandidatesSB.AddWhereClause(sqlbuilder.CopyWhereClause(filterWhereClause))
|
||||
}
|
||||
metricCandidatesSB.GroupBy("metric_name")
|
||||
metricCandidatesSB.OrderBy("uniq(fingerprint) DESC")
|
||||
metricCandidatesSB.Limit(candidateLimit)
|
||||
|
||||
cteQueries := []*sqlbuilder.CTEQueryBuilder{
|
||||
sqlbuilder.CTEQuery("__metric_candidates").As(metricCandidatesSB),
|
||||
}
|
||||
|
||||
totalSamplesSB := sqlbuilder.NewSelectBuilder()
|
||||
totalSamplesSB.Select(fmt.Sprintf("%s AS total_samples", countExp))
|
||||
totalSamplesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
totalSamplesSB.Where(totalSamplesSB.Between("unix_milli", req.Start, req.End))
|
||||
|
||||
sampleCountsSB := sqlbuilder.NewSelectBuilder()
|
||||
sampleCountsSB.Select(
|
||||
"metric_name",
|
||||
fmt.Sprintf("%s AS samples", countExp),
|
||||
)
|
||||
sampleCountsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
sampleCountsSB.Where(sampleCountsSB.Between("unix_milli", req.Start, req.End))
|
||||
sampleCountsSB.Where("metric_name IN (SELECT metric_name FROM __metric_candidates)")
|
||||
|
||||
if filterWhereClause != nil {
|
||||
fingerprintSB := sqlbuilder.NewSelectBuilder()
|
||||
fingerprintSB.Select("fingerprint")
|
||||
fingerprintSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTsTable))
|
||||
fingerprintSB.Where(fingerprintSB.Between("unix_milli", start, end))
|
||||
fingerprintSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
fingerprintSB.Where(fingerprintSB.E("__normalized", false))
|
||||
fingerprintSB.AddWhereClause(sqlbuilder.CopyWhereClause(filterWhereClause))
|
||||
fingerprintSB.Where("metric_name IN (SELECT metric_name FROM __metric_candidates)")
|
||||
fingerprintSB.GroupBy("fingerprint")
|
||||
|
||||
sampleCountsSB.Where("fingerprint IN (SELECT fingerprint FROM __filtered_fingerprints)")
|
||||
|
||||
cteQueries = append(cteQueries, sqlbuilder.CTEQuery("__filtered_fingerprints").As(fingerprintSB))
|
||||
}
|
||||
|
||||
sampleCountsSB.GroupBy("metric_name")
|
||||
|
||||
cteQueries = append(cteQueries,
|
||||
sqlbuilder.CTEQuery("__sample_counts").As(sampleCountsSB),
|
||||
sqlbuilder.CTEQuery("__total_samples").As(totalSamplesSB),
|
||||
)
|
||||
|
||||
cteBuilder := sqlbuilder.With(cteQueries...)
|
||||
|
||||
finalSB := cteBuilder.Select(
|
||||
"mc.metric_name",
|
||||
"COALESCE(sc.samples, 0) AS samples",
|
||||
"CASE WHEN ts.total_samples = 0 THEN 0 ELSE (COALESCE(sc.samples, 0) * 100.0 / ts.total_samples) END AS percentage",
|
||||
)
|
||||
finalSB.From("__metric_candidates mc")
|
||||
finalSB.JoinWithOption(sqlbuilder.LeftJoin, "__sample_counts sc", "mc.metric_name = sc.metric_name")
|
||||
finalSB.Join("__total_samples ts", "1=1")
|
||||
finalSB.OrderBy("percentage DESC")
|
||||
finalSB.Limit(req.Limit)
|
||||
|
||||
query, args := finalSB.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
rows, err := db.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to execute samples treemap query")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
entries := make([]metricsexplorertypes.TreemapEntry, 0)
|
||||
for rows.Next() {
|
||||
var treemapEntry metricsexplorertypes.TreemapEntry
|
||||
if err := rows.Scan(&treemapEntry.MetricName, &treemapEntry.TotalValue, &treemapEntry.Percentage); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan samples treemap row")
|
||||
}
|
||||
entries = append(entries, treemapEntry)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating samples treemap rows")
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
25
pkg/modules/metricsexplorer/metricsexplorer.go
Normal file
25
pkg/modules/metricsexplorer/metricsexplorer.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package metricsexplorer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/metricsexplorertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Handler exposes HTTP handlers for the metrics module.
|
||||
type Handler interface {
|
||||
GetStats(http.ResponseWriter, *http.Request)
|
||||
GetTreemap(http.ResponseWriter, *http.Request)
|
||||
GetMetricMetadata(http.ResponseWriter, *http.Request)
|
||||
UpdateMetricMetadata(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
// Module represents the metrics module interface.
|
||||
type Module interface {
|
||||
GetStats(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.StatsRequest) (*metricsexplorertypes.StatsResponse, error)
|
||||
GetTreemap(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.TreemapRequest) (*metricsexplorertypes.TreemapResponse, error)
|
||||
GetMetricMetadataMulti(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]*metricsexplorertypes.MetricMetadata, error)
|
||||
UpdateMetricMetadata(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.UpdateMetricMetadataRequest) error
|
||||
}
|
||||
@@ -659,6 +659,11 @@ func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *middleware.Au
|
||||
router.HandleFunc("/api/v1/metrics/{metric_name}/metadata",
|
||||
am.ViewAccess(ah.UpdateMetricsMetadata)).
|
||||
Methods(http.MethodPost)
|
||||
// v2 endpoints
|
||||
router.HandleFunc("/api/v2/metrics/stats", am.ViewAccess(ah.Signoz.Handlers.Metrics.GetStats)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/metrics/treemap", am.ViewAccess(ah.Signoz.Handlers.Metrics.GetTreemap)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/metrics/metadata", am.ViewAccess(ah.Signoz.Handlers.Metrics.GetMetricMetadata)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v2/metrics/{metric_name}/metadata", am.ViewAccess(ah.Signoz.Handlers.Metrics.UpdateMetricMetadata)).Methods(http.MethodPost)
|
||||
}
|
||||
|
||||
func Intersection(a, b []int) (c []int) {
|
||||
|
||||
@@ -94,6 +94,7 @@ type PreparedWhereClause struct {
|
||||
|
||||
// PrepareWhereClause generates a ClickHouse compatible WHERE clause from the filter query
|
||||
func PrepareWhereClause(query string, opts FilterExprVisitorOpts, startNs uint64, endNs uint64) (*PreparedWhereClause, error) {
|
||||
|
||||
// Setup the ANTLR parsing pipeline
|
||||
input := antlr.NewInputStream(query)
|
||||
lexer := grammar.NewFilterQueryLexer(input)
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -46,6 +48,7 @@ type Handlers struct {
|
||||
Session session.Handler
|
||||
SpanPercentile spanpercentile.Handler
|
||||
Services services.Handler
|
||||
Metrics metricsexplorer.Handler
|
||||
}
|
||||
|
||||
func NewHandlers(modules Modules, providerSettings factory.ProviderSettings, querier querier.Querier, licensing licensing.Licensing) Handlers {
|
||||
@@ -62,6 +65,7 @@ func NewHandlers(modules Modules, providerSettings factory.ProviderSettings, que
|
||||
AuthDomain: implauthdomain.NewHandler(modules.AuthDomain),
|
||||
Session: implsession.NewHandler(modules.Session),
|
||||
Services: implservices.NewHandler(modules.Services),
|
||||
Metrics: implmetricsexplorer.NewHandler(modules.Metrics),
|
||||
SpanPercentile: implspanpercentile.NewHandler(modules.SpanPercentile),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,8 @@ func TestNewHandlers(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
tokenizer := tokenizertest.New()
|
||||
emailing := emailingtest.New()
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil)
|
||||
|
||||
handlers := NewHandlers(modules, providerSettings, nil, nil)
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/analytics"
|
||||
"github.com/SigNoz/signoz/pkg/authn"
|
||||
"github.com/SigNoz/signoz/pkg/authz"
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/emailing"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/modules/apdex"
|
||||
@@ -13,6 +14,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
@@ -40,6 +43,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/tokenizer"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/preferencetypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
type Modules struct {
|
||||
@@ -58,6 +62,7 @@ type Modules struct {
|
||||
Session session.Module
|
||||
Services services.Module
|
||||
SpanPercentile spanpercentile.Module
|
||||
Metrics metricsexplorer.Module
|
||||
}
|
||||
|
||||
func NewModules(
|
||||
@@ -70,8 +75,10 @@ func NewModules(
|
||||
analytics analytics.Analytics,
|
||||
querier querier.Querier,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
telemetryMetadataStore telemetrytypes.MetadataStore,
|
||||
authNs map[authtypes.AuthNProvider]authn.AuthN,
|
||||
authz authz.AuthZ,
|
||||
cache cache.Cache,
|
||||
) Modules {
|
||||
quickfilter := implquickfilter.NewModule(implquickfilter.NewStore(sqlstore))
|
||||
orgSetter := implorganization.NewSetter(implorganization.NewStore(sqlstore), alertmanager, quickfilter)
|
||||
@@ -94,5 +101,6 @@ func NewModules(
|
||||
Session: implsession.NewModule(providerSettings, authNs, user, userGetter, implauthdomain.NewModule(implauthdomain.NewStore(sqlstore), authNs), tokenizer, orgGetter),
|
||||
SpanPercentile: implspanpercentile.NewModule(querier, providerSettings),
|
||||
Services: implservices.NewModule(querier, telemetryStore),
|
||||
Metrics: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, providerSettings),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,8 @@ func TestNewModules(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
tokenizer := tokenizertest.New()
|
||||
emailing := emailingtest.New()
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil)
|
||||
|
||||
reflectVal := reflect.ValueOf(modules)
|
||||
for i := 0; i < reflectVal.NumField(); i++ {
|
||||
|
||||
@@ -26,9 +26,15 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
pkgtokenizer "github.com/SigNoz/signoz/pkg/tokenizer"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/version"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
|
||||
@@ -37,24 +43,25 @@ import (
|
||||
|
||||
type SigNoz struct {
|
||||
*factory.Registry
|
||||
Instrumentation instrumentation.Instrumentation
|
||||
Analytics analytics.Analytics
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
Prometheus prometheus.Prometheus
|
||||
Alertmanager alertmanager.Alertmanager
|
||||
Querier querier.Querier
|
||||
Zeus zeus.Zeus
|
||||
Licensing licensing.Licensing
|
||||
Emailing emailing.Emailing
|
||||
Sharder sharder.Sharder
|
||||
StatsReporter statsreporter.StatsReporter
|
||||
Tokenizer pkgtokenizer.Tokenizer
|
||||
Authz authz.AuthZ
|
||||
Modules Modules
|
||||
Handlers Handlers
|
||||
Instrumentation instrumentation.Instrumentation
|
||||
Analytics analytics.Analytics
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
TelemetryMetadataStore telemetrytypes.MetadataStore
|
||||
Prometheus prometheus.Prometheus
|
||||
Alertmanager alertmanager.Alertmanager
|
||||
Querier querier.Querier
|
||||
Zeus zeus.Zeus
|
||||
Licensing licensing.Licensing
|
||||
Emailing emailing.Emailing
|
||||
Sharder sharder.Sharder
|
||||
StatsReporter statsreporter.StatsReporter
|
||||
Tokenizer pkgtokenizer.Tokenizer
|
||||
Authz authz.AuthZ
|
||||
Modules Modules
|
||||
Handlers Handlers
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -309,8 +316,30 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize telemetry metadata store
|
||||
// TODO: consolidate other telemetrymetadata.NewTelemetryMetaStore initializations to reuse this instance instead.
|
||||
telemetryMetadataStore := telemetrymetadata.NewTelemetryMetaStore(
|
||||
providerSettings,
|
||||
telemetrystore,
|
||||
telemetrytraces.DBName,
|
||||
telemetrytraces.TagAttributesV2TableName,
|
||||
telemetrytraces.SpanAttributesKeysTblName,
|
||||
telemetrytraces.SpanIndexV3TableName,
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.SamplesAgg1dTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
telemetrylogs.LogAttributeKeysTblName,
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
telemetrymetadata.DBName,
|
||||
telemetrymetadata.AttributesMetadataLocalTableName,
|
||||
)
|
||||
|
||||
// Initialize all modules
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, authNs, authz)
|
||||
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, telemetryMetadataStore, authNs, authz, cache)
|
||||
|
||||
// Initialize all handlers for the modules
|
||||
handlers := NewHandlers(modules, providerSettings, querier, licensing)
|
||||
@@ -354,23 +383,24 @@ func New(
|
||||
}
|
||||
|
||||
return &SigNoz{
|
||||
Registry: registry,
|
||||
Analytics: analytics,
|
||||
Instrumentation: instrumentation,
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
TelemetryStore: telemetrystore,
|
||||
Prometheus: prometheus,
|
||||
Alertmanager: alertmanager,
|
||||
Querier: querier,
|
||||
Zeus: zeus,
|
||||
Licensing: licensing,
|
||||
Emailing: emailing,
|
||||
Sharder: sharder,
|
||||
Tokenizer: tokenizer,
|
||||
Authz: authz,
|
||||
Modules: modules,
|
||||
Handlers: handlers,
|
||||
Registry: registry,
|
||||
Analytics: analytics,
|
||||
Instrumentation: instrumentation,
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
TelemetryStore: telemetrystore,
|
||||
TelemetryMetadataStore: telemetryMetadataStore,
|
||||
Prometheus: prometheus,
|
||||
Alertmanager: alertmanager,
|
||||
Querier: querier,
|
||||
Zeus: zeus,
|
||||
Licensing: licensing,
|
||||
Emailing: emailing,
|
||||
Sharder: sharder,
|
||||
Tokenizer: tokenizer,
|
||||
Authz: authz,
|
||||
Modules: modules,
|
||||
Handlers: handlers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -2387,7 +2387,7 @@ func TestFilterExprLogs(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.category, limitString(tc.query, 50)), func(t *testing.T) {
|
||||
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
|
||||
if tc.shouldPass {
|
||||
if err != nil {
|
||||
@@ -2506,7 +2506,7 @@ func TestFilterExprLogsConflictNegation(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.category, limitString(tc.query, 50)), func(t *testing.T) {
|
||||
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
|
||||
if tc.shouldPass {
|
||||
if err != nil {
|
||||
|
||||
@@ -348,13 +348,13 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
}, start, end)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
}
|
||||
|
||||
start, end, tbl := WhichTSTableToUse(start, end, query.Aggregations[0].TableHints)
|
||||
start, end, _, tbl := WhichTSTableToUse(start, end, query.Aggregations[0].TableHints)
|
||||
sb.From(fmt.Sprintf("%s.%s", DBName, tbl))
|
||||
|
||||
sb.Select("fingerprint")
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
|
||||
const (
|
||||
DBName = "signoz_metrics"
|
||||
UpdatedMetadataTableName = "distributed_updated_metadata"
|
||||
UpdatedMetadataLocalTableName = "updated_metadata"
|
||||
SamplesV4TableName = "distributed_samples_v4"
|
||||
SamplesV4LocalTableName = "samples_v4"
|
||||
SamplesV4Agg5mTableName = "distributed_samples_v4_agg_5m"
|
||||
@@ -41,29 +43,36 @@ var (
|
||||
offsetBucket = uint64(60 * time.Minute.Milliseconds())
|
||||
)
|
||||
|
||||
// WhichTSTableToUse returns adjusted start, adjusted end, distributed table name, local table name
|
||||
// in that order
|
||||
func WhichTSTableToUse(
|
||||
start, end uint64,
|
||||
tableHints *metrictypes.MetricTableHints,
|
||||
) (uint64, uint64, string) {
|
||||
) (uint64, uint64, string, string) {
|
||||
// if we have a hint for the table, we need to use it
|
||||
// the hint will be used to override the default table selection logic
|
||||
if tableHints != nil {
|
||||
if tableHints.TimeSeriesTableName != "" {
|
||||
var distributedTableName string
|
||||
switch tableHints.TimeSeriesTableName {
|
||||
case TimeseriesV4LocalTableName:
|
||||
// adjust the start time to nearest 1 hour
|
||||
start = start - (start % (oneHourInMilliseconds))
|
||||
distributedTableName = TimeseriesV4TableName
|
||||
case TimeseriesV46hrsLocalTableName:
|
||||
// adjust the start time to nearest 6 hours
|
||||
start = start - (start % (sixHoursInMilliseconds))
|
||||
distributedTableName = TimeseriesV46hrsTableName
|
||||
case TimeseriesV41dayLocalTableName:
|
||||
// adjust the start time to nearest 1 day
|
||||
start = start - (start % (oneDayInMilliseconds))
|
||||
distributedTableName = TimeseriesV41dayTableName
|
||||
case TimeseriesV41weekLocalTableName:
|
||||
// adjust the start time to nearest 1 week
|
||||
start = start - (start % (oneWeekInMilliseconds))
|
||||
distributedTableName = TimeseriesV41weekTableName
|
||||
}
|
||||
return start, end, tableHints.TimeSeriesTableName
|
||||
return start, end, distributedTableName, tableHints.TimeSeriesTableName
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,26 +80,46 @@ func WhichTSTableToUse(
|
||||
// else if time range is less than 1 day and greater than 6 hours, we need to use the `time_series_v4_6hrs` table
|
||||
// else if time range is less than 1 week and greater than 1 day, we need to use the `time_series_v4_1day` table
|
||||
// else we need to use the `time_series_v4_1week` table
|
||||
var tableName string
|
||||
var distributedTableName string
|
||||
var localTableName string
|
||||
if end-start < sixHoursInMilliseconds {
|
||||
// adjust the start time to nearest 1 hour
|
||||
start = start - (start % (oneHourInMilliseconds))
|
||||
tableName = TimeseriesV4LocalTableName
|
||||
distributedTableName = TimeseriesV4TableName
|
||||
localTableName = TimeseriesV4LocalTableName
|
||||
} else if end-start < oneDayInMilliseconds {
|
||||
// adjust the start time to nearest 6 hours
|
||||
start = start - (start % (sixHoursInMilliseconds))
|
||||
tableName = TimeseriesV46hrsLocalTableName
|
||||
distributedTableName = TimeseriesV46hrsTableName
|
||||
localTableName = TimeseriesV46hrsLocalTableName
|
||||
} else if end-start < oneWeekInMilliseconds {
|
||||
// adjust the start time to nearest 1 day
|
||||
start = start - (start % (oneDayInMilliseconds))
|
||||
tableName = TimeseriesV41dayLocalTableName
|
||||
distributedTableName = TimeseriesV41dayTableName
|
||||
localTableName = TimeseriesV41dayLocalTableName
|
||||
} else {
|
||||
// adjust the start time to nearest 1 week
|
||||
start = start - (start % (oneWeekInMilliseconds))
|
||||
tableName = TimeseriesV41weekLocalTableName
|
||||
distributedTableName = TimeseriesV41weekTableName
|
||||
localTableName = TimeseriesV41weekLocalTableName
|
||||
}
|
||||
|
||||
return start, end, tableName
|
||||
return start, end, distributedTableName, localTableName
|
||||
}
|
||||
|
||||
// CountExpressionForSamplesTable returns the count expression for a given samples table name.
|
||||
// For non-aggregated tables (distributed_samples_v4, exp_hist), it returns "count(*)".
|
||||
// For aggregated tables (distributed_samples_v4_agg_5m, distributed_samples_v4_agg_30m), it returns "sum(count)".
|
||||
func CountExpressionForSamplesTable(tableName string) string {
|
||||
// Non-aggregated tables use count(*)
|
||||
if tableName == SamplesV4TableName ||
|
||||
tableName == SamplesV4LocalTableName ||
|
||||
tableName == ExpHistogramTableName ||
|
||||
tableName == ExpHistogramLocalTableName {
|
||||
return "count(*)"
|
||||
}
|
||||
// Aggregated tables use sum(count)
|
||||
return "sum(count)"
|
||||
}
|
||||
|
||||
// start and end are in milliseconds
|
||||
@@ -105,7 +134,6 @@ func WhichSamplesTableToUse(
|
||||
timeAggregation metrictypes.TimeAggregation,
|
||||
tableHints *metrictypes.MetricTableHints,
|
||||
) string {
|
||||
|
||||
// if we have a hint for the table, we need to use it
|
||||
// the hint will be used to override the default table selection logic
|
||||
if tableHints != nil {
|
||||
|
||||
222
pkg/types/metricsexplorertypes/metricsexplorertypes.go
Normal file
222
pkg/types/metricsexplorertypes/metricsexplorertypes.go
Normal file
@@ -0,0 +1,222 @@
|
||||
package metricsexplorertypes
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// MetricOrderBy represents the order-by field for metrics queries.
|
||||
type MetricOrderBy struct {
|
||||
valuer.String
|
||||
}
|
||||
|
||||
var (
|
||||
OrderByTimeSeries = MetricOrderBy{valuer.NewString("timeseries")}
|
||||
OrderBySamples = MetricOrderBy{valuer.NewString("samples")}
|
||||
)
|
||||
|
||||
// TreemapMode indicates which treemap variant the caller requests.
|
||||
type TreemapMode struct {
|
||||
valuer.String
|
||||
}
|
||||
|
||||
var (
|
||||
// TreemapModeTimeSeries represents the treemap based on timeseries counts.
|
||||
TreemapModeTimeSeries = TreemapMode{valuer.NewString("timeseries")}
|
||||
// TreemapModeSamples represents the treemap based on sample counts.
|
||||
TreemapModeSamples = TreemapMode{valuer.NewString("samples")}
|
||||
)
|
||||
|
||||
// StatsRequest represents the payload accepted by the metrics stats endpoint.
|
||||
type StatsRequest struct {
|
||||
Filter *qbtypes.Filter `json:"filter,omitempty"`
|
||||
Start int64 `json:"start"`
|
||||
End int64 `json:"end"`
|
||||
Limit int `json:"limit"`
|
||||
Offset int `json:"offset"`
|
||||
OrderBy *qbtypes.OrderBy `json:"orderBy,omitempty"`
|
||||
}
|
||||
|
||||
// Validate ensures StatsRequest contains acceptable values.
|
||||
func (req *StatsRequest) Validate() error {
|
||||
if req == nil {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
|
||||
}
|
||||
|
||||
if req.Start <= 0 {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid start time %d: start must be greater than 0",
|
||||
req.Start,
|
||||
)
|
||||
}
|
||||
|
||||
if req.End <= 0 {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid end time %d: end must be greater than 0",
|
||||
req.End,
|
||||
)
|
||||
}
|
||||
|
||||
if req.Start >= req.End {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid time range: start (%d) must be less than end (%d)",
|
||||
req.Start,
|
||||
req.End,
|
||||
)
|
||||
}
|
||||
|
||||
if req.Limit < 1 || req.Limit > 5000 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
|
||||
}
|
||||
|
||||
if req.Offset < 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "offset cannot be negative")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON validates input immediately after decoding.
|
||||
func (req *StatsRequest) UnmarshalJSON(data []byte) error {
|
||||
type raw StatsRequest
|
||||
var decoded raw
|
||||
if err := json.Unmarshal(data, &decoded); err != nil {
|
||||
return err
|
||||
}
|
||||
*req = StatsRequest(decoded)
|
||||
return req.Validate()
|
||||
}
|
||||
|
||||
// Stat represents the summary information returned per metric.
|
||||
type Stat struct {
|
||||
MetricName string `json:"metricName"`
|
||||
Description string `json:"description"`
|
||||
MetricType metrictypes.Type `json:"type"`
|
||||
MetricUnit string `json:"unit"`
|
||||
TimeSeries uint64 `json:"timeseries"`
|
||||
Samples uint64 `json:"samples"`
|
||||
}
|
||||
|
||||
// StatsResponse represents the aggregated metrics statistics.
|
||||
type StatsResponse struct {
|
||||
Metrics []Stat `json:"metrics"`
|
||||
Total uint64 `json:"total"`
|
||||
}
|
||||
|
||||
type MetricMetadata struct {
|
||||
Description string `json:"description"`
|
||||
MetricType metrictypes.Type `json:"type"`
|
||||
MetricUnit string `json:"unit"`
|
||||
Temporality metrictypes.Temporality `json:"temporality"`
|
||||
IsMonotonic bool `json:"isMonotonic"`
|
||||
}
|
||||
|
||||
// MarshalBinary implements cachetypes.Cacheable interface
|
||||
func (m *MetricMetadata) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(m)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements cachetypes.Cacheable interface
|
||||
func (m *MetricMetadata) UnmarshalBinary(data []byte) error {
|
||||
return json.Unmarshal(data, m)
|
||||
}
|
||||
|
||||
// UpdateMetricMetadataRequest represents the payload for updating metric metadata.
|
||||
type UpdateMetricMetadataRequest struct {
|
||||
MetricName string `json:"metricName"`
|
||||
Type metrictypes.Type `json:"type"`
|
||||
Description string `json:"description"`
|
||||
Unit string `json:"unit"`
|
||||
Temporality metrictypes.Temporality `json:"temporality"`
|
||||
IsMonotonic bool `json:"isMonotonic"`
|
||||
}
|
||||
|
||||
// TreemapRequest represents the payload for the metrics treemap endpoint.
|
||||
type TreemapRequest struct {
|
||||
Filter *qbtypes.Filter `json:"filter,omitempty"`
|
||||
Start int64 `json:"start"`
|
||||
End int64 `json:"end"`
|
||||
Limit int `json:"limit"`
|
||||
Treemap TreemapMode `json:"treemap"`
|
||||
}
|
||||
|
||||
// Validate enforces basic constraints on TreemapRequest.
|
||||
func (req *TreemapRequest) Validate() error {
|
||||
if req == nil {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
|
||||
}
|
||||
|
||||
if req.Start <= 0 {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid start time %d: start must be greater than 0",
|
||||
req.Start,
|
||||
)
|
||||
}
|
||||
|
||||
if req.End <= 0 {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid end time %d: end must be greater than 0",
|
||||
req.End,
|
||||
)
|
||||
}
|
||||
|
||||
if req.Start >= req.End {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid time range: start (%d) must be less than end (%d)",
|
||||
req.Start,
|
||||
req.End,
|
||||
)
|
||||
}
|
||||
|
||||
if req.Limit < 1 || req.Limit > 5000 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
|
||||
}
|
||||
|
||||
if req.Treemap != TreemapModeSamples && req.Treemap != TreemapModeTimeSeries {
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
"invalid treemap mode %q: supported values are %q or %q",
|
||||
req.Treemap,
|
||||
TreemapModeSamples,
|
||||
TreemapModeTimeSeries,
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON validates treemap requests immediately after decoding.
|
||||
func (req *TreemapRequest) UnmarshalJSON(data []byte) error {
|
||||
type raw TreemapRequest
|
||||
var decoded raw
|
||||
|
||||
if err := json.Unmarshal(data, &decoded); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*req = TreemapRequest(decoded)
|
||||
return req.Validate()
|
||||
}
|
||||
|
||||
// TreemapEntry represents each node in the treemap response.
|
||||
type TreemapEntry struct {
|
||||
MetricName string `json:"metricName"`
|
||||
Percentage float64 `json:"percentage"`
|
||||
TotalValue uint64 `json:"totalValue"`
|
||||
}
|
||||
|
||||
// TreemapResponse is the output structure for the treemap endpoint.
|
||||
type TreemapResponse struct {
|
||||
TimeSeries []TreemapEntry `json:"timeseries"`
|
||||
Samples []TreemapEntry `json:"samples"`
|
||||
}
|
||||
@@ -1,6 +1,10 @@
|
||||
package metrictypes
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
@@ -17,12 +21,110 @@ var (
|
||||
Unknown = Temporality{valuer.NewString("")}
|
||||
)
|
||||
|
||||
func (t Temporality) Value() (driver.Value, error) {
|
||||
switch t {
|
||||
case Delta:
|
||||
return "Delta", nil
|
||||
case Cumulative:
|
||||
return "Cumulative", nil
|
||||
case Unspecified:
|
||||
return "Unspecified", nil
|
||||
case Unknown:
|
||||
return "", nil
|
||||
default:
|
||||
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "temporality: unsupported value %q", t.StringValue())
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Temporality) Scan(src interface{}) error {
|
||||
if src == nil {
|
||||
*t = Unknown
|
||||
return nil
|
||||
}
|
||||
|
||||
var val string
|
||||
switch v := src.(type) {
|
||||
case string:
|
||||
val = v
|
||||
case []byte:
|
||||
val = string(v)
|
||||
default:
|
||||
return errors.Newf(errors.TypeInternal, errors.CodeInternal, "temporality: cannot scan %T", src)
|
||||
}
|
||||
|
||||
switch strings.ToLower(strings.TrimSpace(val)) {
|
||||
case "delta":
|
||||
*t = Delta
|
||||
case "cumulative":
|
||||
*t = Cumulative
|
||||
case "unspecified":
|
||||
*t = Unspecified
|
||||
default:
|
||||
*t = Unknown
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Type is the type of the metric in OTLP data model
|
||||
// Read more here https://opentelemetry.io/docs/specs/otel/metrics/data-model/#metric-points
|
||||
type Type struct {
|
||||
valuer.String
|
||||
}
|
||||
|
||||
func (t Type) Value() (driver.Value, error) {
|
||||
switch t {
|
||||
case GaugeType:
|
||||
return "Gauge", nil
|
||||
case SumType:
|
||||
return "Sum", nil
|
||||
case HistogramType:
|
||||
return "Histogram", nil
|
||||
case SummaryType:
|
||||
return "Summary", nil
|
||||
case ExpHistogramType:
|
||||
return "ExponentialHistogram", nil
|
||||
case UnspecifiedType:
|
||||
return "", nil
|
||||
default:
|
||||
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric type: unsupported value %q", t.StringValue())
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Type) Scan(src interface{}) error {
|
||||
if src == nil {
|
||||
*t = UnspecifiedType
|
||||
return nil
|
||||
}
|
||||
|
||||
var val string
|
||||
switch v := src.(type) {
|
||||
case string:
|
||||
val = v
|
||||
case []byte:
|
||||
val = string(v)
|
||||
default:
|
||||
return errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric type: cannot scan %T", src)
|
||||
}
|
||||
|
||||
switch strings.ToLower(strings.TrimSpace(val)) {
|
||||
case "gauge":
|
||||
*t = GaugeType
|
||||
case "sum":
|
||||
*t = SumType
|
||||
case "histogram":
|
||||
*t = HistogramType
|
||||
case "summary":
|
||||
*t = SummaryType
|
||||
case "exponentialhistogram":
|
||||
*t = ExpHistogramType
|
||||
default:
|
||||
*t = UnspecifiedType
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
GaugeType = Type{valuer.NewString("gauge")}
|
||||
SumType = Type{valuer.NewString("sum")}
|
||||
|
||||
Reference in New Issue
Block a user