mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-31 01:20:25 +01:00
Compare commits
4 Commits
ns/flamegr
...
nv/10722
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ad429d3e9 | ||
|
|
a789d01acb | ||
|
|
b9b20bb181 | ||
|
|
e16c277d95 |
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
gomaps "maps"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -282,6 +283,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
queries := make(map[string]qbtypes.Query)
|
||||
steps := make(map[string]qbtypes.Step)
|
||||
missingMetrics := []string{}
|
||||
missingMetricQueries := []string{}
|
||||
|
||||
for _, query := range req.CompositeQuery.Queries {
|
||||
var queryName string
|
||||
@@ -374,6 +376,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
}
|
||||
q.logger.DebugContext(ctx, "fetched metric temporalities and types", slog.Any("metric_temporality", metricTemporality), slog.Any("metric_types", metricTypes))
|
||||
}
|
||||
presentAggregations := []qbtypes.MetricAggregation{}
|
||||
for i := range spec.Aggregations {
|
||||
if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Temporality == metrictypes.Unknown {
|
||||
if temp, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && temp != metrictypes.Unknown {
|
||||
@@ -384,13 +387,19 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
missingMetrics = append(missingMetrics, spec.Aggregations[i].MetricName)
|
||||
continue
|
||||
}
|
||||
|
||||
presentAggregations = append(presentAggregations, spec.Aggregations[i])
|
||||
if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
|
||||
if foundMetricType, ok := metricTypes[spec.Aggregations[i].MetricName]; ok && foundMetricType != metrictypes.UnspecifiedType {
|
||||
spec.Aggregations[i].Type = foundMetricType
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(presentAggregations) == 0 {
|
||||
missingMetricQueries = append(missingMetricQueries, spec.Name)
|
||||
steps[spec.Name] = spec.StepInterval
|
||||
continue
|
||||
}
|
||||
spec.Aggregations = presentAggregations
|
||||
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
|
||||
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
|
||||
var bq *builderQuery[qbtypes.MetricAggregation]
|
||||
@@ -409,25 +418,50 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
}
|
||||
}
|
||||
}
|
||||
nonExistentMetrics := []string{}
|
||||
var dormantMetricsWarningMsg string
|
||||
if len(missingMetrics) > 0 {
|
||||
lastSeenInfo, _ := q.metadataStore.FetchLastSeenInfoMulti(ctx, missingMetrics...)
|
||||
for _, missingMetricName := range missingMetrics {
|
||||
if ts, ok := lastSeenInfo[missingMetricName]; ok && ts > 0 {
|
||||
continue
|
||||
}
|
||||
nonExistentMetrics = append(nonExistentMetrics, missingMetricName)
|
||||
}
|
||||
if len(nonExistentMetrics) == 1 {
|
||||
return nil, errors.NewNotFoundf(errors.CodeNotFound, "could not find the metric %s", nonExistentMetrics[0])
|
||||
} else if len(nonExistentMetrics) > 1 {
|
||||
return nil, errors.NewNotFoundf(errors.CodeNotFound, "the following metrics were not found: %s", strings.Join(nonExistentMetrics, ", "))
|
||||
}
|
||||
lastSeenStr := func(name string) string {
|
||||
if ts, ok := lastSeenInfo[name]; ok && ts > 0 {
|
||||
ago := humanize.RelTime(time.UnixMilli(ts), time.Now(), "ago", "from now")
|
||||
return fmt.Sprintf("%s (last seen %s)", name, ago)
|
||||
}
|
||||
return name
|
||||
return name // this case won't come cuz lastSeenStr is never called for metrics in nonExistentMetrics
|
||||
}
|
||||
if len(missingMetrics) == 1 {
|
||||
return nil, errors.NewNotFoundf(errors.CodeNotFound, "no data found for the metric %s in the query time range", lastSeenStr(missingMetrics[0]))
|
||||
dormantMetricsWarningMsg = fmt.Sprintf("no data found for the metric %s in the query time range", lastSeenStr(missingMetrics[0]))
|
||||
} else {
|
||||
parts := make([]string, len(missingMetrics))
|
||||
for i, m := range missingMetrics {
|
||||
parts[i] = lastSeenStr(m)
|
||||
}
|
||||
dormantMetricsWarningMsg = fmt.Sprintf("no data found for the following metrics in the query time range: %s", strings.Join(parts, ", "))
|
||||
}
|
||||
parts := make([]string, len(missingMetrics))
|
||||
for i, m := range missingMetrics {
|
||||
parts[i] = lastSeenStr(m)
|
||||
}
|
||||
return nil, errors.NewNotFoundf(errors.CodeNotFound, "no data found for the following metrics in the query time range: %s", strings.Join(parts, ", "))
|
||||
}
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event)
|
||||
preseededResults := make(map[string]any)
|
||||
for _, name := range missingMetricQueries { // at this point missing metrics will not have any non existent metrics, only normal ones
|
||||
switch req.RequestType {
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
preseededResults[name] = &qbtypes.TimeSeriesData{QueryName: name}
|
||||
case qbtypes.RequestTypeScalar:
|
||||
preseededResults[name] = &qbtypes.ScalarData{QueryName: name}
|
||||
case qbtypes.RequestTypeRaw:
|
||||
preseededResults[name] = &qbtypes.RawData{QueryName: name}
|
||||
}
|
||||
}
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event, preseededResults)
|
||||
if qbResp != nil {
|
||||
qbResp.QBEvent = event
|
||||
if len(intervalWarnings) != 0 && req.RequestType == qbtypes.RequestTypeTimeSeries {
|
||||
@@ -440,6 +474,14 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
}
|
||||
}
|
||||
}
|
||||
if dormantMetricsWarningMsg != "" {
|
||||
if qbResp.Warning == nil {
|
||||
qbResp.Warning = &qbtypes.QueryWarnData{}
|
||||
}
|
||||
qbResp.Warning.Warnings = append(qbResp.Warning.Warnings, qbtypes.QueryWarnDataAdditional{
|
||||
Message: dormantMetricsWarningMsg,
|
||||
})
|
||||
}
|
||||
}
|
||||
return qbResp, qbErr
|
||||
}
|
||||
@@ -516,7 +558,7 @@ func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qb
|
||||
})
|
||||
queries[spec.Name] = bq
|
||||
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, nil, event)
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, nil, event, nil)
|
||||
if qbErr != nil {
|
||||
client.Error <- qbErr
|
||||
return
|
||||
@@ -545,6 +587,7 @@ func (q *querier) run(
|
||||
req *qbtypes.QueryRangeRequest,
|
||||
steps map[string]qbtypes.Step,
|
||||
qbEvent *qbtypes.QBEvent,
|
||||
preseededResults map[string]any,
|
||||
) (*qbtypes.QueryRangeResponse, error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.PanelType: qbEvent.PanelType,
|
||||
@@ -630,6 +673,7 @@ func (q *querier) run(
|
||||
}
|
||||
}
|
||||
|
||||
gomaps.Copy(results, preseededResults)
|
||||
processedResults, err := q.postProcessResults(ctx, results, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
@@ -1259,7 +1258,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
}
|
||||
}
|
||||
|
||||
selectedSpans = tracedetail.GetAllSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap)
|
||||
selectedSpans = tracedetail.GetSelectedSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap)
|
||||
traceCache := model.GetFlamegraphSpansForTraceCache{
|
||||
StartTime: startTime,
|
||||
EndTime: endTime,
|
||||
@@ -1276,22 +1275,12 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
}
|
||||
|
||||
processingPostCache := time.Now()
|
||||
selectedSpansForRequest := selectedSpans
|
||||
clientLimit := min(req.Limit, tracedetail.MaxLimitWithoutSampling)
|
||||
totalSpanCount := tracedetail.GetTotalSpanCount(selectedSpans)
|
||||
if totalSpanCount > uint64(clientLimit) {
|
||||
// using trace start and end time if boundary ts are set to zero (or not set)
|
||||
boundaryStart := max(timestamp.MilliToNano(req.BoundaryStartTS), startTime)
|
||||
boundaryEnd := max(timestamp.MilliToNano(req.BoundaryEndTS), endTime)
|
||||
|
||||
selectedSpansForRequest = tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans, boundaryStart, boundaryEnd)
|
||||
}
|
||||
r.logger.Info("getFlamegraphSpansForTrace: processing post cache", "duration", time.Since(processingPostCache), "traceID", traceID, "totalSpans", totalSpanCount, "limit", clientLimit)
|
||||
selectedSpansForRequest := tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans, startTime, endTime)
|
||||
r.logger.Info("getFlamegraphSpansForTrace: processing post cache", "duration", time.Since(processingPostCache), "traceID", traceID)
|
||||
|
||||
trace.Spans = selectedSpansForRequest
|
||||
trace.StartTimestampMillis = startTime / 1000000
|
||||
trace.EndTimestampMillis = endTime / 1000000
|
||||
trace.HasMore = totalSpanCount > uint64(clientLimit)
|
||||
return trace, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -7,12 +7,9 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
flamegraphSpanLevelLimit float64 = 50
|
||||
flamegraphSpanLimitPerLevel int = 100
|
||||
flamegraphSamplingBucketCount int = 50
|
||||
flamegraphTopLatencySpanCount int = 5
|
||||
|
||||
MaxLimitWithoutSampling uint = 120_000
|
||||
SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH float64 = 50
|
||||
SPAN_LIMIT_PER_LEVEL int = 100
|
||||
TIMESTAMP_SAMPLING_BUCKET_COUNT int = 50
|
||||
)
|
||||
|
||||
func ContainsFlamegraphSpan(slice []*model.FlamegraphSpan, item *model.FlamegraphSpan) bool {
|
||||
@@ -55,8 +52,7 @@ func FindIndexForSelectedSpan(spans [][]*model.FlamegraphSpan, selectedSpanId st
|
||||
return selectedSpanLevel
|
||||
}
|
||||
|
||||
// GetAllSpansForFlamegraph groups all spans as per their level
|
||||
func GetAllSpansForFlamegraph(traceRoots []*model.FlamegraphSpan, spanIdToSpanNodeMap map[string]*model.FlamegraphSpan) [][]*model.FlamegraphSpan {
|
||||
func GetSelectedSpansForFlamegraph(traceRoots []*model.FlamegraphSpan, spanIdToSpanNodeMap map[string]*model.FlamegraphSpan) [][]*model.FlamegraphSpan {
|
||||
|
||||
var traceIdLevelledFlamegraph = map[string]map[int64][]*model.FlamegraphSpan{}
|
||||
selectedSpans := [][]*model.FlamegraphSpan{}
|
||||
@@ -104,7 +100,7 @@ func getLatencyAndTimestampBucketedSpans(spans []*model.FlamegraphSpan, selected
|
||||
})
|
||||
|
||||
// pick the top 5 latency spans
|
||||
for idx := range flamegraphTopLatencySpanCount {
|
||||
for idx := range 5 {
|
||||
sampledSpans = append(sampledSpans, spans[idx])
|
||||
}
|
||||
|
||||
@@ -121,17 +117,17 @@ func getLatencyAndTimestampBucketedSpans(spans []*model.FlamegraphSpan, selected
|
||||
}
|
||||
}
|
||||
|
||||
bucketSize := (endTime - startTime) / uint64(flamegraphSamplingBucketCount)
|
||||
bucketSize := (endTime - startTime) / uint64(TIMESTAMP_SAMPLING_BUCKET_COUNT)
|
||||
if bucketSize == 0 {
|
||||
bucketSize = 1
|
||||
}
|
||||
|
||||
bucketedSpans := make([][]*model.FlamegraphSpan, flamegraphSamplingBucketCount)
|
||||
bucketedSpans := make([][]*model.FlamegraphSpan, 50)
|
||||
|
||||
for _, span := range spans {
|
||||
if span.TimeUnixNano >= startTime && span.TimeUnixNano <= endTime {
|
||||
bucketIndex := int((span.TimeUnixNano - startTime) / bucketSize)
|
||||
if bucketIndex >= 0 && bucketIndex < flamegraphSamplingBucketCount {
|
||||
if bucketIndex >= 0 && bucketIndex < 50 {
|
||||
bucketedSpans[bucketIndex] = append(bucketedSpans[bucketIndex], span)
|
||||
}
|
||||
}
|
||||
@@ -160,8 +156,8 @@ func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpan
|
||||
selectedIndex = FindIndexForSelectedSpan(selectedSpans, selectedSpanID)
|
||||
}
|
||||
|
||||
lowerLimit := selectedIndex - int(flamegraphSpanLevelLimit*0.4)
|
||||
upperLimit := selectedIndex + int(flamegraphSpanLevelLimit*0.6)
|
||||
lowerLimit := selectedIndex - int(SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH*0.4)
|
||||
upperLimit := selectedIndex + int(SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH*0.6)
|
||||
|
||||
if lowerLimit < 0 {
|
||||
upperLimit = upperLimit - lowerLimit
|
||||
@@ -178,7 +174,7 @@ func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpan
|
||||
}
|
||||
|
||||
for i := lowerLimit; i < upperLimit; i++ {
|
||||
if len(selectedSpans[i]) > flamegraphSpanLimitPerLevel {
|
||||
if len(selectedSpans[i]) > SPAN_LIMIT_PER_LEVEL {
|
||||
_spans := getLatencyAndTimestampBucketedSpans(selectedSpans[i], selectedSpanID, i == selectedIndex, startTime, endTime)
|
||||
selectedSpansForRequest = append(selectedSpansForRequest, _spans)
|
||||
} else {
|
||||
@@ -188,12 +184,3 @@ func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpan
|
||||
|
||||
return selectedSpansForRequest
|
||||
}
|
||||
|
||||
func GetTotalSpanCount(spans [][]*model.FlamegraphSpan) uint64 {
|
||||
levelCount := len(spans)
|
||||
spanCount := uint64(0)
|
||||
for i := range levelCount {
|
||||
spanCount += uint64(len(spans[i]))
|
||||
}
|
||||
return spanCount
|
||||
}
|
||||
|
||||
@@ -337,10 +337,7 @@ type GetWaterfallSpansForTraceWithMetadataParams struct {
|
||||
}
|
||||
|
||||
type GetFlamegraphSpansForTraceParams struct {
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
Limit uint `json:"limit"`
|
||||
BoundaryStartTS uint64 `json:"boundaryStartTsMilli"`
|
||||
BoundaryEndTS uint64 `json:"boundarEndTsMilli"`
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
}
|
||||
|
||||
type SpanFilterParams struct {
|
||||
|
||||
@@ -337,7 +337,6 @@ type GetFlamegraphSpansForTraceResponse struct {
|
||||
EndTimestampMillis uint64 `json:"endTimestampMillis"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
Spans [][]*FlamegraphSpan `json:"spans"`
|
||||
HasMore bool `json:"hasMore"`
|
||||
}
|
||||
|
||||
type OtelSpanRef struct {
|
||||
|
||||
@@ -11,7 +11,3 @@ func FromTime(t time.Time) int64 {
|
||||
func Time(ts int64) time.Time {
|
||||
return time.Unix(ts/1000, (ts%1000)*int64(time.Millisecond))
|
||||
}
|
||||
|
||||
func MilliToNano(milliTS uint64) uint64 {
|
||||
return milliTS * 1000_000
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user