mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-26 20:00:33 +01:00
Compare commits
4 Commits
traceop-re
...
metricsExp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5fb4c4b85b | ||
|
|
c61e388646 | ||
|
|
05e84b04cc | ||
|
|
def414c7bb |
@@ -219,19 +219,25 @@ func (m *module) GetStats(ctx context.Context, orgID valuer.UUID, req *metricsex
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filterWhereClause, err := m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var (
|
||||
metricStats []metricsexplorertypes.Stat
|
||||
total uint64
|
||||
err error
|
||||
)
|
||||
|
||||
hasFilter := req.Filter != nil && strings.TrimSpace(req.Filter.Expression) != ""
|
||||
|
||||
if hasFilter {
|
||||
var filterWhereClause *sqlbuilder.WhereClause
|
||||
filterWhereClause, err = m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metricStats, total, err = m.fetchMetricsStatsWithSamples(ctx, req, filterWhereClause, false, req.OrderBy)
|
||||
} else {
|
||||
metricStats, total, err = m.fetchMetricsStatsWithSamplesFastPath(ctx, req, false, req.OrderBy)
|
||||
}
|
||||
|
||||
// Single query to get stats with samples, timeseries counts in required sorting order
|
||||
metricStats, total, err := m.fetchMetricsStatsWithSamples(
|
||||
ctx,
|
||||
req,
|
||||
filterWhereClause,
|
||||
false,
|
||||
req.OrderBy,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -268,21 +274,28 @@ func (m *module) GetTreemap(ctx context.Context, orgID valuer.UUID, req *metrics
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hasFilter := req.Filter != nil && strings.TrimSpace(req.Filter.Expression) != ""
|
||||
filterWhereClause, err := m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var entries []metricsexplorertypes.TreemapEntry
|
||||
|
||||
resp := &metricsexplorertypes.TreemapResponse{}
|
||||
switch req.Mode {
|
||||
case metricsexplorertypes.TreemapModeSamples:
|
||||
entries, err := m.computeSamplesTreemap(ctx, req, filterWhereClause)
|
||||
if hasFilter {
|
||||
entries, err = m.computeSamplesTreemap(ctx, req, filterWhereClause)
|
||||
} else {
|
||||
entries, err = m.computeSamplesTreemapFastPath(ctx, req)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp.Samples = entries
|
||||
default: // TreemapModeTimeSeries
|
||||
entries, err := m.computeTimeseriesTreemap(ctx, req, filterWhereClause)
|
||||
entries, err = m.computeTimeseriesTreemap(ctx, req, filterWhereClause)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1085,6 +1098,109 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
return metricStats, total, nil
|
||||
}
|
||||
|
||||
func (m *module) fetchMetricsStatsWithSamplesFastPath(
|
||||
ctx context.Context,
|
||||
req *metricsexplorertypes.StatsRequest,
|
||||
normalized bool,
|
||||
orderBy *qbtypes.OrderBy,
|
||||
) ([]metricsexplorertypes.Stat, uint64, error) {
|
||||
ctx = m.withMetricsExplorerContext(ctx, "fetchMetricsStatsWithSamplesFastPath")
|
||||
|
||||
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
|
||||
// Timeseries counts per metric
|
||||
tsSB := sqlbuilder.NewSelectBuilder()
|
||||
tsSB.Select(
|
||||
"metric_name",
|
||||
"uniq(fingerprint) AS timeseries",
|
||||
)
|
||||
tsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
|
||||
tsSB.Where(tsSB.Between("unix_milli", start, end))
|
||||
tsSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
tsSB.Where(tsSB.E("__normalized", normalized))
|
||||
tsSB.GroupBy("metric_name")
|
||||
|
||||
// Distinct metric_names from local TS table — narrows samples scan on its leading sort key
|
||||
metricNamesSB := sqlbuilder.NewSelectBuilder()
|
||||
metricNamesSB.Select("DISTINCT metric_name")
|
||||
metricNamesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTsTable))
|
||||
metricNamesSB.Where(metricNamesSB.Between("unix_milli", start, end))
|
||||
metricNamesSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
metricNamesSB.Where(metricNamesSB.E("__normalized", normalized))
|
||||
|
||||
// Samples counts per metric
|
||||
samplesSB := sqlbuilder.NewSelectBuilder()
|
||||
samplesSB.Select(
|
||||
"metric_name",
|
||||
fmt.Sprintf("%s AS samples", countExp),
|
||||
)
|
||||
samplesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
samplesSB.Where(samplesSB.Between("unix_milli", req.Start, req.End))
|
||||
samplesSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
samplesSB.Where(fmt.Sprintf("metric_name IN (%s)", samplesSB.Var(metricNamesSB)))
|
||||
samplesSB.GroupBy("metric_name")
|
||||
|
||||
cteBuilder := sqlbuilder.With(
|
||||
sqlbuilder.CTEQuery("__time_series_counts").As(tsSB),
|
||||
sqlbuilder.CTEQuery("__sample_counts").As(samplesSB),
|
||||
)
|
||||
|
||||
finalSB := cteBuilder.Select(
|
||||
"COALESCE(ts.metric_name, s.metric_name) AS metric_name",
|
||||
"COALESCE(ts.timeseries, 0) AS timeseries",
|
||||
"COALESCE(s.samples, 0) AS samples",
|
||||
"COUNT(*) OVER() AS total",
|
||||
)
|
||||
finalSB.From("__time_series_counts ts")
|
||||
finalSB.JoinWithOption(sqlbuilder.FullOuterJoin, "__sample_counts s", "ts.metric_name = s.metric_name")
|
||||
finalSB.Where("(COALESCE(ts.timeseries, 0) > 0 OR COALESCE(s.samples, 0) > 0)")
|
||||
|
||||
orderByColumn, orderDirection, err := getStatsOrderByColumn(orderBy)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
finalSB.OrderBy(
|
||||
fmt.Sprintf("%s %s", orderByColumn, strings.ToUpper(orderDirection)),
|
||||
"metric_name ASC",
|
||||
)
|
||||
finalSB.Limit(req.Limit)
|
||||
finalSB.Offset(req.Offset)
|
||||
|
||||
query, args := finalSB.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
valueCtx := ctxtypes.SetClickhouseMaxThreads(ctx, m.config.TelemetryStore.Threads)
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
rows, err := db.Query(valueCtx, query, args...)
|
||||
if err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to execute metrics stats with samples fastpath query")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
metricStats := make([]metricsexplorertypes.Stat, 0)
|
||||
var total uint64
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
metricStat metricsexplorertypes.Stat
|
||||
rowTotal uint64
|
||||
)
|
||||
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal); err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metrics stats row")
|
||||
}
|
||||
metricStats = append(metricStats, metricStat)
|
||||
total = rowTotal
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metrics stats rows")
|
||||
}
|
||||
|
||||
return metricStats, total, nil
|
||||
}
|
||||
|
||||
func (m *module) computeTimeseriesTreemap(ctx context.Context, req *metricsexplorertypes.TreemapRequest, filterWhereClause *sqlbuilder.WhereClause) ([]metricsexplorertypes.TreemapEntry, error) {
|
||||
ctx = m.withMetricsExplorerContext(ctx, "computeTimeseriesTreemap")
|
||||
|
||||
@@ -1253,6 +1369,84 @@ func (m *module) computeSamplesTreemap(ctx context.Context, req *metricsexplorer
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
func (m *module) computeSamplesTreemapFastPath(ctx context.Context, req *metricsexplorertypes.TreemapRequest) ([]metricsexplorertypes.TreemapEntry, error) {
|
||||
|
||||
ctx = m.withMetricsExplorerContext(ctx, "computeSamplesTreemapFastPath")
|
||||
|
||||
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
|
||||
candidateLimit := req.Limit + 50
|
||||
|
||||
metricCandidatesSB := sqlbuilder.NewSelectBuilder()
|
||||
metricCandidatesSB.Select("metric_name")
|
||||
metricCandidatesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
|
||||
metricCandidatesSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
metricCandidatesSB.Where(metricCandidatesSB.E("__normalized", false))
|
||||
metricCandidatesSB.Where(metricCandidatesSB.Between("unix_milli", start, end))
|
||||
metricCandidatesSB.GroupBy("metric_name")
|
||||
metricCandidatesSB.OrderBy("uniq(fingerprint) DESC")
|
||||
metricCandidatesSB.Limit(candidateLimit)
|
||||
|
||||
totalSamplesSB := sqlbuilder.NewSelectBuilder()
|
||||
totalSamplesSB.Select(fmt.Sprintf("%s AS total_samples", countExp))
|
||||
totalSamplesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
totalSamplesSB.Where(totalSamplesSB.Between("unix_milli", req.Start, req.End))
|
||||
|
||||
sampleCountsSB := sqlbuilder.NewSelectBuilder()
|
||||
sampleCountsSB.Select(
|
||||
"metric_name",
|
||||
fmt.Sprintf("%s AS samples", countExp),
|
||||
)
|
||||
sampleCountsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
sampleCountsSB.Where(sampleCountsSB.Between("unix_milli", req.Start, req.End))
|
||||
sampleCountsSB.Where("metric_name GLOBAL IN (SELECT metric_name FROM __metric_candidates)")
|
||||
sampleCountsSB.GroupBy("metric_name")
|
||||
|
||||
cteBuilder := sqlbuilder.With(
|
||||
sqlbuilder.CTEQuery("__metric_candidates").As(metricCandidatesSB),
|
||||
sqlbuilder.CTEQuery("__sample_counts").As(sampleCountsSB),
|
||||
sqlbuilder.CTEQuery("__total_samples").As(totalSamplesSB),
|
||||
)
|
||||
|
||||
finalSB := cteBuilder.Select(
|
||||
"mc.metric_name",
|
||||
"COALESCE(sc.samples, 0) AS samples",
|
||||
"CASE WHEN ts.total_samples = 0 THEN 0 ELSE (COALESCE(sc.samples, 0) * 100.0 / ts.total_samples) END AS percentage",
|
||||
)
|
||||
finalSB.From("__metric_candidates mc")
|
||||
finalSB.JoinWithOption(sqlbuilder.LeftJoin, "__sample_counts sc", "mc.metric_name = sc.metric_name")
|
||||
finalSB.Join("__total_samples ts", "1=1")
|
||||
finalSB.OrderBy("percentage DESC")
|
||||
finalSB.Limit(req.Limit)
|
||||
|
||||
query, args := finalSB.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
valueCtx := ctxtypes.SetClickhouseMaxThreads(ctx, m.config.TelemetryStore.Threads)
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
rows, err := db.Query(valueCtx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to execute samples treemap fastpath query")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
entries := make([]metricsexplorertypes.TreemapEntry, 0)
|
||||
for rows.Next() {
|
||||
var treemapEntry metricsexplorertypes.TreemapEntry
|
||||
if err := rows.Scan(&treemapEntry.MetricName, &treemapEntry.TotalValue, &treemapEntry.Percentage); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan samples treemap row")
|
||||
}
|
||||
entries = append(entries, treemapEntry)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating samples treemap rows")
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// getMetricDataPoints returns the total number of data points (samples) for a metric.
|
||||
func (m *module) getMetricDataPoints(ctx context.Context, metricName string) (uint64, error) {
|
||||
ctx = m.withMetricsExplorerContext(ctx, "getMetricDataPoints")
|
||||
|
||||
Reference in New Issue
Block a user