mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-26 03:40:33 +01:00
Compare commits
13 Commits
fix/ext-ap
...
infraM/pro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f9e704caa4 | ||
|
|
21ac6505e9 | ||
|
|
5cadf8582e | ||
|
|
711bbf49f1 | ||
|
|
1cc48405e0 | ||
|
|
e79715bf6d | ||
|
|
2ce86a3f9d | ||
|
|
639e3239a9 | ||
|
|
e83a23e6ab | ||
|
|
f0cff4ae96 | ||
|
|
0667dc47cb | ||
|
|
4af267ee61 | ||
|
|
6f1090818c |
@@ -149,6 +149,11 @@ func paginateWithBackfill(
|
||||
groupBy []qbtypes.GroupByKey,
|
||||
offset, limit int,
|
||||
) []map[string]string {
|
||||
|
||||
// note: we took a stand here that we are NOT removing those metricGroups from the array that are not in metadataMap.
|
||||
// we are relying on time adjustment logic from alignedMetricWindow. In future if a user complains about seeing metric groups
|
||||
// with missing metadata, we can consider removing those groups from the metricGroups array here before paginating.
|
||||
|
||||
metricKeySet := make(map[string]bool, len(metricGroups))
|
||||
for _, g := range metricGroups {
|
||||
metricKeySet[g.compositeKey] = true
|
||||
@@ -163,7 +168,7 @@ func paginateWithBackfill(
|
||||
sort.Strings(metadataOnlyKeys)
|
||||
|
||||
totalMetric := len(metricGroups)
|
||||
totalAll := totalMetric + len(metadataOnlyKeys)
|
||||
totalAll := len(metadataMap)
|
||||
|
||||
end := offset + limit
|
||||
if end > totalAll {
|
||||
@@ -307,23 +312,58 @@ func parseFullQueryResponse(
|
||||
return result
|
||||
}
|
||||
|
||||
// buildSamplesTblFingerprintSubQuery returns a SelectBuilder that selects distinct fingerprints
|
||||
// from the samples table for the given metric names andtime range.
|
||||
func (m *module) buildSamplesTblFingerprintSubQuery(metricNames []string, startMs, endMs int64) *sqlbuilder.SelectBuilder {
|
||||
samplesTableName := telemetrymetrics.WhichSamplesTableToUse(
|
||||
uint64(startMs), uint64(endMs),
|
||||
metrictypes.UnspecifiedType,
|
||||
metrictypes.TimeAggregationUnspecified,
|
||||
nil,
|
||||
// alignedMetricWindow returns step-floored time bounds and the metric tables
|
||||
// to use for the given window. The floor matches what the QB v5 metric
|
||||
// querier does internally (see querybuilder.AdjustedMetricTimeRange).
|
||||
// Please use the samplesAdjustedStartMs with samples table and tsAdjustedStartMs with ts tables.
|
||||
// Both can use the same flooredEndMs.
|
||||
func alignedMetricWindow(startMs, endMs int64) (
|
||||
samplesAdjustedStartMs uint64,
|
||||
flooredEndMs uint64,
|
||||
tsAdjustedStartMs uint64,
|
||||
distributedTSTable string,
|
||||
localTSTable string,
|
||||
distributedSamplesTable string,
|
||||
localSamplesTable string,
|
||||
) {
|
||||
samplesAdjustedStartMs = uint64(startMs)
|
||||
flooredEndMs = uint64(endMs)
|
||||
stepSecs := querybuilder.RecommendedStepIntervalForMetric(samplesAdjustedStartMs, flooredEndMs)
|
||||
// note: this is the same flooring logic as in querybuilder.AdjustedMetricTimeRange. Duplicated code.
|
||||
// TODO(nikhilmantri0902): if the querybuilder.AdjustMetricTimeRange logic changes, this needs to be updated too.
|
||||
if stepSecs > 0 {
|
||||
samplesAdjustedStartMs = samplesAdjustedStartMs - (samplesAdjustedStartMs % (stepSecs * 1000))
|
||||
adjustStep := stepSecs
|
||||
if adjustStep > 60 {
|
||||
adjustStep = 60
|
||||
}
|
||||
flooredEndMs = flooredEndMs - (flooredEndMs % (adjustStep * 1000))
|
||||
}
|
||||
|
||||
tsAdjustedStartMs, _, distributedTSTable, localTSTable = telemetrymetrics.WhichTSTableToUse(
|
||||
samplesAdjustedStartMs, flooredEndMs, nil,
|
||||
)
|
||||
localSamplesTable := strings.TrimPrefix(samplesTableName, "distributed_")
|
||||
|
||||
distributedSamplesTable = telemetrymetrics.WhichSamplesTableToUse(
|
||||
samplesAdjustedStartMs, flooredEndMs,
|
||||
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
|
||||
)
|
||||
localSamplesTable = strings.TrimPrefix(distributedSamplesTable, "distributed_")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// buildSamplesTblFingerprintSubQuery returns a SelectBuilder that selects distinct fingerprints
|
||||
// from the samples table for the given metric names and time range.
|
||||
// Bounds must already be step-floored by the caller via alignedMetricWindow.
|
||||
func (m *module) buildSamplesTblFingerprintSubQuery(metricNames []string, samplesTable string, flooredStart, flooredEnd uint64) *sqlbuilder.SelectBuilder {
|
||||
fpSB := sqlbuilder.NewSelectBuilder()
|
||||
fpSB.Select("DISTINCT fingerprint")
|
||||
fpSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localSamplesTable))
|
||||
fpSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
fpSB.Where(
|
||||
fpSB.In("metric_name", sqlbuilder.List(metricNames)),
|
||||
fpSB.GE("unix_milli", startMs),
|
||||
fpSB.L("unix_milli", endMs),
|
||||
fpSB.GE("unix_milli", flooredStart),
|
||||
fpSB.L("unix_milli", flooredEnd),
|
||||
)
|
||||
return fpSB
|
||||
}
|
||||
@@ -454,12 +494,12 @@ func (m *module) getMetadata(
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "groupBy must not be empty")
|
||||
}
|
||||
|
||||
// Pick the optimal timeseries table based on time range; also get adjusted start.
|
||||
adjustedStart, adjustedEnd, distributedTableName, _ := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(startMs), uint64(endMs), nil,
|
||||
)
|
||||
// Step-floor the window and pick the right tables — matches the bounds the
|
||||
// QB v5 metric querier uses, so metadataMap covers the same universe the
|
||||
// ranking sees (see alignedMetricWindow doc).
|
||||
samplesStartMs, flooredEndMs, tsAdjustedStartMs, distributedTableName, _, _, localSamplesTable := alignedMetricWindow(startMs, endMs)
|
||||
|
||||
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, startMs, endMs)
|
||||
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, localSamplesTable, samplesStartMs, flooredEndMs)
|
||||
|
||||
// Flatten groupBy keys to string names for SQL expressions and result scanning.
|
||||
groupByCols := make([]string, len(groupBy))
|
||||
@@ -494,8 +534,8 @@ func (m *module) getMetadata(
|
||||
innerSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTableName))
|
||||
innerSB.Where(
|
||||
innerSB.In("metric_name", sqlbuilder.List(metricNames)),
|
||||
innerSB.GE("unix_milli", adjustedStart),
|
||||
innerSB.L("unix_milli", adjustedEnd),
|
||||
innerSB.GE("unix_milli", tsAdjustedStartMs),
|
||||
innerSB.LE("unix_milli", flooredEndMs),
|
||||
fmt.Sprintf("fingerprint IN (%s)", innerSB.Var(fpSB)),
|
||||
)
|
||||
|
||||
|
||||
@@ -34,9 +34,8 @@ func (m *module) getPerGroupHostStatusCounts(
|
||||
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
|
||||
filterExpr := mergeFilterExpressions(reqFilterExpr, pageGroupsFilterExpr)
|
||||
|
||||
adjustedStart, adjustedEnd, distributedTimeSeriesTableName, _ := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(req.Start), uint64(req.End), nil,
|
||||
)
|
||||
// Step-floor bounds + resolve tables in one shot to match QB v5 querier.
|
||||
samplesStartMs, flooredEndMs, tsAdjustedStartMs, distributedTimeSeriesTableName, _, _, localSamplesTable := alignedMetricWindow(req.Start, req.End)
|
||||
|
||||
hostNameExpr := fmt.Sprintf("JSONExtractString(labels, '%s')", inframonitoringtypes.HostNameAttrKey)
|
||||
|
||||
@@ -55,15 +54,15 @@ func (m *module) getPerGroupHostStatusCounts(
|
||||
)
|
||||
|
||||
// Build a fingerprint subquery to restrict to fingerprints with actual sample
|
||||
// data in the original time range (not the wider timeseries table window).
|
||||
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, req.Start, req.End)
|
||||
// data in the floored time range.
|
||||
fpSB := m.buildSamplesTblFingerprintSubQuery(metricNames, localSamplesTable, samplesStartMs, flooredEndMs)
|
||||
|
||||
sb.Select(selectCols...)
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTimeSeriesTableName))
|
||||
sb.Where(
|
||||
sb.In("metric_name", sqlbuilder.List(metricNames)),
|
||||
sb.GE("unix_milli", adjustedStart),
|
||||
sb.L("unix_milli", adjustedEnd),
|
||||
sb.GE("unix_milli", tsAdjustedStartMs),
|
||||
sb.LE("unix_milli", flooredEndMs),
|
||||
fmt.Sprintf("fingerprint IN (%s)", sb.Var(fpSB)),
|
||||
)
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
@@ -190,15 +189,9 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
|
||||
mergedFilterExpr := mergeFilterExpressions(userFilterExpr, pageGroupsFilterExpr)
|
||||
|
||||
// Resolve tables. Same convention as pods.
|
||||
adjustedStart, adjustedEnd, _, localTimeSeriesTable := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(start), uint64(end), nil,
|
||||
)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
|
||||
uint64(start), uint64(end),
|
||||
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
|
||||
)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(samplesTable)
|
||||
// Step-floor bounds + resolve tables in one shot to match QB v5 querier.
|
||||
samplesStartMs, flooredEndMs, tsAdjustedStartMs, _, localTimeSeriesTable, distributedSamplesTable, _ := alignedMetricWindow(start, end)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(distributedSamplesTable)
|
||||
|
||||
// ----- timeSeriesFPs -----
|
||||
timeSeriesFPs := sqlbuilder.NewSelectBuilder()
|
||||
@@ -215,8 +208,8 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
timeSeriesFPs.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTimeSeriesTable))
|
||||
timeSeriesFPs.Where(
|
||||
timeSeriesFPs.E("metric_name", nodeConditionMetricName),
|
||||
timeSeriesFPs.GE("unix_milli", adjustedStart),
|
||||
timeSeriesFPs.L("unix_milli", adjustedEnd),
|
||||
timeSeriesFPs.GE("unix_milli", tsAdjustedStartMs),
|
||||
timeSeriesFPs.LE("unix_milli", flooredEndMs),
|
||||
)
|
||||
if mergedFilterExpr != "" {
|
||||
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: mergedFilterExpr}, start, end)
|
||||
@@ -249,12 +242,12 @@ func (m *module) getPerGroupNodeConditionCounts(
|
||||
latestConditionPerNode.Select(latestConditionPerNodeSelectCols...)
|
||||
latestConditionPerNode.From(fmt.Sprintf(
|
||||
"%s.%s AS samples INNER JOIN time_series_fps AS tsfp ON samples.fingerprint = tsfp.fingerprint",
|
||||
telemetrymetrics.DBName, samplesTable,
|
||||
telemetrymetrics.DBName, distributedSamplesTable,
|
||||
))
|
||||
latestConditionPerNode.Where(
|
||||
latestConditionPerNode.E("samples.metric_name", nodeConditionMetricName),
|
||||
latestConditionPerNode.GE("samples.unix_milli", start),
|
||||
latestConditionPerNode.L("samples.unix_milli", end),
|
||||
latestConditionPerNode.GE("samples.unix_milli", samplesStartMs),
|
||||
latestConditionPerNode.L("samples.unix_milli", flooredEndMs),
|
||||
"tsfp.node_name != ''",
|
||||
)
|
||||
latestConditionPerNode.GroupBy(latestConditionPerNodeGroupBy...)
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/types/inframonitoringtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
@@ -96,14 +95,16 @@ func buildPodRecords(
|
||||
}
|
||||
}
|
||||
|
||||
if attrs, ok := metadataMap[compositeKey]; ok && isPodUIDInGroupBy {
|
||||
// the condition above ensures we deduce age only if pod uid is in group by because if
|
||||
// it's not in group by then we might have multiple pod uids in the same group and hence then podAge wont make sense
|
||||
if startTimeStr, exists := attrs[podStartTimeAttrKey]; exists && startTimeStr != "" {
|
||||
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
|
||||
startTimeMs := t.UnixMilli()
|
||||
if startTimeMs > 0 {
|
||||
record.PodAge = reqEnd - startTimeMs
|
||||
if attrs, ok := metadataMap[compositeKey]; ok {
|
||||
// podAge only makes sense when pod uid is in groupBy. Otherwise the
|
||||
// group can contain multiple pods with different start times.
|
||||
if isPodUIDInGroupBy {
|
||||
if startTimeStr, exists := attrs[podStartTimeAttrKey]; exists && startTimeStr != "" {
|
||||
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
|
||||
startTimeMs := t.UnixMilli()
|
||||
if startTimeMs > 0 {
|
||||
record.PodAge = reqEnd - startTimeMs
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -209,15 +210,9 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
pageGroupsFilterExpr := buildPageGroupsFilterExpr(pageGroups)
|
||||
mergedFilterExpr := mergeFilterExpressions(userFilterExpr, pageGroupsFilterExpr)
|
||||
|
||||
// Resolve tables. Same convention as hosts (distributed names from helpers).
|
||||
adjustedStart, adjustedEnd, _, localTimeSeriesTable := telemetrymetrics.WhichTSTableToUse(
|
||||
uint64(start), uint64(end), nil,
|
||||
)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(
|
||||
uint64(start), uint64(end),
|
||||
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
|
||||
)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(samplesTable)
|
||||
// Step-floor bounds + resolve tables in one shot to match QB v5 querier.
|
||||
samplesStartMs, flooredEndMs, tsAdjustedStart, _, localTimeSeriesTable, distributedSamplesTable, _ := alignedMetricWindow(start, end)
|
||||
valueCol := telemetrymetrics.ValueColumnForSamplesTable(distributedSamplesTable)
|
||||
|
||||
// ----- timeSeriesFPs -----
|
||||
timeSeriesFPs := sqlbuilder.NewSelectBuilder()
|
||||
@@ -234,8 +229,8 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
timeSeriesFPs.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTimeSeriesTable))
|
||||
timeSeriesFPs.Where(
|
||||
timeSeriesFPs.E("metric_name", podPhaseMetricName),
|
||||
timeSeriesFPs.GE("unix_milli", adjustedStart),
|
||||
timeSeriesFPs.L("unix_milli", adjustedEnd),
|
||||
timeSeriesFPs.GE("unix_milli", tsAdjustedStart),
|
||||
timeSeriesFPs.LE("unix_milli", flooredEndMs),
|
||||
)
|
||||
if mergedFilterExpr != "" {
|
||||
filterClause, err := m.buildFilterClause(ctx, &qbtypes.Filter{Expression: mergedFilterExpr}, start, end)
|
||||
@@ -267,12 +262,12 @@ func (m *module) getPerGroupPodPhaseCounts(
|
||||
latestPhasePerPod.Select(latestPhasePerPodSelectCols...)
|
||||
latestPhasePerPod.From(fmt.Sprintf(
|
||||
"%s.%s AS samples INNER JOIN time_series_fps AS tsfp ON samples.fingerprint = tsfp.fingerprint",
|
||||
telemetrymetrics.DBName, samplesTable,
|
||||
telemetrymetrics.DBName, distributedSamplesTable,
|
||||
))
|
||||
latestPhasePerPod.Where(
|
||||
latestPhasePerPod.E("samples.metric_name", podPhaseMetricName),
|
||||
latestPhasePerPod.GE("samples.unix_milli", start),
|
||||
latestPhasePerPod.L("samples.unix_milli", end),
|
||||
latestPhasePerPod.GE("samples.unix_milli", samplesStartMs),
|
||||
latestPhasePerPod.L("samples.unix_milli", flooredEndMs),
|
||||
"tsfp.pod_uid != ''",
|
||||
)
|
||||
latestPhasePerPod.GroupBy(latestPhasePerPodGroupBy...)
|
||||
|
||||
Reference in New Issue
Block a user