mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-26 20:00:33 +01:00
Compare commits
15 Commits
fix/tansta
...
infraM/pro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fbbe09400f | ||
|
|
44db9e202a | ||
|
|
f9e704caa4 | ||
|
|
21ac6505e9 | ||
|
|
5cadf8582e | ||
|
|
711bbf49f1 | ||
|
|
1cc48405e0 | ||
|
|
e79715bf6d | ||
|
|
2ce86a3f9d | ||
|
|
639e3239a9 | ||
|
|
e83a23e6ab | ||
|
|
f0cff4ae96 | ||
|
|
0667dc47cb | ||
|
|
4af267ee61 | ||
|
|
6f1090818c |
@@ -1,4 +1,4 @@
|
||||
import { forwardRef, type HTMLAttributes, type ReactNode } from 'react';
|
||||
import type { HTMLAttributes, ReactNode } from 'react';
|
||||
import cx from 'classnames';
|
||||
|
||||
import tableStyles from './TanStackTable.module.scss';
|
||||
@@ -22,19 +22,21 @@ type WithDangerousHtml = BaseProps & {
|
||||
|
||||
export type TanStackTableTextProps = WithChildren | WithDangerousHtml;
|
||||
|
||||
const TanStackTableText = forwardRef<HTMLSpanElement, TanStackTableTextProps>(
|
||||
({ children, className, dangerouslySetInnerHTML, ...rest }, ref) => (
|
||||
function TanStackTableText({
|
||||
children,
|
||||
className,
|
||||
dangerouslySetInnerHTML,
|
||||
...rest
|
||||
}: TanStackTableTextProps): JSX.Element {
|
||||
return (
|
||||
<span
|
||||
ref={ref}
|
||||
className={cx(tableStyles.tableCellText, className)}
|
||||
dangerouslySetInnerHTML={dangerouslySetInnerHTML}
|
||||
{...rest}
|
||||
>
|
||||
{children}
|
||||
</span>
|
||||
),
|
||||
);
|
||||
|
||||
TanStackTableText.displayName = 'TanStackTableText';
|
||||
);
|
||||
}
|
||||
|
||||
export default TanStackTableText;
|
||||
|
||||
@@ -149,6 +149,11 @@ func paginateWithBackfill(
|
||||
groupBy []qbtypes.GroupByKey,
|
||||
offset, limit int,
|
||||
) []map[string]string {
|
||||
|
||||
// note: we took a stand here that we are NOT removing those metricGroups from the array that are not in metadataMap.
|
||||
// we are relying on time adjustment logic from alignedMetricWindow. In future if a user complains about seeing metric groups
|
||||
// with missing metadata, we can consider removing those groups from the metricGroups array here before paginating.
|
||||
|
||||
metricKeySet := make(map[string]bool, len(metricGroups))
|
||||
for _, g := range metricGroups {
|
||||
metricKeySet[g.compositeKey] = true
|
||||
@@ -163,7 +168,7 @@ func paginateWithBackfill(
|
||||
sort.Strings(metadataOnlyKeys)
|
||||
|
||||
totalMetric := len(metricGroups)
|
||||
totalAll := totalMetric + len(metadataOnlyKeys)
|
||||
totalAll := len(metadataMap)
|
||||
|
||||
end := offset + limit
|
||||
if end > totalAll {
|
||||
@@ -307,23 +312,57 @@ func parseFullQueryResponse(
|
||||
return result
|
||||
}
|
||||
|
||||
// buildSamplesTblFingerprintSubQuery returns a SelectBuilder that selects distinct fingerprints
|
||||
// from the samples table for the given metric names andtime range.
|
||||
func (m *module) buildSamplesTblFingerprintSubQuery(metricNames []string, startMs, endMs int64) *sqlbuilder.SelectBuilder {
|
||||
samplesTableName := telemetrymetrics.WhichSamplesTableToUse(
|
||||
uint64(startMs), uint64(endMs),
|
||||
metrictypes.UnspecifiedType,
|
||||
metrictypes.TimeAggregationUnspecified,
|
||||
nil,
|
||||
// alignedMetricWindow returns step-floored time bounds and the metric tables
|
||||
// to use for the given window. The floor matches what the QB v5 metric
|
||||
// querier does internally (see querybuilder.AdjustedMetricTimeRange).
|
||||
// Please use the samplesAdjustedStartMs with samples table and tsAdjustedStartMs with ts tables.
|
||||
// Both can use the same flooredEndMs.
|
||||
func alignedMetricWindow(startMs, endMs int64) (
|
||||
uint64, // samplesAdjustedStartMs
|
||||
uint64, // flooredEndMs
|
||||
uint64, // tsAdjustedStartMs
|
||||
string, // distributedTSTable
|
||||
string, // localTSTable
|
||||
string, // distributedSamplesTable
|
||||
string, // localSamplesTable
|
||||
) {
|
||||
samplesAdjustedStartMs := uint64(startMs)
|
||||
flooredEndMs := uint64(endMs)
|
||||
stepSecs := querybuilder.RecommendedStepIntervalForMetric(samplesAdjustedStartMs, flooredEndMs)
|
||||
// note: this is the same flooring logic as in querybuilder.AdjustedMetricTimeRange. Duplicated code.
|
||||
// TODO(nikhilmantri0902): if the querybuilder.AdjustMetricTimeRange logic changes, this needs to be updated too.
|
||||
if stepSecs > 0 {
|
||||
samplesAdjustedStartMs = samplesAdjustedStartMs - (samplesAdjustedStartMs % (stepSecs * 1000))
|
||||
adjustStep := stepSecs
|
||||
if adjustStep > 60 {
|
||||
adjustStep = 60
|
||||
}
|
||||
flooredEndMs = flooredEndMs - (flooredEndMs % (adjustStep * 1000))
|
||||
}
|
||||
|
||||
tsAdjustedStartMs, _, distributedTSTable, localTSTable := telemetrymetrics.WhichTSTableToUse(
|
||||
samplesAdjustedStartMs, flooredEndMs, nil,
|
||||
)
|
||||
localSamplesTable := strings.TrimPrefix(samplesTableName, "distributed_")
|
||||
|
||||
distributedSamplesTable, localSamplesTable := telemetrymetrics.WhichSamplesTableToUse(
|
||||
samplesAdjustedStartMs, flooredEndMs,
|
||||
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
|
||||
)
|
||||
|
||||
return samplesAdjustedStartMs, flooredEndMs, tsAdjustedStartMs, distributedTSTable, localTSTable, distributedSamplesTable, localSamplesTable
|
||||
}
|
||||
|
||||
// buildSamplesTblFingerprintSubQuery returns a SelectBuilder that selects distinct fingerprints
|
||||
// from the samples table for the given metric names and time range.
|
||||
// Bounds must already be step-floored by the caller via alignedMetricWindow.
|
||||
func (m *module) buildSamplesTblFingerprintSubQuery(metricNames []string, samplesTable string, flooredStart, flooredEnd uint64) *sqlbuilder.SelectBuilder {
|
||||
fpSB := sqlbuilder.NewSelectBuilder()
|
||||
fpSB.Select("DISTINCT fingerprint")
|
||||
fpSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localSamplesTable))
|
||||
fpSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
fpSB.Where(
|
||||
fpSB.In("metric_name", sqlbuilder.List(metricNames)),
|
||||
fpSB.GE("unix_milli", startMs),
|
||||
fpSB.L("unix_milli", endMs),
|
||||
fpSB.GE("unix_milli", flooredStart),
|
||||
fpSB.L("unix_milli", flooredEnd),
|
||||
)
|
||||
return fpSB
|
||||
}
|
||||
@@ -454,12 +493,12 @@ func (m *module) getMetadata(
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "groupBy must not be empty")
|
||||
}
|
||||
|
||||
// Pick the optimal timeseries table based on time range; also get adjusted start.
|
||||
adjustedStart, adjustedEnd, distributedTableName, _ := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(startMs), uint64(endMs), nil,
|
||||
)
|
||||
// Step-floor the window and pick the right tables — matches the bounds the
|
||||
// QB v5 metric querier uses, so metadataMap covers the same universe the
|
||||
// ranking sees (see alignedMetricWindow doc).
|
||||
samplesStartMs, flooredEndMs, tsAdjustedStartMs, distributedTableName, _, _, localSamplesTable := alignedMetricWindow(startMs, endMs)
|
||||
|
||||
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, startMs, endMs)
|
||||
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, localSamplesTable, samplesStartMs, flooredEndMs)
|
||||
|
||||
// Flatten groupBy keys to string names for SQL expressions and result scanning.
|
||||
groupByCols := make([]string, len(groupBy))
|
||||
@@ -494,8 +533,8 @@ func (m *module) getMetadata(
|
||||
innerSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTableName))
|
||||
innerSB.Where(
|
||||
innerSB.In("metric_name", sqlbuilder.List(metricNames)),
|
||||
innerSB.GE("unix_milli", adjustedStart),
|
||||
innerSB.L("unix_milli", adjustedEnd),
|
||||
innerSB.GE("unix_milli", tsAdjustedStartMs),
|
||||
innerSB.LE("unix_milli", flooredEndMs),
|
||||
fmt.Sprintf("fingerprint IN (%s)", innerSB.Var(fpSB)),
|
||||
)
|
||||
|
||||
|
||||
@@ -34,9 +34,8 @@ func (m *module) getPerGroupHostStatusCounts(
|
||||
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
|
||||
filterExpr := mergeFilterExpressions(reqFilterExpr, pageGroupsFilterExpr)
|
||||
|
||||
adjustedStart, adjustedEnd, distributedTimeSeriesTableName, _ := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(req.Start), uint64(req.End), nil,
|
||||
)
|
||||
// Step-floor bounds + resolve tables in one shot to match QB v5 querier.
|
||||
samplesStartMs, flooredEndMs, tsAdjustedStartMs, distributedTimeSeriesTableName, _, _, localSamplesTable := alignedMetricWindow(req.Start, req.End)
|
||||
|
||||
hostNameExpr := fmt.Sprintf("JSONExtractString(labels, '%s')", inframonitoringtypes.HostNameAttrKey)
|
||||
|
||||
@@ -55,15 +54,15 @@ func (m *module) getPerGroupHostStatusCounts(
|
||||
)
|
||||
|
||||
// Build a fingerprint subquery to restrict to fingerprints with actual sample
|
||||
// data in the original time range (not the wider timeseries table window).
|
||||
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, req.Start, req.End)
|
||||
// data in the floored time range.
|
||||
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, localSamplesTable, samplesStartMs, flooredEndMs)
|
||||
|
||||
sb.Select(selectCols...)
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTimeSeriesTableName))
|
||||
sb.Where(
|
||||
sb.In("metric_name", sqlbuilder.List(metricNames)),
|
||||
sb.GE("unix_milli", adjustedStart),
|
||||
sb.L("unix_milli", adjustedEnd),
|
||||
sb.GE("unix_milli", tsAdjustedStartMs),
|
||||
sb.LE("unix_milli", flooredEndMs),
|
||||
fmt.Sprintf("fingerprint IN (%s)", sb.Var(fpSB)),
|
||||
)
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
@@ -190,15 +189,9 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
|
||||
mergedFilterExpr := mergeFilterExpressions(userFilterExpr, pageGroupsFilterExpr)
|
||||
|
||||
// Resolve tables. Same convention as pods.
|
||||
adjustedStart, adjustedEnd, _, localTimeSeriesTable := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(start), uint64(end), nil,
|
||||
)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
|
||||
uint64(start), uint64(end),
|
||||
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
|
||||
)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(samplesTable)
|
||||
// Step-floor bounds + resolve tables in one shot to match QB v5 querier.
|
||||
samplesStartMs, flooredEndMs, tsAdjustedStartMs, _, localTimeSeriesTable, distributedSamplesTable, _ := alignedMetricWindow(start, end)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(distributedSamplesTable)
|
||||
|
||||
// ----- timeSeriesFPs -----
|
||||
timeSeriesFPs := sqlbuilder.NewSelectBuilder()
|
||||
@@ -215,8 +208,8 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
timeSeriesFPs.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTimeSeriesTable))
|
||||
timeSeriesFPs.Where(
|
||||
timeSeriesFPs.E("metric_name", nodeConditionMetricName),
|
||||
timeSeriesFPs.GE("unix_milli", adjustedStart),
|
||||
timeSeriesFPs.L("unix_milli", adjustedEnd),
|
||||
timeSeriesFPs.GE("unix_milli", tsAdjustedStartMs),
|
||||
timeSeriesFPs.LE("unix_milli", flooredEndMs),
|
||||
)
|
||||
if mergedFilterExpr != "" {
|
||||
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: mergedFilterExpr}, start, end)
|
||||
@@ -249,12 +242,12 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
latestConditionPerNode.Select(latestConditionPerNodeSelectCols...)
|
||||
latestConditionPerNode.From(fmt.Sprintf(
|
||||
"%s.%s AS samples INNER JOIN time_series_fps AS tsfp ON samples.fingerprint = tsfp.fingerprint",
|
||||
telemetrymetrics.DBName, samplesTable,
|
||||
telemetrymetrics.DBName, distributedSamplesTable,
|
||||
))
|
||||
latestConditionPerNode.Where(
|
||||
latestConditionPerNode.E("samples.metric_name", nodeConditionMetricName),
|
||||
latestConditionPerNode.GE("samples.unix_milli", start),
|
||||
latestConditionPerNode.L("samples.unix_milli", end),
|
||||
latestConditionPerNode.GE("samples.unix_milli", samplesStartMs),
|
||||
latestConditionPerNode.L("samples.unix_milli", flooredEndMs),
|
||||
"tsfp.node_name != ''",
|
||||
)
|
||||
latestConditionPerNode.GroupBy(latestConditionPerNodeGroupBy...)
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
@@ -96,14 +95,16 @@ func buildPodRecords(
|
||||
}
|
||||
}
|
||||
|
||||
if attrs, ok := metadataMap[compositeKey]; ok && isPodUIDInGroupBy {
|
||||
// the condition above ensures we deduce age only if pod uid is in group by because if
|
||||
// it's not in group by then we might have multiple pod uids in the same group and hence then podAge wont make sense
|
||||
if startTimeStr, exists := attrs[podStartTimeAttrKey]; exists && startTimeStr != "" {
|
||||
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
|
||||
startTimeMs := t.UnixMilli()
|
||||
if startTimeMs > 0 {
|
||||
record.PodAge = reqEnd - startTimeMs
|
||||
if attrs, ok := metadataMap[compositeKey]; ok {
|
||||
// podAge only makes sense when pod uid is in groupBy. Otherwise the
|
||||
// group can contain multiple pods with different start times.
|
||||
if isPodUIDInGroupBy {
|
||||
if startTimeStr, exists := attrs[podStartTimeAttrKey]; exists && startTimeStr != "" {
|
||||
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
|
||||
startTimeMs := t.UnixMilli()
|
||||
if startTimeMs > 0 {
|
||||
record.PodAge = reqEnd - startTimeMs
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -209,15 +210,9 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
|
||||
mergedFilterExpr := mergeFilterExpressions(userFilterExpr, pageGroupsFilterExpr)
|
||||
|
||||
// Resolve tables. Same convention as hosts (distributed names from helpers).
|
||||
adjustedStart, adjustedEnd, _, localTimeSeriesTable := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(start), uint64(end), nil,
|
||||
)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
|
||||
uint64(start), uint64(end),
|
||||
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
|
||||
)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(samplesTable)
|
||||
// Step-floor bounds + resolve tables in one shot to match QB v5 querier.
|
||||
samplesStartMs, flooredEndMs, tsAdjustedStart, _, localTimeSeriesTable, distributedSamplesTable, _ := alignedMetricWindow(start, end)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(distributedSamplesTable)
|
||||
|
||||
// ----- timeSeriesFPs -----
|
||||
timeSeriesFPs := sqlbuilder.NewSelectBuilder()
|
||||
@@ -234,8 +229,8 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
timeSeriesFPs.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTimeSeriesTable))
|
||||
timeSeriesFPs.Where(
|
||||
timeSeriesFPs.E("metric_name", podPhaseMetricName),
|
||||
timeSeriesFPs.GE("unix_milli", adjustedStart),
|
||||
timeSeriesFPs.L("unix_milli", adjustedEnd),
|
||||
timeSeriesFPs.GE("unix_milli", tsAdjustedStart),
|
||||
timeSeriesFPs.LE("unix_milli", flooredEndMs),
|
||||
)
|
||||
if mergedFilterExpr != "" {
|
||||
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: mergedFilterExpr}, start, end)
|
||||
@@ -267,12 +262,12 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
latestPhasePerPod.Select(latestPhasePerPodSelectCols...)
|
||||
latestPhasePerPod.From(fmt.Sprintf(
|
||||
"%s.%s AS samples INNER JOIN time_series_fps AS tsfp ON samples.fingerprint = tsfp.fingerprint",
|
||||
telemetrymetrics.DBName, samplesTable,
|
||||
telemetrymetrics.DBName, distributedSamplesTable,
|
||||
))
|
||||
latestPhasePerPod.Where(
|
||||
latestPhasePerPod.E("samples.metric_name", podPhaseMetricName),
|
||||
latestPhasePerPod.GE("samples.unix_milli", start),
|
||||
latestPhasePerPod.L("samples.unix_milli", end),
|
||||
latestPhasePerPod.GE("samples.unix_milli", samplesStartMs),
|
||||
latestPhasePerPod.L("samples.unix_milli", flooredEndMs),
|
||||
"tsfp.pod_uid != ''",
|
||||
)
|
||||
latestPhasePerPod.GroupBy(latestPhasePerPodGroupBy...)
|
||||
|
||||
@@ -981,7 +981,7 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
ctx = m.withMetricsExplorerContext(ctx, "fetchMetricsStatsWithSamples")
|
||||
|
||||
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
samplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
|
||||
// Timeseries counts per metric
|
||||
@@ -1155,7 +1155,7 @@ func (m *module) computeSamplesTreemap(ctx context.Context, req *metricsexplorer
|
||||
ctx = m.withMetricsExplorerContext(ctx, "computeSamplesTreemap")
|
||||
|
||||
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
samplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
|
||||
candidateLimit := req.Limit + 50
|
||||
|
||||
@@ -240,7 +240,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
|
||||
|
||||
sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
|
||||
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
||||
sb.Where(
|
||||
@@ -369,7 +369,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
|
||||
|
||||
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
|
||||
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
||||
sb.Where(
|
||||
@@ -410,7 +410,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
}
|
||||
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
|
||||
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
baseSb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
||||
baseSb.Where(
|
||||
@@ -501,7 +501,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
|
||||
sb.SelectMore(expr)
|
||||
}
|
||||
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
|
||||
sb.Where(
|
||||
|
||||
@@ -135,44 +135,56 @@ func ValueColumnForSamplesTable(tableName string) string {
|
||||
return "value"
|
||||
}
|
||||
|
||||
// start and end are in milliseconds
|
||||
// we have three tables for samples
|
||||
// 1. distributed_samples_v4
|
||||
// 2. distributed_samples_v4_agg_5m - for queries with time range above or equal to 1 day and less than 1 week
|
||||
// 3. distributed_samples_v4_agg_30m - for queries with time range above or equal to 1 week
|
||||
// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it.
|
||||
// WhichSamplesTableToUse returns the distributed and local samples table names
|
||||
// (in that order) appropriate for the given window, metric type, and time aggregation.
|
||||
//
|
||||
// start and end are in milliseconds. We have three tables for samples:
|
||||
// 1. distributed_samples_v4
|
||||
// 2. distributed_samples_v4_agg_5m — for queries with time range >= 1 day and < 1 week
|
||||
// 3. distributed_samples_v4_agg_30m — for queries with time range >= 1 week
|
||||
//
|
||||
// If the `timeAggregation` is `count_distinct` we can't use the aggregated tables
|
||||
// because they don't support it.
|
||||
func WhichSamplesTableToUse(
|
||||
start, end uint64,
|
||||
metricType metrictypes.Type,
|
||||
timeAggregation metrictypes.TimeAggregation,
|
||||
tableHints *metrictypes.MetricTableHints,
|
||||
) string {
|
||||
) (string, string) {
|
||||
// if we have a hint for the table, we need to use it
|
||||
// the hint will be used to override the default table selection logic
|
||||
if tableHints != nil {
|
||||
if tableHints.SamplesTableName != "" {
|
||||
return tableHints.SamplesTableName
|
||||
// the hint will be used to override the default table selection logic.
|
||||
// SamplesTableName is the distributed name; derive the local via switch.
|
||||
if tableHints != nil && tableHints.SamplesTableName != "" {
|
||||
switch tableHints.SamplesTableName {
|
||||
case SamplesV4TableName:
|
||||
return SamplesV4TableName, SamplesV4LocalTableName
|
||||
case SamplesV4Agg5mTableName:
|
||||
return SamplesV4Agg5mTableName, SamplesV4Agg5mLocalTableName
|
||||
case SamplesV4Agg30mTableName:
|
||||
return SamplesV4Agg30mTableName, SamplesV4Agg30mLocalTableName
|
||||
case ExpHistogramTableName:
|
||||
return ExpHistogramTableName, ExpHistogramLocalTableName
|
||||
}
|
||||
return tableHints.SamplesTableName, tableHints.SamplesTableName
|
||||
}
|
||||
|
||||
// we don't have any aggregated table for sketches (yet)
|
||||
if metricType == metrictypes.ExpHistogramType {
|
||||
return ExpHistogramLocalTableName
|
||||
return ExpHistogramTableName, ExpHistogramLocalTableName
|
||||
}
|
||||
|
||||
// if the time aggregation is count_distinct, we need to use the distributed_samples_v4 table
|
||||
// because the aggregated tables don't support count_distinct
|
||||
if timeAggregation == metrictypes.TimeAggregationCountDistinct {
|
||||
return SamplesV4TableName
|
||||
return SamplesV4TableName, SamplesV4LocalTableName
|
||||
}
|
||||
|
||||
if end-start < oneDayInMilliseconds+offsetBucket {
|
||||
return SamplesV4TableName
|
||||
return SamplesV4TableName, SamplesV4LocalTableName
|
||||
} else if end-start < oneWeekInMilliseconds+offsetBucket {
|
||||
return SamplesV4Agg5mTableName
|
||||
} else {
|
||||
return SamplesV4Agg30mTableName
|
||||
return SamplesV4Agg5mTableName, SamplesV4Agg5mLocalTableName
|
||||
}
|
||||
return SamplesV4Agg30mTableName, SamplesV4Agg30mLocalTableName
|
||||
}
|
||||
|
||||
func AggregationColumnForSamplesTable(
|
||||
@@ -182,7 +194,7 @@ func AggregationColumnForSamplesTable(
|
||||
timeAggregation metrictypes.TimeAggregation,
|
||||
tableHints *metrictypes.MetricTableHints,
|
||||
) (string, error) {
|
||||
tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
|
||||
tableName, _ := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
|
||||
var aggregationColumn string
|
||||
switch temporality {
|
||||
case metrictypes.Delta:
|
||||
|
||||
@@ -257,7 +257,10 @@ func (s SpaceAggregation) Percentile() float64 {
|
||||
}
|
||||
|
||||
// MetricTableHints is a struct that contains tables to use instead of the derived tables
|
||||
// from the start and end time, for internal use only when we need to override the derived tables.
|
||||
//
|
||||
// Convention :
|
||||
// - TimeSeriesTableName: the LOCAL table name (e.g. "time_series_v4_1day").
|
||||
// - SamplesTableName: the DISTRIBUTED table name (e.g. "distributed_samples_v4_agg_5m").
|
||||
type MetricTableHints struct {
|
||||
TimeSeriesTableName string
|
||||
SamplesTableName string
|
||||
|
||||
Reference in New Issue
Block a user