mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-18 16:00:32 +01:00
Compare commits
14 Commits
ns/flamegr
...
nv/4325
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3002863f40 | ||
|
|
0f403c82af | ||
|
|
bd9dccf97d | ||
|
|
af9d6734d9 | ||
|
|
dfccfa269f | ||
|
|
2772ab94d3 | ||
|
|
5eb2ab2cb8 | ||
|
|
0de84488ff | ||
|
|
4e05c86286 | ||
|
|
e0756e38eb | ||
|
|
4e6de5c826 | ||
|
|
a687c61919 | ||
|
|
4066425952 | ||
|
|
2381cf1da0 |
@@ -11,6 +11,7 @@ function makeSpan(
|
||||
): FlamegraphSpan {
|
||||
return {
|
||||
parentSpanId: '',
|
||||
traceId: 'trace-1',
|
||||
hasError: false,
|
||||
serviceName: 'svc',
|
||||
name: 'op',
|
||||
|
||||
@@ -6,6 +6,7 @@ export const MOCK_SPAN: FlamegraphSpan = {
|
||||
durationNano: 50_000_000, // 50ms
|
||||
spanId: 'span-1',
|
||||
parentSpanId: '',
|
||||
traceId: 'trace-1',
|
||||
hasError: false,
|
||||
serviceName: 'test-service',
|
||||
name: 'test-span',
|
||||
|
||||
@@ -23,6 +23,7 @@ export interface FlamegraphSpan {
|
||||
durationNano: number;
|
||||
spanId: string;
|
||||
parentSpanId: string;
|
||||
traceId: string;
|
||||
hasError: boolean;
|
||||
serviceName: string;
|
||||
name: string;
|
||||
|
||||
@@ -426,9 +426,17 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
}
|
||||
nonExistentMetrics := []string{}
|
||||
var dormantMetricsWarningMsg string
|
||||
if len(missingMetrics) > 0 {
|
||||
lastSeenInfo, _ := q.metadataStore.FetchLastSeenInfoMulti(ctx, missingMetrics...)
|
||||
for _, missingMetricName := range missingMetrics {
|
||||
// internal metrics aren't user-controlled — skip errors/warnings for them since users can't act on them
|
||||
isInternalMetric := func(n string) bool { return strings.HasPrefix(n, "signoz.") || strings.HasPrefix(n, "signoz_") }
|
||||
externalMissingMetrics := make([]string, 0, len(missingMetrics))
|
||||
for _, m := range missingMetrics {
|
||||
if !isInternalMetric(m) {
|
||||
externalMissingMetrics = append(externalMissingMetrics, m)
|
||||
}
|
||||
}
|
||||
if len(externalMissingMetrics) > 0 {
|
||||
lastSeenInfo, _ := q.metadataStore.FetchLastSeenInfoMulti(ctx, externalMissingMetrics...)
|
||||
for _, missingMetricName := range externalMissingMetrics {
|
||||
if ts, ok := lastSeenInfo[missingMetricName]; ok && ts > 0 {
|
||||
continue
|
||||
}
|
||||
@@ -440,24 +448,22 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
return nil, errors.NewNotFoundf(errors.CodeNotFound, "the following metrics were not found: %s", strings.Join(nonExistentMetrics, ", "))
|
||||
}
|
||||
lastSeenStr := func(name string) string {
|
||||
if ts, ok := lastSeenInfo[name]; ok && ts > 0 {
|
||||
ago := humanize.RelTime(time.UnixMilli(ts), time.Now(), "ago", "from now")
|
||||
return fmt.Sprintf("%s (last seen %s)", name, ago)
|
||||
}
|
||||
return name // this case won't come cuz lastSeenStr is never called for metrics in nonExistentMetrics
|
||||
ts := lastSeenInfo[name]
|
||||
ago := humanize.RelTime(time.UnixMilli(ts), time.Now(), "ago", "from now")
|
||||
return fmt.Sprintf("%s (last seen %s)", name, ago)
|
||||
}
|
||||
if len(missingMetrics) == 1 {
|
||||
dormantMetricsWarningMsg = fmt.Sprintf("no data found for the metric %s in the query time range", lastSeenStr(missingMetrics[0]))
|
||||
if len(externalMissingMetrics) == 1 {
|
||||
dormantMetricsWarningMsg = fmt.Sprintf("no data found for the metric %s in the query time range", lastSeenStr(externalMissingMetrics[0]))
|
||||
} else {
|
||||
parts := make([]string, len(missingMetrics))
|
||||
for i, m := range missingMetrics {
|
||||
parts := make([]string, len(externalMissingMetrics))
|
||||
for i, m := range externalMissingMetrics {
|
||||
parts[i] = lastSeenStr(m)
|
||||
}
|
||||
dormantMetricsWarningMsg = fmt.Sprintf("no data found for the following metrics in the query time range: %s", strings.Join(parts, ", "))
|
||||
}
|
||||
}
|
||||
preseededResults := make(map[string]any)
|
||||
for _, name := range missingMetricQueries { // at this point missing metrics will not have any non existent metrics, only normal ones
|
||||
for _, name := range missingMetricQueries {
|
||||
switch req.RequestType {
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
preseededResults[name] = &qbtypes.TimeSeriesData{QueryName: name}
|
||||
|
||||
@@ -1140,8 +1140,6 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
// map[traceID][level]span
|
||||
var selectedSpans = [][]*model.FlamegraphSpan{}
|
||||
var traceRoots []*model.FlamegraphSpan
|
||||
// time bounds for Pass 1 and Pass 2 (set on cache miss, zero on cache hit)
|
||||
var tsBucketStart, tsBucketEnd int64
|
||||
|
||||
// get the trace tree from cache!
|
||||
cachedTraceData, err := r.GetFlamegraphSpansForTraceCache(ctx, orgID, traceID)
|
||||
@@ -1157,59 +1155,62 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
if err != nil {
|
||||
r.logger.Info("cache miss for getFlamegraphSpansForTrace", "traceID", traceID)
|
||||
|
||||
// Inline summary query to get time bounds shared by Pass 1 and Pass 2.
|
||||
var traceSummary model.TraceSummary
|
||||
summaryQuery := fmt.Sprintf(
|
||||
"SELECT trace_id, min(start) AS start, max(end) AS end, sum(num_spans) AS num_spans FROM %s.%s WHERE trace_id=$1 GROUP BY trace_id",
|
||||
r.TraceDB, r.traceSummaryTable)
|
||||
if summaryErr := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(
|
||||
&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans,
|
||||
); summaryErr != nil {
|
||||
if summaryErr == sql.ErrNoRows {
|
||||
return trace, nil
|
||||
}
|
||||
r.logger.Error("Error in processing flamegraph trace summary sql query", errorsV2.Attr(summaryErr))
|
||||
return nil, model.ExecutionError(fmt.Errorf("getFlamegraphSpansForTrace: error querying trace summary: %w", summaryErr))
|
||||
selectCols := "timestamp, duration_nano, span_id, trace_id, has_error, links as references, resource_string_service$$name, name, events"
|
||||
if len(req.SelectFields) > 0 {
|
||||
selectCols += ", attributes_string, attributes_number, attributes_bool, resources_string"
|
||||
}
|
||||
tsBucketStart = traceSummary.Start.Unix() - 1800
|
||||
tsBucketEnd = traceSummary.End.Unix()
|
||||
flamegraphQuery := fmt.Sprintf("SELECT %s FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", selectCols, r.TraceDB, r.traceTableName)
|
||||
|
||||
// Pass 1: skeleton query — no events, no attribute maps.
|
||||
// Keeps tree-building memory lean; events are fetched in Pass 2 only for
|
||||
// the windowed spans that are actually returned in the response.
|
||||
skeletonQuery := fmt.Sprintf(
|
||||
"SELECT DISTINCT ON (span_id) timestamp, duration_nano, span_id, parent_span_id, has_error, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 AND ts_bucket_start>=$2 AND ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC",
|
||||
r.TraceDB, r.traceTableName)
|
||||
|
||||
var skeletonSpans []model.SpanItemV2
|
||||
if skeletonErr := r.db.Select(ctx, &skeletonSpans, skeletonQuery, traceID,
|
||||
strconv.FormatInt(tsBucketStart, 10), strconv.FormatInt(tsBucketEnd, 10),
|
||||
); skeletonErr != nil {
|
||||
r.logger.Error("Error in processing flamegraph skeleton sql query", errorsV2.Attr(skeletonErr))
|
||||
return nil, model.ExecutionError(fmt.Errorf("getFlamegraphSpansForTrace: error querying skeleton spans: %w", skeletonErr))
|
||||
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, flamegraphQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(skeletonSpans) == 0 {
|
||||
if len(searchScanResponses) == 0 {
|
||||
return trace, nil
|
||||
}
|
||||
|
||||
for _, item := range skeletonSpans {
|
||||
for _, item := range searchScanResponses {
|
||||
ref := []model.OtelSpanRef{}
|
||||
err := json.Unmarshal([]byte(item.References), &ref)
|
||||
if err != nil {
|
||||
r.logger.Error("Error unmarshalling references", errorsV2.Attr(err))
|
||||
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling references %s", err.Error())
|
||||
}
|
||||
|
||||
events := make([]model.Event, 0)
|
||||
for _, event := range item.Events {
|
||||
var eventMap model.Event
|
||||
err = json.Unmarshal([]byte(event), &eventMap)
|
||||
if err != nil {
|
||||
r.logger.Error("Error unmarshalling events", errorsV2.Attr(err))
|
||||
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling events %s", err.Error())
|
||||
}
|
||||
events = append(events, eventMap)
|
||||
}
|
||||
|
||||
jsonItem := model.FlamegraphSpan{
|
||||
SpanID: item.SpanID,
|
||||
TraceID: item.TraceID,
|
||||
ServiceName: item.ServiceName,
|
||||
Name: item.Name,
|
||||
DurationNano: item.DurationNano,
|
||||
HasError: item.HasError,
|
||||
ParentSpanID: item.ParentSpanId,
|
||||
References: ref,
|
||||
Events: events,
|
||||
Children: make([]*model.FlamegraphSpan, 0),
|
||||
}
|
||||
|
||||
if len(req.SelectFields) > 0 {
|
||||
jsonItem.SetRequestedFields(item, req.SelectFields)
|
||||
}
|
||||
|
||||
// metadata calculation
|
||||
startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
|
||||
if startTime == 0 || startTimeUnixNano < startTime {
|
||||
startTime = startTimeUnixNano
|
||||
}
|
||||
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
|
||||
endTime = startTimeUnixNano + jsonItem.DurationNano
|
||||
endTime = (startTimeUnixNano + jsonItem.DurationNano)
|
||||
}
|
||||
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
||||
durationNano = jsonItem.DurationNano
|
||||
@@ -1218,34 +1219,41 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
|
||||
spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
|
||||
}
|
||||
skeletonSpans = nil
|
||||
|
||||
// build parent-child tree using parent_span_id; insert placeholders for missing parents
|
||||
// traverse through the map and append each node to the children array of the parent node
|
||||
// and add missing spans
|
||||
for _, spanNode := range spanIdToSpanNodeMap {
|
||||
if spanNode.ParentSpanID == "" {
|
||||
traceRoots = append(traceRoots, spanNode)
|
||||
} else if parentNode, exists := spanIdToSpanNodeMap[spanNode.ParentSpanID]; exists {
|
||||
parentNode.Children = append(parentNode.Children, spanNode)
|
||||
} else {
|
||||
if _, alreadyCreated := spanIdToSpanNodeMap[spanNode.ParentSpanID]; !alreadyCreated {
|
||||
missingSpan := &model.FlamegraphSpan{
|
||||
SpanID: spanNode.ParentSpanID,
|
||||
Name: "Missing Span",
|
||||
TimeUnixNano: spanNode.TimeUnixNano,
|
||||
DurationNano: spanNode.DurationNano,
|
||||
Events: make([]model.Event, 0),
|
||||
Children: make([]*model.FlamegraphSpan, 0),
|
||||
hasParentSpanNode := false
|
||||
for _, reference := range spanNode.References {
|
||||
if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
|
||||
hasParentSpanNode = true
|
||||
if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
|
||||
parentNode.Children = append(parentNode.Children, spanNode)
|
||||
} else {
|
||||
// insert the missing spans
|
||||
missingSpan := model.FlamegraphSpan{
|
||||
SpanID: reference.SpanId,
|
||||
TraceID: spanNode.TraceID,
|
||||
ServiceName: "",
|
||||
Name: "Missing Span",
|
||||
TimeUnixNano: spanNode.TimeUnixNano,
|
||||
DurationNano: spanNode.DurationNano,
|
||||
HasError: false,
|
||||
Events: make([]model.Event, 0),
|
||||
Children: make([]*model.FlamegraphSpan, 0),
|
||||
}
|
||||
missingSpan.Children = append(missingSpan.Children, spanNode)
|
||||
spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
|
||||
traceRoots = append(traceRoots, &missingSpan)
|
||||
}
|
||||
spanIdToSpanNodeMap[missingSpan.SpanID] = missingSpan
|
||||
traceRoots = append(traceRoots, missingSpan)
|
||||
}
|
||||
spanIdToSpanNodeMap[spanNode.ParentSpanID].Children = append(
|
||||
spanIdToSpanNodeMap[spanNode.ParentSpanID].Children, spanNode)
|
||||
}
|
||||
if !hasParentSpanNode && !tracedetail.ContainsFlamegraphSpan(traceRoots, spanNode) {
|
||||
traceRoots = append(traceRoots, spanNode)
|
||||
}
|
||||
}
|
||||
|
||||
selectedSpans = tracedetail.GetAllSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap)
|
||||
spanIdToSpanNodeMap = nil
|
||||
|
||||
// TODO: set the trace data (model.GetFlamegraphSpansForTraceCache) in cache here
|
||||
// removed existing cache usage since it was not getting used due to this bug https://github.com/SigNoz/engineering-pod/issues/4648
|
||||
@@ -1268,74 +1276,6 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
}
|
||||
r.logger.Debug("getFlamegraphSpansForTrace: processing post cache", "duration", time.Since(processingPostCache), "traceID", traceID, "totalSpans", totalSpanCount, "limit", clientLimit)
|
||||
|
||||
// Pass 2: hydrate events and requested attribute fields only for the selected window spans.
|
||||
// tsBucketStart is non-zero only when we performed a DB fetch (cache miss path).
|
||||
if err != nil && tsBucketStart != 0 {
|
||||
needsAttrMaps := false
|
||||
needsResourceMap := false
|
||||
for _, f := range req.SelectFields {
|
||||
if f.FieldContext == telemetrytypes.FieldContextAttribute {
|
||||
needsAttrMaps = true
|
||||
}
|
||||
if f.FieldContext == telemetrytypes.FieldContextResource {
|
||||
needsResourceMap = true
|
||||
}
|
||||
}
|
||||
|
||||
selectedSpanIDs := make([]string, 0)
|
||||
selectedSpanMap := make(map[string]*model.FlamegraphSpan)
|
||||
for _, level := range selectedSpansForRequest {
|
||||
for _, span := range level {
|
||||
selectedSpanIDs = append(selectedSpanIDs, span.SpanID)
|
||||
selectedSpanMap[span.SpanID] = span
|
||||
}
|
||||
}
|
||||
|
||||
if len(selectedSpanIDs) > 0 {
|
||||
hydrateCols := "span_id, events"
|
||||
if needsAttrMaps {
|
||||
hydrateCols += ", attributes_string, attributes_number, attributes_bool"
|
||||
}
|
||||
if needsResourceMap {
|
||||
hydrateCols += ", resources_string"
|
||||
}
|
||||
hydrateQuery := fmt.Sprintf(
|
||||
"SELECT %s FROM %s.%s WHERE trace_id=@traceID AND ts_bucket_start>=@tsStart AND ts_bucket_start<=@tsEnd AND span_id IN @spanIDs",
|
||||
hydrateCols, r.TraceDB, r.traceTableName)
|
||||
|
||||
var hydrateRows []model.SpanItemV2
|
||||
if hydrateErr := r.db.Select(ctx, &hydrateRows, hydrateQuery,
|
||||
clickhouse.Named("traceID", traceID),
|
||||
clickhouse.Named("tsStart", tsBucketStart),
|
||||
clickhouse.Named("tsEnd", tsBucketEnd),
|
||||
clickhouse.Named("spanIDs", selectedSpanIDs),
|
||||
); hydrateErr != nil {
|
||||
r.logger.Error("Error in processing flamegraph hydration sql query", errorsV2.Attr(hydrateErr))
|
||||
return nil, model.ExecutionError(fmt.Errorf("getFlamegraphSpansForTrace: error querying events: %w", hydrateErr))
|
||||
}
|
||||
|
||||
for _, item := range hydrateRows {
|
||||
span, ok := selectedSpanMap[item.SpanID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
events := make([]model.Event, 0, len(item.Events))
|
||||
for _, event := range item.Events {
|
||||
var eventMap model.Event
|
||||
if unmarshalErr := json.Unmarshal([]byte(event), &eventMap); unmarshalErr != nil {
|
||||
r.logger.Error("Error unmarshalling events", errorsV2.Attr(unmarshalErr))
|
||||
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling events %s", unmarshalErr.Error())
|
||||
}
|
||||
events = append(events, eventMap)
|
||||
}
|
||||
span.Events = events
|
||||
if len(req.SelectFields) > 0 {
|
||||
span.SetRequestedFields(item, req.SelectFields)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace.Spans = selectedSpansForRequest
|
||||
trace.StartTimestampMillis = startTime / 1000000
|
||||
trace.EndTimestampMillis = endTime / 1000000
|
||||
|
||||
@@ -297,13 +297,14 @@ type FlamegraphSpan struct {
|
||||
TimeUnixNano uint64 `json:"timestamp"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
SpanID string `json:"spanId"`
|
||||
TraceID string `json:"traceId"`
|
||||
HasError bool `json:"hasError"`
|
||||
ServiceName string `json:"serviceName"`
|
||||
Name string `json:"name"`
|
||||
Level int64 `json:"level"`
|
||||
ParentSpanID string `json:"parentSpanId"`
|
||||
Events []Event `json:"event"`
|
||||
Children []*FlamegraphSpan `json:"-"`
|
||||
References []OtelSpanRef `json:"references,omitempty"`
|
||||
Children []*FlamegraphSpan `json:"children"`
|
||||
Attributes map[string]any `json:"attributes,omitempty"`
|
||||
Resource map[string]string `json:"resource,omitempty"`
|
||||
}
|
||||
|
||||
@@ -640,6 +640,32 @@ def test_non_existent_metrics_returns_404(
|
||||
assert get_error_message(response.json()) == "could not find the metric whatevergoennnsgoeshere"
|
||||
|
||||
|
||||
def test_non_existent_internal_metrics_returns_no_warning(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
) -> None:
|
||||
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
metric_name = "signoz_calls_total"
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"doesnotreallymatter",
|
||||
"sum",
|
||||
)
|
||||
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
|
||||
start_2h = int((now - timedelta(hours=2)).timestamp() * 1000)
|
||||
response = make_query_request(signoz, token, start_2h, end_ms, [query])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
data = response.json()
|
||||
assert get_all_warnings(data) == []
|
||||
|
||||
|
||||
# Verify /api/v1/fields/values filters label values by metricNamespace prefix.
|
||||
# Inserts metrics under ns.a and ns.b, then asserts a specific prefix returns
|
||||
# only matching values while a common prefix returns both.
|
||||
|
||||
Reference in New Issue
Block a user