|
|
|
|
@@ -160,11 +160,9 @@ type ClickHouseReader struct {
|
|
|
|
|
traceResourceTableV3 string
|
|
|
|
|
traceSummaryTable string
|
|
|
|
|
|
|
|
|
|
fluxIntervalForTraceDetail time.Duration
|
|
|
|
|
cache cache.Cache
|
|
|
|
|
cacheForTraceDetail cache.Cache
|
|
|
|
|
metadataDB string
|
|
|
|
|
metadataTable string
|
|
|
|
|
cache cache.Cache
|
|
|
|
|
metadataDB string
|
|
|
|
|
metadataTable string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewReader returns a ClickHouseReader for the database
|
|
|
|
|
@@ -174,8 +172,6 @@ func NewReader(
|
|
|
|
|
telemetryStore telemetrystore.TelemetryStore,
|
|
|
|
|
prometheus prometheus.Prometheus,
|
|
|
|
|
cluster string,
|
|
|
|
|
fluxIntervalForTraceDetail time.Duration,
|
|
|
|
|
cacheForTraceDetail cache.Cache,
|
|
|
|
|
cache cache.Cache,
|
|
|
|
|
options *Options,
|
|
|
|
|
) *ClickHouseReader {
|
|
|
|
|
@@ -189,45 +185,43 @@ func NewReader(
|
|
|
|
|
traceLocalTableName := options.primary.TraceLocalTableNameV3
|
|
|
|
|
|
|
|
|
|
return &ClickHouseReader{
|
|
|
|
|
db: telemetryStore.ClickhouseDB(),
|
|
|
|
|
logger: logger,
|
|
|
|
|
prometheus: prometheus,
|
|
|
|
|
sqlDB: sqlDB,
|
|
|
|
|
TraceDB: options.primary.TraceDB,
|
|
|
|
|
operationsTable: options.primary.OperationsTable,
|
|
|
|
|
indexTable: options.primary.IndexTable,
|
|
|
|
|
errorTable: options.primary.ErrorTable,
|
|
|
|
|
usageExplorerTable: options.primary.UsageExplorerTable,
|
|
|
|
|
durationTable: options.primary.DurationTable,
|
|
|
|
|
SpansTable: options.primary.SpansTable,
|
|
|
|
|
spanAttributeTableV2: options.primary.SpanAttributeTableV2,
|
|
|
|
|
spanAttributesKeysTable: options.primary.SpanAttributeKeysTable,
|
|
|
|
|
dependencyGraphTable: options.primary.DependencyGraphTable,
|
|
|
|
|
topLevelOperationsTable: options.primary.TopLevelOperationsTable,
|
|
|
|
|
logsDB: options.primary.LogsDB,
|
|
|
|
|
logsTable: options.primary.LogsTable,
|
|
|
|
|
logsLocalTable: options.primary.LogsLocalTable,
|
|
|
|
|
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
|
|
|
|
|
logsResourceKeys: options.primary.LogsResourceKeysTable,
|
|
|
|
|
logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2,
|
|
|
|
|
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
|
|
|
|
|
cluster: cluster,
|
|
|
|
|
queryProgressTracker: queryprogress.NewQueryProgressTracker(logger),
|
|
|
|
|
logsTableV2: options.primary.LogsTableV2,
|
|
|
|
|
logsLocalTableV2: options.primary.LogsLocalTableV2,
|
|
|
|
|
logsResourceTableV2: options.primary.LogsResourceTableV2,
|
|
|
|
|
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
|
|
|
|
|
logsTableName: logsTableName,
|
|
|
|
|
logsLocalTableName: logsLocalTableName,
|
|
|
|
|
traceLocalTableName: traceLocalTableName,
|
|
|
|
|
traceTableName: traceTableName,
|
|
|
|
|
traceResourceTableV3: options.primary.TraceResourceTableV3,
|
|
|
|
|
traceSummaryTable: options.primary.TraceSummaryTable,
|
|
|
|
|
fluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
|
|
|
|
|
cache: cache,
|
|
|
|
|
cacheForTraceDetail: cacheForTraceDetail,
|
|
|
|
|
metadataDB: options.primary.MetadataDB,
|
|
|
|
|
metadataTable: options.primary.MetadataTable,
|
|
|
|
|
db: telemetryStore.ClickhouseDB(),
|
|
|
|
|
logger: logger,
|
|
|
|
|
prometheus: prometheus,
|
|
|
|
|
sqlDB: sqlDB,
|
|
|
|
|
TraceDB: options.primary.TraceDB,
|
|
|
|
|
operationsTable: options.primary.OperationsTable,
|
|
|
|
|
indexTable: options.primary.IndexTable,
|
|
|
|
|
errorTable: options.primary.ErrorTable,
|
|
|
|
|
usageExplorerTable: options.primary.UsageExplorerTable,
|
|
|
|
|
durationTable: options.primary.DurationTable,
|
|
|
|
|
SpansTable: options.primary.SpansTable,
|
|
|
|
|
spanAttributeTableV2: options.primary.SpanAttributeTableV2,
|
|
|
|
|
spanAttributesKeysTable: options.primary.SpanAttributeKeysTable,
|
|
|
|
|
dependencyGraphTable: options.primary.DependencyGraphTable,
|
|
|
|
|
topLevelOperationsTable: options.primary.TopLevelOperationsTable,
|
|
|
|
|
logsDB: options.primary.LogsDB,
|
|
|
|
|
logsTable: options.primary.LogsTable,
|
|
|
|
|
logsLocalTable: options.primary.LogsLocalTable,
|
|
|
|
|
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
|
|
|
|
|
logsResourceKeys: options.primary.LogsResourceKeysTable,
|
|
|
|
|
logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2,
|
|
|
|
|
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
|
|
|
|
|
cluster: cluster,
|
|
|
|
|
queryProgressTracker: queryprogress.NewQueryProgressTracker(logger),
|
|
|
|
|
logsTableV2: options.primary.LogsTableV2,
|
|
|
|
|
logsLocalTableV2: options.primary.LogsLocalTableV2,
|
|
|
|
|
logsResourceTableV2: options.primary.LogsResourceTableV2,
|
|
|
|
|
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
|
|
|
|
|
logsTableName: logsTableName,
|
|
|
|
|
logsLocalTableName: logsLocalTableName,
|
|
|
|
|
traceLocalTableName: traceLocalTableName,
|
|
|
|
|
traceTableName: traceTableName,
|
|
|
|
|
traceResourceTableV3: options.primary.TraceResourceTableV3,
|
|
|
|
|
traceSummaryTable: options.primary.TraceSummaryTable,
|
|
|
|
|
cache: cache,
|
|
|
|
|
metadataDB: options.primary.MetadataDB,
|
|
|
|
|
metadataTable: options.primary.MetadataTable,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -897,23 +891,6 @@ func (r *ClickHouseReader) GetSpansForTrace(ctx context.Context, traceID string,
|
|
|
|
|
return searchScanResponses, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadataCache(ctx context.Context, orgID valuer.UUID, traceID string) (*model.GetWaterfallSpansForTraceWithMetadataCache, error) {
|
|
|
|
|
cachedTraceData := new(model.GetWaterfallSpansForTraceWithMetadataCache)
|
|
|
|
|
err := r.cacheForTraceDetail.Get(ctx, orgID, strings.Join([]string{"getWaterfallSpansForTraceWithMetadata", traceID}, "-"), cachedTraceData)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Debug("error in retrieving getWaterfallSpansForTraceWithMetadata cache", errorsV2.Attr(err), "traceID", traceID)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail {
|
|
|
|
|
r.logger.Info("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache", "traceID", traceID)
|
|
|
|
|
return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache, traceID: %s", traceID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.logger.Info("cache is successfully hit, applying cache for getWaterfallSpansForTraceWithMetadata", "traceID", traceID)
|
|
|
|
|
return cachedTraceData, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, orgID valuer.UUID, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, error) {
|
|
|
|
|
response := new(model.GetWaterfallSpansForTraceWithMetadataResponse)
|
|
|
|
|
var startTime, endTime, durationNano, totalErrorSpans, totalSpans uint64
|
|
|
|
|
@@ -923,172 +900,136 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con
|
|
|
|
|
var serviceNameIntervalMap = map[string][]tracedetail.Interval{}
|
|
|
|
|
var hasMissingSpans bool
|
|
|
|
|
|
|
|
|
|
cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, orgID, traceID)
|
|
|
|
|
if err == nil {
|
|
|
|
|
startTime = cachedTraceData.StartTime
|
|
|
|
|
endTime = cachedTraceData.EndTime
|
|
|
|
|
durationNano = cachedTraceData.DurationNano
|
|
|
|
|
spanIdToSpanNodeMap = cachedTraceData.SpanIdToSpanNodeMap
|
|
|
|
|
serviceNameToTotalDurationMap = cachedTraceData.ServiceNameToTotalDurationMap
|
|
|
|
|
traceRoots = cachedTraceData.TraceRoots
|
|
|
|
|
totalSpans = cachedTraceData.TotalSpans
|
|
|
|
|
totalErrorSpans = cachedTraceData.TotalErrorSpans
|
|
|
|
|
hasMissingSpans = cachedTraceData.HasMissingSpans
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT DISTINCT ON (span_id) timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, links as references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName))
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Info("cache miss for getWaterfallSpansForTraceWithMetadata", "traceID", traceID)
|
|
|
|
|
|
|
|
|
|
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT DISTINCT ON (span_id) timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, links as references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName))
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if len(searchScanResponses) == 0 {
|
|
|
|
|
return response, nil
|
|
|
|
|
}
|
|
|
|
|
totalSpans = uint64(len(searchScanResponses))
|
|
|
|
|
for _, item := range searchScanResponses {
|
|
|
|
|
ref := []model.OtelSpanRef{}
|
|
|
|
|
err := json.Unmarshal([]byte(item.References), &ref)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
r.logger.Error("getWaterfallSpansForTraceWithMetadata: error unmarshalling references", errorsV2.Attr(err), "traceID", traceID)
|
|
|
|
|
return nil, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "getWaterfallSpansForTraceWithMetadata: error unmarshalling references %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
if len(searchScanResponses) == 0 {
|
|
|
|
|
return response, nil
|
|
|
|
|
|
|
|
|
|
// merge attributes_number and attributes_bool to attributes_string
|
|
|
|
|
for k, v := range item.Attributes_bool {
|
|
|
|
|
item.Attributes_string[k] = fmt.Sprintf("%v", v)
|
|
|
|
|
}
|
|
|
|
|
totalSpans = uint64(len(searchScanResponses))
|
|
|
|
|
processingBeforeCache := time.Now()
|
|
|
|
|
for _, item := range searchScanResponses {
|
|
|
|
|
ref := []model.OtelSpanRef{}
|
|
|
|
|
err := json.Unmarshal([]byte(item.References), &ref)
|
|
|
|
|
for k, v := range item.Attributes_number {
|
|
|
|
|
item.Attributes_string[k] = strconv.FormatFloat(v, 'f', -1, 64)
|
|
|
|
|
}
|
|
|
|
|
for k, v := range item.Resources_string {
|
|
|
|
|
item.Attributes_string[k] = v
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
events := make([]model.Event, 0)
|
|
|
|
|
for _, event := range item.Events {
|
|
|
|
|
var eventMap model.Event
|
|
|
|
|
err = json.Unmarshal([]byte(event), &eventMap)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Error("getWaterfallSpansForTraceWithMetadata: error unmarshalling references", errorsV2.Attr(err), "traceID", traceID)
|
|
|
|
|
return nil, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "getWaterfallSpansForTraceWithMetadata: error unmarshalling references %s", err.Error())
|
|
|
|
|
r.logger.Error("Error unmarshalling events", errorsV2.Attr(err))
|
|
|
|
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getWaterfallSpansForTraceWithMetadata: error in unmarshalling events %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// merge attributes_number and attributes_bool to attributes_string
|
|
|
|
|
for k, v := range item.Attributes_bool {
|
|
|
|
|
item.Attributes_string[k] = fmt.Sprintf("%v", v)
|
|
|
|
|
}
|
|
|
|
|
for k, v := range item.Attributes_number {
|
|
|
|
|
item.Attributes_string[k] = strconv.FormatFloat(v, 'f', -1, 64)
|
|
|
|
|
}
|
|
|
|
|
for k, v := range item.Resources_string {
|
|
|
|
|
item.Attributes_string[k] = v
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
events := make([]model.Event, 0)
|
|
|
|
|
for _, event := range item.Events {
|
|
|
|
|
var eventMap model.Event
|
|
|
|
|
err = json.Unmarshal([]byte(event), &eventMap)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Error("Error unmarshalling events", errorsV2.Attr(err))
|
|
|
|
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getWaterfallSpansForTraceWithMetadata: error in unmarshalling events %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
events = append(events, eventMap)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
|
|
|
|
|
|
|
|
|
|
jsonItem := model.Span{
|
|
|
|
|
SpanID: item.SpanID,
|
|
|
|
|
TraceID: item.TraceID,
|
|
|
|
|
ServiceName: item.ServiceName,
|
|
|
|
|
Name: item.Name,
|
|
|
|
|
Kind: int32(item.Kind),
|
|
|
|
|
DurationNano: item.DurationNano,
|
|
|
|
|
HasError: item.HasError,
|
|
|
|
|
StatusMessage: item.StatusMessage,
|
|
|
|
|
StatusCodeString: item.StatusCodeString,
|
|
|
|
|
SpanKind: item.SpanKind,
|
|
|
|
|
References: ref,
|
|
|
|
|
Events: events,
|
|
|
|
|
TagMap: item.Attributes_string,
|
|
|
|
|
Children: make([]*model.Span, 0),
|
|
|
|
|
TimeUnixNano: startTimeUnixNano, // Store nanoseconds temporarily
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// metadata calculation
|
|
|
|
|
if startTime == 0 || startTimeUnixNano < startTime {
|
|
|
|
|
startTime = startTimeUnixNano
|
|
|
|
|
}
|
|
|
|
|
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
|
|
|
|
|
endTime = (startTimeUnixNano + jsonItem.DurationNano)
|
|
|
|
|
}
|
|
|
|
|
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
|
|
|
|
durationNano = jsonItem.DurationNano
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if jsonItem.HasError {
|
|
|
|
|
totalErrorSpans = totalErrorSpans + 1
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// collect the intervals for service for execution time calculation
|
|
|
|
|
serviceNameIntervalMap[jsonItem.ServiceName] =
|
|
|
|
|
append(serviceNameIntervalMap[jsonItem.ServiceName], tracedetail.Interval{StartTime: jsonItem.TimeUnixNano, Duration: jsonItem.DurationNano, Service: jsonItem.ServiceName})
|
|
|
|
|
|
|
|
|
|
// append to the span node map
|
|
|
|
|
spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
|
|
|
|
|
events = append(events, eventMap)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// traverse through the map and append each node to the children array of the parent node
|
|
|
|
|
// and add the missing spans
|
|
|
|
|
for _, spanNode := range spanIdToSpanNodeMap {
|
|
|
|
|
hasParentSpanNode := false
|
|
|
|
|
for _, reference := range spanNode.References {
|
|
|
|
|
if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
|
|
|
|
|
hasParentSpanNode = true
|
|
|
|
|
startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
|
|
|
|
|
|
|
|
|
|
if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
|
|
|
|
|
parentNode.Children = append(parentNode.Children, spanNode)
|
|
|
|
|
} else {
|
|
|
|
|
// insert the missing span
|
|
|
|
|
missingSpan := model.Span{
|
|
|
|
|
SpanID: reference.SpanId,
|
|
|
|
|
TraceID: spanNode.TraceID,
|
|
|
|
|
ServiceName: "",
|
|
|
|
|
Name: "Missing Span",
|
|
|
|
|
TimeUnixNano: spanNode.TimeUnixNano,
|
|
|
|
|
Kind: 0,
|
|
|
|
|
DurationNano: spanNode.DurationNano,
|
|
|
|
|
HasError: false,
|
|
|
|
|
StatusMessage: "",
|
|
|
|
|
StatusCodeString: "",
|
|
|
|
|
SpanKind: "",
|
|
|
|
|
Events: make([]model.Event, 0),
|
|
|
|
|
Children: make([]*model.Span, 0),
|
|
|
|
|
}
|
|
|
|
|
missingSpan.Children = append(missingSpan.Children, spanNode)
|
|
|
|
|
spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
|
|
|
|
|
traceRoots = append(traceRoots, &missingSpan)
|
|
|
|
|
hasMissingSpans = true
|
|
|
|
|
jsonItem := model.Span{
|
|
|
|
|
SpanID: item.SpanID,
|
|
|
|
|
TraceID: item.TraceID,
|
|
|
|
|
ServiceName: item.ServiceName,
|
|
|
|
|
Name: item.Name,
|
|
|
|
|
Kind: int32(item.Kind),
|
|
|
|
|
DurationNano: item.DurationNano,
|
|
|
|
|
HasError: item.HasError,
|
|
|
|
|
StatusMessage: item.StatusMessage,
|
|
|
|
|
StatusCodeString: item.StatusCodeString,
|
|
|
|
|
SpanKind: item.SpanKind,
|
|
|
|
|
References: ref,
|
|
|
|
|
Events: events,
|
|
|
|
|
TagMap: item.Attributes_string,
|
|
|
|
|
Children: make([]*model.Span, 0),
|
|
|
|
|
TimeUnixNano: startTimeUnixNano, // Store nanoseconds temporarily
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// metadata calculation
|
|
|
|
|
if startTime == 0 || startTimeUnixNano < startTime {
|
|
|
|
|
startTime = startTimeUnixNano
|
|
|
|
|
}
|
|
|
|
|
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
|
|
|
|
|
endTime = (startTimeUnixNano + jsonItem.DurationNano)
|
|
|
|
|
}
|
|
|
|
|
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
|
|
|
|
durationNano = jsonItem.DurationNano
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if jsonItem.HasError {
|
|
|
|
|
totalErrorSpans = totalErrorSpans + 1
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// collect the intervals for service for execution time calculation
|
|
|
|
|
serviceNameIntervalMap[jsonItem.ServiceName] =
|
|
|
|
|
append(serviceNameIntervalMap[jsonItem.ServiceName], tracedetail.Interval{StartTime: jsonItem.TimeUnixNano, Duration: jsonItem.DurationNano, Service: jsonItem.ServiceName})
|
|
|
|
|
|
|
|
|
|
// append to the span node map
|
|
|
|
|
spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// traverse through the map and append each node to the children array of the parent node
|
|
|
|
|
// and add the missing spans
|
|
|
|
|
for _, spanNode := range spanIdToSpanNodeMap {
|
|
|
|
|
hasParentSpanNode := false
|
|
|
|
|
for _, reference := range spanNode.References {
|
|
|
|
|
if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
|
|
|
|
|
hasParentSpanNode = true
|
|
|
|
|
|
|
|
|
|
if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
|
|
|
|
|
parentNode.Children = append(parentNode.Children, spanNode)
|
|
|
|
|
} else {
|
|
|
|
|
// insert the missing span
|
|
|
|
|
missingSpan := model.Span{
|
|
|
|
|
SpanID: reference.SpanId,
|
|
|
|
|
TraceID: spanNode.TraceID,
|
|
|
|
|
ServiceName: "",
|
|
|
|
|
Name: "Missing Span",
|
|
|
|
|
TimeUnixNano: spanNode.TimeUnixNano,
|
|
|
|
|
Kind: 0,
|
|
|
|
|
DurationNano: spanNode.DurationNano,
|
|
|
|
|
HasError: false,
|
|
|
|
|
StatusMessage: "",
|
|
|
|
|
StatusCodeString: "",
|
|
|
|
|
SpanKind: "",
|
|
|
|
|
Events: make([]model.Event, 0),
|
|
|
|
|
Children: make([]*model.Span, 0),
|
|
|
|
|
}
|
|
|
|
|
missingSpan.Children = append(missingSpan.Children, spanNode)
|
|
|
|
|
spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
|
|
|
|
|
traceRoots = append(traceRoots, &missingSpan)
|
|
|
|
|
hasMissingSpans = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !hasParentSpanNode && !tracedetail.ContainsWaterfallSpan(traceRoots, spanNode) {
|
|
|
|
|
traceRoots = append(traceRoots, spanNode)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sort the trace roots to add missing spans at the right order
|
|
|
|
|
sort.Slice(traceRoots, func(i, j int) bool {
|
|
|
|
|
if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano {
|
|
|
|
|
return traceRoots[i].Name < traceRoots[j].Name
|
|
|
|
|
}
|
|
|
|
|
return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
serviceNameToTotalDurationMap = tracedetail.CalculateServiceTime(serviceNameIntervalMap)
|
|
|
|
|
|
|
|
|
|
traceCache := model.GetWaterfallSpansForTraceWithMetadataCache{
|
|
|
|
|
StartTime: startTime,
|
|
|
|
|
EndTime: endTime,
|
|
|
|
|
DurationNano: durationNano,
|
|
|
|
|
TotalSpans: totalSpans,
|
|
|
|
|
TotalErrorSpans: totalErrorSpans,
|
|
|
|
|
SpanIdToSpanNodeMap: spanIdToSpanNodeMap,
|
|
|
|
|
ServiceNameToTotalDurationMap: serviceNameToTotalDurationMap,
|
|
|
|
|
TraceRoots: traceRoots,
|
|
|
|
|
HasMissingSpans: hasMissingSpans,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.logger.Info("getWaterfallSpansForTraceWithMetadata: processing pre cache", "duration", time.Since(processingBeforeCache), "traceID", traceID)
|
|
|
|
|
cacheErr := r.cacheForTraceDetail.Set(ctx, orgID, strings.Join([]string{"getWaterfallSpansForTraceWithMetadata", traceID}, "-"), &traceCache, time.Minute*5)
|
|
|
|
|
if cacheErr != nil {
|
|
|
|
|
r.logger.Debug("failed to store cache for getWaterfallSpansForTraceWithMetadata", "traceID", traceID, errorsV2.Attr(err))
|
|
|
|
|
if !hasParentSpanNode && !tracedetail.ContainsWaterfallSpan(traceRoots, spanNode) {
|
|
|
|
|
traceRoots = append(traceRoots, spanNode)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sort the trace roots to add missing spans at the right order
|
|
|
|
|
sort.Slice(traceRoots, func(i, j int) bool {
|
|
|
|
|
if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano {
|
|
|
|
|
return traceRoots[i].Name < traceRoots[j].Name
|
|
|
|
|
}
|
|
|
|
|
return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
serviceNameToTotalDurationMap = tracedetail.CalculateServiceTime(serviceNameIntervalMap)
|
|
|
|
|
|
|
|
|
|
processingPostCache := time.Now()
|
|
|
|
|
// When req.Limit is 0 (not set by the client), selectAllSpans is set to false
|
|
|
|
|
// preserving the old paged behaviour for backward compatibility
|
|
|
|
|
@@ -1130,23 +1071,6 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con
|
|
|
|
|
return response, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ClickHouseReader) GetFlamegraphSpansForTraceCache(ctx context.Context, orgID valuer.UUID, traceID string) (*model.GetFlamegraphSpansForTraceCache, error) {
|
|
|
|
|
cachedTraceData := new(model.GetFlamegraphSpansForTraceCache)
|
|
|
|
|
err := r.cacheForTraceDetail.Get(ctx, orgID, strings.Join([]string{"getFlamegraphSpansForTrace", traceID}, "-"), cachedTraceData)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Debug("error in retrieving getFlamegraphSpansForTrace cache", errorsV2.Attr(err), "traceID", traceID)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail {
|
|
|
|
|
r.logger.Info("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache", "traceID", traceID)
|
|
|
|
|
return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache, traceID: %s", traceID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.logger.Info("cache is successfully hit, applying cache for getFlamegraphSpansForTrace", "traceID", traceID)
|
|
|
|
|
return cachedTraceData, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID valuer.UUID, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, error) {
|
|
|
|
|
trace := new(model.GetFlamegraphSpansForTraceResponse)
|
|
|
|
|
var startTime, endTime, durationNano uint64
|
|
|
|
|
@@ -1155,125 +1079,96 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
|
|
|
|
var selectedSpans = [][]*model.FlamegraphSpan{}
|
|
|
|
|
var traceRoots []*model.FlamegraphSpan
|
|
|
|
|
|
|
|
|
|
// get the trace tree from cache!
|
|
|
|
|
cachedTraceData, err := r.GetFlamegraphSpansForTraceCache(ctx, orgID, traceID)
|
|
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
startTime = cachedTraceData.StartTime
|
|
|
|
|
endTime = cachedTraceData.EndTime
|
|
|
|
|
durationNano = cachedTraceData.DurationNano
|
|
|
|
|
selectedSpans = cachedTraceData.SelectedSpans
|
|
|
|
|
traceRoots = cachedTraceData.TraceRoots
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,links as references, resource_string_service$$name, name, events FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName))
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Info("cache miss for getFlamegraphSpansForTrace", "traceID", traceID)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if len(searchScanResponses) == 0 {
|
|
|
|
|
return trace, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,links as references, resource_string_service$$name, name, events FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName))
|
|
|
|
|
for _, item := range searchScanResponses {
|
|
|
|
|
ref := []model.OtelSpanRef{}
|
|
|
|
|
err := json.Unmarshal([]byte(item.References), &ref)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if len(searchScanResponses) == 0 {
|
|
|
|
|
return trace, nil
|
|
|
|
|
r.logger.Error("Error unmarshalling references", errorsV2.Attr(err))
|
|
|
|
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling references %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
processingBeforeCache := time.Now()
|
|
|
|
|
for _, item := range searchScanResponses {
|
|
|
|
|
ref := []model.OtelSpanRef{}
|
|
|
|
|
err := json.Unmarshal([]byte(item.References), &ref)
|
|
|
|
|
events := make([]model.Event, 0)
|
|
|
|
|
for _, event := range item.Events {
|
|
|
|
|
var eventMap model.Event
|
|
|
|
|
err = json.Unmarshal([]byte(event), &eventMap)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Error("Error unmarshalling references", errorsV2.Attr(err))
|
|
|
|
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling references %s", err.Error())
|
|
|
|
|
r.logger.Error("Error unmarshalling events", errorsV2.Attr(err))
|
|
|
|
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling events %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
events := make([]model.Event, 0)
|
|
|
|
|
for _, event := range item.Events {
|
|
|
|
|
var eventMap model.Event
|
|
|
|
|
err = json.Unmarshal([]byte(event), &eventMap)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Error("Error unmarshalling events", errorsV2.Attr(err))
|
|
|
|
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling events %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
events = append(events, eventMap)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jsonItem := model.FlamegraphSpan{
|
|
|
|
|
SpanID: item.SpanID,
|
|
|
|
|
TraceID: item.TraceID,
|
|
|
|
|
ServiceName: item.ServiceName,
|
|
|
|
|
Name: item.Name,
|
|
|
|
|
DurationNano: item.DurationNano,
|
|
|
|
|
HasError: item.HasError,
|
|
|
|
|
References: ref,
|
|
|
|
|
Events: events,
|
|
|
|
|
Children: make([]*model.FlamegraphSpan, 0),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// metadata calculation
|
|
|
|
|
startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
|
|
|
|
|
if startTime == 0 || startTimeUnixNano < startTime {
|
|
|
|
|
startTime = startTimeUnixNano
|
|
|
|
|
}
|
|
|
|
|
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
|
|
|
|
|
endTime = (startTimeUnixNano + jsonItem.DurationNano)
|
|
|
|
|
}
|
|
|
|
|
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
|
|
|
|
durationNano = jsonItem.DurationNano
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
|
|
|
|
|
spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
|
|
|
|
|
events = append(events, eventMap)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// traverse through the map and append each node to the children array of the parent node
|
|
|
|
|
// and add missing spans
|
|
|
|
|
for _, spanNode := range spanIdToSpanNodeMap {
|
|
|
|
|
hasParentSpanNode := false
|
|
|
|
|
for _, reference := range spanNode.References {
|
|
|
|
|
if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
|
|
|
|
|
hasParentSpanNode = true
|
|
|
|
|
if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
|
|
|
|
|
parentNode.Children = append(parentNode.Children, spanNode)
|
|
|
|
|
} else {
|
|
|
|
|
// insert the missing spans
|
|
|
|
|
missingSpan := model.FlamegraphSpan{
|
|
|
|
|
SpanID: reference.SpanId,
|
|
|
|
|
TraceID: spanNode.TraceID,
|
|
|
|
|
ServiceName: "",
|
|
|
|
|
Name: "Missing Span",
|
|
|
|
|
TimeUnixNano: spanNode.TimeUnixNano,
|
|
|
|
|
DurationNano: spanNode.DurationNano,
|
|
|
|
|
HasError: false,
|
|
|
|
|
Events: make([]model.Event, 0),
|
|
|
|
|
Children: make([]*model.FlamegraphSpan, 0),
|
|
|
|
|
}
|
|
|
|
|
missingSpan.Children = append(missingSpan.Children, spanNode)
|
|
|
|
|
spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
|
|
|
|
|
traceRoots = append(traceRoots, &missingSpan)
|
|
|
|
|
jsonItem := model.FlamegraphSpan{
|
|
|
|
|
SpanID: item.SpanID,
|
|
|
|
|
TraceID: item.TraceID,
|
|
|
|
|
ServiceName: item.ServiceName,
|
|
|
|
|
Name: item.Name,
|
|
|
|
|
DurationNano: item.DurationNano,
|
|
|
|
|
HasError: item.HasError,
|
|
|
|
|
References: ref,
|
|
|
|
|
Events: events,
|
|
|
|
|
Children: make([]*model.FlamegraphSpan, 0),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// metadata calculation
|
|
|
|
|
startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
|
|
|
|
|
if startTime == 0 || startTimeUnixNano < startTime {
|
|
|
|
|
startTime = startTimeUnixNano
|
|
|
|
|
}
|
|
|
|
|
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
|
|
|
|
|
endTime = (startTimeUnixNano + jsonItem.DurationNano)
|
|
|
|
|
}
|
|
|
|
|
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
|
|
|
|
durationNano = jsonItem.DurationNano
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
|
|
|
|
|
spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// traverse through the map and append each node to the children array of the parent node
|
|
|
|
|
// and add missing spans
|
|
|
|
|
for _, spanNode := range spanIdToSpanNodeMap {
|
|
|
|
|
hasParentSpanNode := false
|
|
|
|
|
for _, reference := range spanNode.References {
|
|
|
|
|
if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
|
|
|
|
|
hasParentSpanNode = true
|
|
|
|
|
if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
|
|
|
|
|
parentNode.Children = append(parentNode.Children, spanNode)
|
|
|
|
|
} else {
|
|
|
|
|
// insert the missing spans
|
|
|
|
|
missingSpan := model.FlamegraphSpan{
|
|
|
|
|
SpanID: reference.SpanId,
|
|
|
|
|
TraceID: spanNode.TraceID,
|
|
|
|
|
ServiceName: "",
|
|
|
|
|
Name: "Missing Span",
|
|
|
|
|
TimeUnixNano: spanNode.TimeUnixNano,
|
|
|
|
|
DurationNano: spanNode.DurationNano,
|
|
|
|
|
HasError: false,
|
|
|
|
|
Events: make([]model.Event, 0),
|
|
|
|
|
Children: make([]*model.FlamegraphSpan, 0),
|
|
|
|
|
}
|
|
|
|
|
missingSpan.Children = append(missingSpan.Children, spanNode)
|
|
|
|
|
spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
|
|
|
|
|
traceRoots = append(traceRoots, &missingSpan)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !hasParentSpanNode && !tracedetail.ContainsFlamegraphSpan(traceRoots, spanNode) {
|
|
|
|
|
traceRoots = append(traceRoots, spanNode)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
selectedSpans = tracedetail.GetAllSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap)
|
|
|
|
|
traceCache := model.GetFlamegraphSpansForTraceCache{
|
|
|
|
|
StartTime: startTime,
|
|
|
|
|
EndTime: endTime,
|
|
|
|
|
DurationNano: durationNano,
|
|
|
|
|
SelectedSpans: selectedSpans,
|
|
|
|
|
TraceRoots: traceRoots,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.logger.Info("getFlamegraphSpansForTrace: processing pre cache", "duration", time.Since(processingBeforeCache), "traceID", traceID)
|
|
|
|
|
cacheErr := r.cacheForTraceDetail.Set(ctx, orgID, strings.Join([]string{"getFlamegraphSpansForTrace", traceID}, "-"), &traceCache, time.Minute*5)
|
|
|
|
|
if cacheErr != nil {
|
|
|
|
|
r.logger.Debug("failed to store cache for getFlamegraphSpansForTrace", "traceID", traceID, errorsV2.Attr(err))
|
|
|
|
|
if !hasParentSpanNode && !tracedetail.ContainsFlamegraphSpan(traceRoots, spanNode) {
|
|
|
|
|
traceRoots = append(traceRoots, spanNode)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
selectedSpans = tracedetail.GetAllSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap)
|
|
|
|
|
|
|
|
|
|
processingPostCache := time.Now()
|
|
|
|
|
selectedSpansForRequest := selectedSpans
|
|
|
|
|
clientLimit := min(req.Limit, tracedetail.MaxLimitWithoutSampling)
|
|
|
|
|
|