mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-27 04:10:28 +01:00
Compare commits
40 Commits
nv/dashboa
...
traceop-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65adf8b74e | ||
|
|
2080176766 | ||
|
|
1e326159b0 | ||
|
|
ceb1b4871b | ||
|
|
f3a1f7ba0c | ||
|
|
6b4cc758ec | ||
|
|
63918541d0 | ||
|
|
c1812ebf29 | ||
|
|
331fe3bda8 | ||
|
|
8f1113c528 | ||
|
|
b347ce6a28 | ||
|
|
d57281f0bb | ||
|
|
08008ad813 | ||
|
|
1454a96d4d | ||
|
|
4c02ee28de | ||
|
|
e8befce898 | ||
|
|
ec2bcbcbdc | ||
|
|
370db055b3 | ||
|
|
d197212918 | ||
|
|
96b6d8646f | ||
|
|
0aa6165b18 | ||
|
|
dafa81f3b4 | ||
|
|
a992a13f56 | ||
|
|
79b36abbd7 | ||
|
|
181c307d1a | ||
|
|
becdd4d3b4 | ||
|
|
de0311201a | ||
|
|
1804bfe802 | ||
|
|
357444c94e | ||
|
|
a8598f3bfa | ||
|
|
bca71f9a33 | ||
|
|
c93660357d | ||
|
|
5651e3b7a8 | ||
|
|
cf2cfbc7d4 | ||
|
|
a969c38224 | ||
|
|
b892a0f0a5 | ||
|
|
4d47762eba | ||
|
|
77396a0bb3 | ||
|
|
28c05e1bab | ||
|
|
2b9e383994 |
@@ -18948,6 +18948,77 @@ paths:
|
||||
summary: Get waterfall view for a trace
|
||||
tags:
|
||||
- tracedetail
|
||||
/api/v4/traces/{traceID}/waterfall:
|
||||
post:
|
||||
deprecated: false
|
||||
description: Returns the waterfall view of spans including all spans if total
|
||||
spans are under a limit, a max count otherwise. Aggregations are dropped compared
|
||||
to v3
|
||||
operationId: GetWaterfallV4
|
||||
parameters:
|
||||
- in: path
|
||||
name: traceID
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/SpantypesPostableWaterfall'
|
||||
responses:
|
||||
"200":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
properties:
|
||||
data:
|
||||
$ref: '#/components/schemas/SpantypesGettableWaterfallTrace'
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
- status
|
||||
- data
|
||||
type: object
|
||||
description: OK
|
||||
"400":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Bad Request
|
||||
"401":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Unauthorized
|
||||
"403":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Forbidden
|
||||
"404":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Not Found
|
||||
"500":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Internal Server Error
|
||||
security:
|
||||
- api_key:
|
||||
- VIEWER
|
||||
- tokenizer:
|
||||
- VIEWER
|
||||
summary: Get waterfall view for a trace
|
||||
tags:
|
||||
- tracedetail
|
||||
/api/v5/query_range:
|
||||
post:
|
||||
deprecated: false
|
||||
|
||||
@@ -9232,6 +9232,17 @@ export type GetWaterfall200 = {
|
||||
status: string;
|
||||
};
|
||||
|
||||
export type GetWaterfallV4PathParameters = {
|
||||
traceID: string;
|
||||
};
|
||||
export type GetWaterfallV4200 = {
|
||||
data: SpantypesGettableWaterfallTraceDTO;
|
||||
/**
|
||||
* @type string
|
||||
*/
|
||||
status: string;
|
||||
};
|
||||
|
||||
export type QueryRangeV5200 = {
|
||||
data: Querybuildertypesv5QueryRangeResponseDTO;
|
||||
/**
|
||||
|
||||
@@ -14,6 +14,8 @@ import type {
|
||||
import type {
|
||||
GetWaterfall200,
|
||||
GetWaterfallPathParameters,
|
||||
GetWaterfallV4200,
|
||||
GetWaterfallV4PathParameters,
|
||||
RenderErrorResponseDTO,
|
||||
SpantypesPostableWaterfallDTO,
|
||||
} from '../sigNoz.schemas';
|
||||
@@ -120,3 +122,102 @@ export const useGetWaterfall = <
|
||||
> => {
|
||||
return useMutation(getGetWaterfallMutationOptions(options));
|
||||
};
|
||||
/**
|
||||
* Returns the waterfall view of spans including all spans if total spans are under a limit, a max count otherwise. Aggregations are dropped compared to v3
|
||||
* @summary Get waterfall view for a trace
|
||||
*/
|
||||
export const getWaterfallV4 = (
|
||||
{ traceID }: GetWaterfallV4PathParameters,
|
||||
spantypesPostableWaterfallDTO?: BodyType<SpantypesPostableWaterfallDTO>,
|
||||
signal?: AbortSignal,
|
||||
) => {
|
||||
return GeneratedAPIInstance<GetWaterfallV4200>({
|
||||
url: `/api/v4/traces/${traceID}/waterfall`,
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
data: spantypesPostableWaterfallDTO,
|
||||
signal,
|
||||
});
|
||||
};
|
||||
|
||||
export const getGetWaterfallV4MutationOptions = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof getWaterfallV4>>,
|
||||
TError,
|
||||
{
|
||||
pathParams: GetWaterfallV4PathParameters;
|
||||
data?: BodyType<SpantypesPostableWaterfallDTO>;
|
||||
},
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationOptions<
|
||||
Awaited<ReturnType<typeof getWaterfallV4>>,
|
||||
TError,
|
||||
{
|
||||
pathParams: GetWaterfallV4PathParameters;
|
||||
data?: BodyType<SpantypesPostableWaterfallDTO>;
|
||||
},
|
||||
TContext
|
||||
> => {
|
||||
const mutationKey = ['getWaterfallV4'];
|
||||
const { mutation: mutationOptions } = options
|
||||
? options.mutation &&
|
||||
'mutationKey' in options.mutation &&
|
||||
options.mutation.mutationKey
|
||||
? options
|
||||
: { ...options, mutation: { ...options.mutation, mutationKey } }
|
||||
: { mutation: { mutationKey } };
|
||||
|
||||
const mutationFn: MutationFunction<
|
||||
Awaited<ReturnType<typeof getWaterfallV4>>,
|
||||
{
|
||||
pathParams: GetWaterfallV4PathParameters;
|
||||
data?: BodyType<SpantypesPostableWaterfallDTO>;
|
||||
}
|
||||
> = (props) => {
|
||||
const { pathParams, data } = props ?? {};
|
||||
|
||||
return getWaterfallV4(pathParams, data);
|
||||
};
|
||||
|
||||
return { mutationFn, ...mutationOptions };
|
||||
};
|
||||
|
||||
export type GetWaterfallV4MutationResult = NonNullable<
|
||||
Awaited<ReturnType<typeof getWaterfallV4>>
|
||||
>;
|
||||
export type GetWaterfallV4MutationBody =
|
||||
| BodyType<SpantypesPostableWaterfallDTO>
|
||||
| undefined;
|
||||
export type GetWaterfallV4MutationError = ErrorType<RenderErrorResponseDTO>;
|
||||
|
||||
/**
|
||||
* @summary Get waterfall view for a trace
|
||||
*/
|
||||
export const useGetWaterfallV4 = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof getWaterfallV4>>,
|
||||
TError,
|
||||
{
|
||||
pathParams: GetWaterfallV4PathParameters;
|
||||
data?: BodyType<SpantypesPostableWaterfallDTO>;
|
||||
},
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationResult<
|
||||
Awaited<ReturnType<typeof getWaterfallV4>>,
|
||||
TError,
|
||||
{
|
||||
pathParams: GetWaterfallV4PathParameters;
|
||||
data?: BodyType<SpantypesPostableWaterfallDTO>;
|
||||
},
|
||||
TContext
|
||||
> => {
|
||||
return useMutation(getGetWaterfallV4MutationOptions(options));
|
||||
};
|
||||
|
||||
@@ -29,5 +29,24 @@ func (provider *provider) addTraceDetailRoutes(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v4/traces/{traceID}/waterfall", handler.New(
|
||||
provider.authzMiddleware.ViewAccess(provider.traceDetailHandler.GetWaterfallV4),
|
||||
handler.OpenAPIDef{
|
||||
ID: "GetWaterfallV4",
|
||||
Tags: []string{"tracedetail"},
|
||||
Summary: "Get waterfall view for a trace",
|
||||
Description: "Returns the waterfall view of spans including all spans if total spans are under a limit, a max count otherwise. Aggregations are dropped compared to v3",
|
||||
Request: new(spantypes.PostableWaterfall),
|
||||
RequestContentType: "application/json",
|
||||
Response: new(spantypes.GettableWaterfallTrace),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
|
||||
},
|
||||
)).Methods(http.MethodPost).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -38,3 +38,24 @@ func (h *handler) GetWaterfall(rw http.ResponseWriter, r *http.Request) {
|
||||
|
||||
render.Success(rw, http.StatusOK, result)
|
||||
}
|
||||
|
||||
func (h *handler) GetWaterfallV4(rw http.ResponseWriter, r *http.Request) {
|
||||
req := new(spantypes.PostableWaterfall)
|
||||
if err := binding.JSON.BindBody(r.Body, req); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := req.Validate(); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.module.GetWaterfallV4(r.Context(), mux.Vars(r)["traceID"], req.SelectedSpanID, req.UncollapsedSpans, req.Limit)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, result)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package impltracedetail
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
|
||||
@@ -45,7 +46,7 @@ func (m *module) GetWaterfall(ctx context.Context, traceID string, req *spantype
|
||||
return spantypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, uncollapsedSpans, selectedAllSpans, aggregationResults), nil
|
||||
}
|
||||
|
||||
// getTraceData returns the waterfall cache for the given traceID with fallback on DB.
|
||||
// getTraceData fetches all spans for a trace and builds the WaterfallTrace.
|
||||
func (m *module) getTraceData(ctx context.Context, traceID string) (*spantypes.WaterfallTrace, error) {
|
||||
summary, err := m.store.GetTraceSummary(ctx, traceID)
|
||||
if err != nil {
|
||||
@@ -61,6 +62,86 @@ func (m *module) getTraceData(ctx context.Context, traceID string) (*spantypes.W
|
||||
return nil, spantypes.ErrTraceNotFound
|
||||
}
|
||||
|
||||
traceData := spantypes.NewWaterfallTraceFromSpans(spanItems)
|
||||
return traceData, nil
|
||||
nodes := make([]*spantypes.WaterfallSpan, len(spanItems))
|
||||
for i := range spanItems {
|
||||
nodes[i] = spanItems[i].ToWaterfallSpan(traceID)
|
||||
}
|
||||
return spantypes.NewWaterfallTraceFromSpans(nodes), nil
|
||||
}
|
||||
|
||||
// GetWaterfallV4 is the OOM-safe V4 waterfall.
|
||||
// For large traces (NumSpans > effectiveLimit) it uses a two-step fetch:
|
||||
// minimal fields for all spans to build the tree, then full fields for the
|
||||
// visible window only. Aggregations are not returned.
|
||||
func (m *module) GetWaterfallV4(ctx context.Context, traceID string, selectedSpanID string, uncollapsedSpans []string, selectAllLimit uint) (*spantypes.GettableWaterfallTrace, error) {
|
||||
summary, err := m.store.GetTraceSummary(ctx, traceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
effectiveLimit := min(selectAllLimit, m.config.Waterfall.MaxLimitToSelectAllSpans)
|
||||
if summary.NumSpans > uint64(effectiveLimit) {
|
||||
return m.getWindowedWaterfall(ctx, traceID, selectedSpanID, uncollapsedSpans, summary.Start, summary.End)
|
||||
}
|
||||
return m.getFullWaterfall(ctx, traceID, summary)
|
||||
}
|
||||
|
||||
func (m *module) getFullWaterfall(ctx context.Context, traceID string, summary *spantypes.TraceSummary) (*spantypes.GettableWaterfallTrace, error) {
|
||||
spanItems, err := m.store.GetTraceSpans(ctx, traceID, summary)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(spanItems) == 0 {
|
||||
return nil, spantypes.ErrTraceNotFound
|
||||
}
|
||||
|
||||
nodes := make([]*spantypes.WaterfallSpan, len(spanItems))
|
||||
for i := range spanItems {
|
||||
nodes[i] = spanItems[i].ToWaterfallSpan(traceID)
|
||||
}
|
||||
waterfallTrace := spantypes.NewWaterfallTraceFromSpans(nodes)
|
||||
selectedSpans := waterfallTrace.GetAllSpans()
|
||||
|
||||
return spantypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, nil, true, nil), nil
|
||||
}
|
||||
|
||||
// getWindowedWaterfall builds the waterfall tree with minimal data and then returns only a window of full spans.
|
||||
func (m *module) getWindowedWaterfall(ctx context.Context, traceID, selectedSpanID string, uncollapsedSpans []string, start, end time.Time) (*spantypes.GettableWaterfallTrace, error) {
|
||||
// Step 1: minimal fetch → build full tree → select visible window
|
||||
minimalSpans, err := m.store.GetMinimalSpans(ctx, traceID, start, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(minimalSpans) == 0 {
|
||||
return nil, spantypes.ErrTraceNotFound
|
||||
}
|
||||
|
||||
nodes := make([]*spantypes.WaterfallSpan, len(minimalSpans))
|
||||
for i := range minimalSpans {
|
||||
nodes[i] = minimalSpans[i].ToWaterfallSpan(traceID)
|
||||
}
|
||||
waterfallTrace := spantypes.NewWaterfallTraceFromSpans(nodes)
|
||||
|
||||
selectedSpans, uncollapsedSpans := waterfallTrace.GetSelectedSpans(
|
||||
uncollapsedSpans,
|
||||
selectedSpanID,
|
||||
m.config.Waterfall.SpanPageSize,
|
||||
m.config.Waterfall.MaxDepthToAutoExpand,
|
||||
)
|
||||
|
||||
// Step 2: full fetch for the selected window only
|
||||
spanIDs := make([]string, len(selectedSpans))
|
||||
for i, s := range selectedSpans {
|
||||
spanIDs[i] = s.SpanID
|
||||
}
|
||||
fullSpans, err := m.store.GetTraceSpansByIDs(ctx, traceID, start, end, spanIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
spantypes.EnrichSelectedSpans(selectedSpans, fullSpans)
|
||||
|
||||
return spantypes.NewGettableWaterfallTrace(
|
||||
waterfallTrace, selectedSpans, uncollapsedSpans, false, nil,
|
||||
), nil
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
sqlbuilder "github.com/huandu/go-sqlbuilder"
|
||||
|
||||
@@ -12,6 +13,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/types/spantypes"
|
||||
)
|
||||
|
||||
const colServiceName = `resource_string_service$$$$name` // $ gets escaped so $$$$ converts to $$.
|
||||
|
||||
type traceStore struct {
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
}
|
||||
@@ -45,8 +48,8 @@ func (s *traceStore) GetTraceSpans(ctx context.Context, traceID string, summary
|
||||
// DISTINCT ON (span_id) is ClickHouse-specific syntax not supported by sqlbuilder
|
||||
query := fmt.Sprintf(`
|
||||
SELECT DISTINCT ON (span_id)
|
||||
timestamp, duration_nano, span_id, trace_id, has_error, kind,
|
||||
resource_string_service$$name, name, links as references,
|
||||
timestamp, duration_nano, span_id, has_error, kind,
|
||||
resource_string_service$$name, name,
|
||||
attributes_string, attributes_number, attributes_bool, resources_string,
|
||||
events, status_message, status_code_string, kind_string, parent_span_id,
|
||||
flags, is_remote, trace_state, status_code,
|
||||
@@ -69,3 +72,64 @@ func (s *traceStore) GetTraceSpans(ctx context.Context, traceID string, summary
|
||||
}
|
||||
return spanItems, nil
|
||||
}
|
||||
|
||||
func (s *traceStore) GetMinimalSpans(ctx context.Context, traceID string, start, end time.Time) ([]spantypes.MinimalSpan, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(
|
||||
"DISTINCT ON (span_id) span_id",
|
||||
"parent_span_id", "timestamp", "duration_nano", "has_error",
|
||||
colServiceName,
|
||||
)
|
||||
sb.From(fmt.Sprintf("%s.%s", spantypes.TraceDB, spantypes.TraceTable))
|
||||
sb.Where(
|
||||
sb.E("trace_id", traceID),
|
||||
sb.GE("ts_bucket_start", start.Unix()-1800),
|
||||
sb.LE("ts_bucket_start", end.Unix()),
|
||||
)
|
||||
sb.OrderByAsc("timestamp")
|
||||
sb.OrderByAsc("name")
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var spans []spantypes.MinimalSpan
|
||||
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &spans, query, args...); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying minimal spans")
|
||||
}
|
||||
return spans, nil
|
||||
}
|
||||
|
||||
func (s *traceStore) GetTraceSpansByIDs(ctx context.Context, traceID string, start, end time.Time, spanIDs []string) ([]spantypes.StorableSpan, error) {
|
||||
if len(spanIDs) == 0 {
|
||||
return []spantypes.StorableSpan{}, nil
|
||||
}
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(
|
||||
"DISTINCT ON (span_id) timestamp",
|
||||
"duration_nano", "span_id", "has_error", "kind",
|
||||
colServiceName, "name",
|
||||
"attributes_string", "attributes_number", "attributes_bool", "resources_string",
|
||||
"events", "status_message", "status_code_string", "kind_string", "parent_span_id",
|
||||
"flags", "is_remote", "trace_state", "status_code",
|
||||
"db_name", "db_operation", "http_method", "http_url", "http_host",
|
||||
"external_http_method", "external_http_url", "response_status_code",
|
||||
)
|
||||
sb.From(fmt.Sprintf("%s.%s", spantypes.TraceDB, spantypes.TraceTable))
|
||||
ids := make([]any, len(spanIDs))
|
||||
for i, id := range spanIDs {
|
||||
ids[i] = id
|
||||
}
|
||||
sb.Where(
|
||||
sb.E("trace_id", traceID),
|
||||
sb.In("span_id", ids...),
|
||||
sb.GE("ts_bucket_start", start.Unix()-1800),
|
||||
sb.LE("ts_bucket_start", end.Unix()),
|
||||
)
|
||||
sb.OrderByAsc("timestamp")
|
||||
sb.OrderByAsc("name")
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var spans []spantypes.StorableSpan
|
||||
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &spans, query, args...); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying trace spans by IDs")
|
||||
}
|
||||
return spans, nil
|
||||
}
|
||||
|
||||
@@ -10,9 +10,11 @@ import (
|
||||
// Handler exposes HTTP handlers for trace detail APIs.
|
||||
type Handler interface {
|
||||
GetWaterfall(http.ResponseWriter, *http.Request)
|
||||
GetWaterfallV4(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
// Module defines the business logic for trace detail operations.
|
||||
type Module interface {
|
||||
GetWaterfall(ctx context.Context, traceID string, req *spantypes.PostableWaterfall) (*spantypes.GettableWaterfallTrace, error)
|
||||
GetWaterfallV4(ctx context.Context, traceID string, selectedSpanID string, uncollapsedSpans []string, selectAllLimit uint) (*spantypes.GettableWaterfallTrace, error)
|
||||
}
|
||||
|
||||
@@ -19,6 +19,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
const traceOutsideRangeWarn = "Query %s references a trace_id that exists between %s and %s (UTC) but lies outside the selected time range; adjust the time range to see results"
|
||||
|
||||
type builderQuery[T any] struct {
|
||||
logger *slog.Logger
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
@@ -199,7 +201,21 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
|
||||
return q.executeWindowList(ctx)
|
||||
}
|
||||
|
||||
stmt, err := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec, q.variables)
|
||||
fromMS, toMS := q.fromMS, q.toMS
|
||||
if q.spec.Signal == telemetrytypes.SignalTraces || q.spec.Signal == telemetrytypes.SignalLogs {
|
||||
var overlap bool
|
||||
var warning string
|
||||
fromMS, toMS, overlap, warning = q.narrowWindowByTraceID(ctx, fromMS, toMS)
|
||||
if !overlap {
|
||||
res := emptyResultFor(q.kind, q.spec.Name)
|
||||
if warning != "" {
|
||||
res.Warnings = []string{warning}
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
|
||||
stmt, err := q.stmtBuilder.Build(ctx, fromMS, toMS, q.kind, q.spec, q.variables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -215,6 +231,88 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// narrowWindowByTraceID inspects the filter for trace_id predicates and clamps
|
||||
// [fromMS,toMS] to the time range stored in signoz_traces.distributed_trace_summary.
|
||||
// Returns the (possibly narrowed) window, overlap=false when the trace lies
|
||||
// completely outside the query window (callers should short-circuit), and a
|
||||
// warning string the caller should attach to the empty result when the trace
|
||||
// exists but is outside the selected window.
|
||||
//
|
||||
// When the trace_id is not present in trace_summary the behaviour differs by
|
||||
// signal:
|
||||
// - traces: trace_summary is derived from the spans table, so a missing row
|
||||
// means no spans exist for that trace_id; we short-circuit to empty.
|
||||
// - logs: logs can carry a trace_id even when traces are not ingested at all
|
||||
// (e.g. traces disabled). We must not short-circuit; instead leave the
|
||||
// window untouched and let the query run.
|
||||
func (q *builderQuery[T]) narrowWindowByTraceID(ctx context.Context, fromMS, toMS uint64) (uint64, uint64, bool, string) {
|
||||
if q.spec.Filter == nil || q.spec.Filter.Expression == "" {
|
||||
return fromMS, toMS, true, ""
|
||||
}
|
||||
|
||||
traceIDs, found := telemetrytraces.ExtractTraceIDsFromFilter(q.spec.Filter.Expression)
|
||||
if !found || len(traceIDs) == 0 {
|
||||
return fromMS, toMS, true, ""
|
||||
}
|
||||
|
||||
finder := telemetrytraces.NewTraceTimeRangeFinder(q.telemetryStore)
|
||||
traceStart, traceEnd, exists, err := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
|
||||
if err != nil {
|
||||
return fromMS, toMS, true, ""
|
||||
}
|
||||
if !exists {
|
||||
if q.spec.Signal == telemetrytypes.SignalTraces {
|
||||
q.logger.DebugContext(ctx, "trace_id not found in trace_summary; short-circuiting traces query to empty",
|
||||
slog.Any("trace_ids", traceIDs))
|
||||
return fromMS, toMS, false, ""
|
||||
}
|
||||
q.logger.DebugContext(ctx, "trace_id not found in trace_summary; leaving time range untouched for logs",
|
||||
slog.Any("trace_ids", traceIDs))
|
||||
return fromMS, toMS, true, ""
|
||||
}
|
||||
|
||||
traceStartMS := uint64(traceStart) / 1_000_000
|
||||
traceEndMS := uint64(traceEnd) / 1_000_000
|
||||
if traceStartMS == 0 || traceEndMS == 0 {
|
||||
return fromMS, toMS, true, ""
|
||||
}
|
||||
|
||||
if traceStartMS > toMS || traceEndMS < fromMS {
|
||||
traceStartUTC := time.UnixMilli(int64(traceStartMS)).UTC().Format(time.RFC3339)
|
||||
traceEndUTC := time.UnixMilli(int64(traceEndMS)).UTC().Format(time.RFC3339)
|
||||
return fromMS, toMS, false, fmt.Sprintf(traceOutsideRangeWarn, q.spec.Name, traceStartUTC, traceEndUTC)
|
||||
}
|
||||
if traceStartMS > fromMS {
|
||||
fromMS = traceStartMS
|
||||
}
|
||||
if traceEndMS < toMS {
|
||||
toMS = traceEndMS
|
||||
}
|
||||
q.logger.DebugContext(ctx, "optimized time range using trace_id lookup",
|
||||
slog.String("signal", q.spec.Signal.StringValue()),
|
||||
slog.Any("trace_ids", traceIDs),
|
||||
slog.Uint64("start", fromMS),
|
||||
slog.Uint64("end", toMS))
|
||||
return fromMS, toMS, true, ""
|
||||
}
|
||||
|
||||
// emptyResultFor returns an empty result payload appropriate for the given kind.
|
||||
func emptyResultFor(kind qbtypes.RequestType, queryName string) *qbtypes.Result {
|
||||
var value any
|
||||
switch kind {
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
value = &qbtypes.TimeSeriesData{QueryName: queryName}
|
||||
case qbtypes.RequestTypeScalar:
|
||||
value = &qbtypes.ScalarData{QueryName: queryName}
|
||||
default:
|
||||
value = &qbtypes.RawData{QueryName: queryName}
|
||||
}
|
||||
return &qbtypes.Result{
|
||||
Type: kind,
|
||||
Value: value,
|
||||
}
|
||||
}
|
||||
|
||||
// executeWithContext executes the query with query window and step context for partial value detection.
|
||||
func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string, args []any) (*qbtypes.Result, error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
@@ -310,42 +408,27 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
|
||||
totalBytes := uint64(0)
|
||||
start := time.Now()
|
||||
|
||||
// Check if filter contains trace_id(s) and optimize time range if needed
|
||||
if q.spec.Signal == telemetrytypes.SignalTraces &&
|
||||
q.spec.Filter != nil && q.spec.Filter.Expression != "" {
|
||||
|
||||
traceIDs, found := telemetrytraces.ExtractTraceIDsFromFilter(q.spec.Filter.Expression)
|
||||
if found && len(traceIDs) > 0 {
|
||||
finder := telemetrytraces.NewTraceTimeRangeFinder(q.telemetryStore)
|
||||
|
||||
traceStart, traceEnd, ok := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
|
||||
traceStartMS := uint64(traceStart) / 1_000_000
|
||||
traceEndMS := uint64(traceEnd) / 1_000_000
|
||||
if !ok {
|
||||
q.logger.DebugContext(ctx, "failed to get trace time range", slog.Any("trace_ids", traceIDs))
|
||||
} else if traceStartMS > 0 && traceEndMS > 0 {
|
||||
// no overlap — nothing to return
|
||||
if uint64(traceStartMS) > toMS || uint64(traceEndMS) < fromMS {
|
||||
return &qbtypes.Result{
|
||||
Type: qbtypes.RequestTypeRaw,
|
||||
Value: &qbtypes.RawData{
|
||||
QueryName: q.spec.Name,
|
||||
},
|
||||
Stats: qbtypes.ExecStats{
|
||||
DurationMS: uint64(time.Since(start).Milliseconds()),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// clamp window to trace time range before bucketing
|
||||
if uint64(traceStartMS) > fromMS {
|
||||
fromMS = uint64(traceStartMS)
|
||||
}
|
||||
if uint64(traceEndMS) < toMS {
|
||||
toMS = uint64(traceEndMS)
|
||||
}
|
||||
q.logger.DebugContext(ctx, "optimized time range for traces", slog.Any("trace_ids", traceIDs), slog.Uint64("start", fromMS), slog.Uint64("end", toMS))
|
||||
// Check if filter contains trace_id(s) and optimize time range if needed.
|
||||
// Applies to both traces (the listing this branch was built for) and logs
|
||||
// (which carry trace_id and benefit from the same clamp before bucketing).
|
||||
if q.spec.Signal == telemetrytypes.SignalTraces || q.spec.Signal == telemetrytypes.SignalLogs {
|
||||
var overlap bool
|
||||
var warning string
|
||||
fromMS, toMS, overlap, warning = q.narrowWindowByTraceID(ctx, fromMS, toMS)
|
||||
if !overlap {
|
||||
res := &qbtypes.Result{
|
||||
Type: qbtypes.RequestTypeRaw,
|
||||
Value: &qbtypes.RawData{
|
||||
QueryName: q.spec.Name,
|
||||
},
|
||||
Stats: qbtypes.ExecStats{
|
||||
DurationMS: uint64(time.Since(start).Milliseconds()),
|
||||
},
|
||||
}
|
||||
if warning != "" {
|
||||
res.Warnings = []string{warning}
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -70,12 +70,31 @@ func (b *traceOperatorCTEBuilder) build(ctx context.Context, requestType qbtypes
|
||||
|
||||
selectFromCTE := rootCTEName
|
||||
if b.operator.ReturnSpansFrom != "" {
|
||||
selectFromCTE = b.queryToCTEName[b.operator.ReturnSpansFrom]
|
||||
if selectFromCTE == "" {
|
||||
sourceQueryCTE := b.queryToCTEName[b.operator.ReturnSpansFrom]
|
||||
if sourceQueryCTE == "" {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
|
||||
"returnSpansFrom references query '%s' which has no corresponding CTE",
|
||||
b.operator.ReturnSpansFrom)
|
||||
}
|
||||
filteredCTEName := fmt.Sprintf("__return_from_%s", b.operator.ReturnSpansFrom)
|
||||
|
||||
// rootCTEName holds one row per matching *span*, not per *trace*, so it can
|
||||
// contain many rows for the same trace_id. DISTINCT de-duplicates that set
|
||||
// before ClickHouse builds the hash table for the IN check, keeping memory
|
||||
// usage proportional to the number of distinct traces rather than spans.
|
||||
matchingTracedSB := sqlbuilder.NewSelectBuilder()
|
||||
matchingTracedSB.Select("DISTINCT trace_id")
|
||||
matchingTracedSB.From(rootCTEName)
|
||||
matchedTracesSQL, matchedTracesArgs := matchingTracedSB.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
filteredSB := sqlbuilder.NewSelectBuilder()
|
||||
filteredSB.Select("*")
|
||||
filteredSB.From(sourceQueryCTE)
|
||||
filteredSB.Where(fmt.Sprintf("trace_id IN (%s)", matchedTracesSQL))
|
||||
filteredSQL, filteredArgs := filteredSB.BuildWithFlavor(sqlbuilder.ClickHouse, matchedTracesArgs...)
|
||||
|
||||
b.addCTE(filteredCTEName, filteredSQL, filteredArgs, []string{sourceQueryCTE, rootCTEName})
|
||||
selectFromCTE = filteredCTEName
|
||||
}
|
||||
|
||||
finalStmt, err := b.buildFinalQuery(ctx, selectFromCTE, requestType)
|
||||
|
||||
@@ -385,6 +385,82 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "returnSpansFrom B: A -> B return B spans filtered by operator",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
operator: qbtypes.QueryBuilderTraceOperator{
|
||||
Expression: "A -> B",
|
||||
ReturnSpansFrom: "B",
|
||||
Limit: 10,
|
||||
},
|
||||
compositeQuery: &qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "B",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __return_from_B AS (SELECT * FROM B WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "returnSpansFrom C: (A -> B) && C return C spans filtered by operator",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
operator: qbtypes.QueryBuilderTraceOperator{
|
||||
Expression: "(A -> B) && C",
|
||||
ReturnSpansFrom: "C",
|
||||
Limit: 10,
|
||||
},
|
||||
compositeQuery: &qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "B",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "C",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'auth'"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B_AND_C AS (SELECT l.* FROM A_INDIR_DESC_B AS l INNER JOIN C AS r ON l.trace_id = r.trace_id), __return_from_C AS (SELECT * FROM C WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B_AND_C)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_C ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "auth", "%service.name%", "%service.name\":\"auth%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
|
||||
@@ -21,19 +21,19 @@ func NewTraceTimeRangeFinder(telemetryStore telemetrystore.TelemetryStore) *Trac
|
||||
}
|
||||
}
|
||||
|
||||
func (f *TraceTimeRangeFinder) GetTraceTimeRange(ctx context.Context, traceID string) (startNano, endNano int64, ok bool) {
|
||||
func (f *TraceTimeRangeFinder) GetTraceTimeRange(ctx context.Context, traceID string) (startNano, endNano int64, exists bool, error error) {
|
||||
traceIDs := []string{traceID}
|
||||
return f.GetTraceTimeRangeMulti(ctx, traceIDs)
|
||||
}
|
||||
|
||||
func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, traceIDs []string) (startNano, endNano int64, ok bool) {
|
||||
func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, traceIDs []string) (startNano, endNano int64, exists bool, error error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalTraces.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "trace-time-range",
|
||||
instrumentationtypes.CodeFunctionName: "GetTraceTimeRangeMulti",
|
||||
})
|
||||
if len(traceIDs) == 0 {
|
||||
return 0, 0, false
|
||||
return 0, 0, false, nil
|
||||
}
|
||||
|
||||
cleanedIDs := make([]string, len(traceIDs))
|
||||
@@ -49,7 +49,8 @@ func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, trace
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
SELECT
|
||||
count(),
|
||||
toUnixTimestamp64Nano(min(start)),
|
||||
toUnixTimestamp64Nano(max(end))
|
||||
FROM %s.%s
|
||||
@@ -58,9 +59,14 @@ func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, trace
|
||||
|
||||
row := f.telemetryStore.ClickhouseDB().QueryRow(ctx, query, args...)
|
||||
|
||||
err := row.Scan(&startNano, &endNano)
|
||||
var rowCount uint64
|
||||
err := row.Scan(&rowCount, &startNano, &endNano)
|
||||
if err != nil {
|
||||
return 0, 0, false
|
||||
return 0, 0, false, err
|
||||
}
|
||||
|
||||
if rowCount == 0 {
|
||||
return 0, 0, false, nil
|
||||
}
|
||||
|
||||
if startNano > 1_000_000_000 {
|
||||
@@ -68,5 +74,5 @@ func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, trace
|
||||
}
|
||||
endNano += 1_000_000_000
|
||||
|
||||
return startNano, endNano, true
|
||||
return startNano, endNano, true, nil
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ func TestGetTraceTimeRangeMulti(t *testing.T) {
|
||||
finder := &TraceTimeRangeFinder{telemetryStore: nil}
|
||||
|
||||
if !tt.expectOK {
|
||||
_, _, ok := finder.GetTraceTimeRangeMulti(ctx, tt.traceIDs)
|
||||
_, _, ok, _ := finder.GetTraceTimeRangeMulti(ctx, tt.traceIDs)
|
||||
assert.False(t, ok)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -2,6 +2,7 @@ package spantypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
@@ -26,4 +27,6 @@ type SpanMapperStore interface {
|
||||
type TraceStore interface {
|
||||
GetTraceSummary(ctx context.Context, traceID string) (*TraceSummary, error)
|
||||
GetTraceSpans(ctx context.Context, traceID string, summary *TraceSummary) ([]StorableSpan, error)
|
||||
GetMinimalSpans(ctx context.Context, traceID string, start, end time.Time) ([]MinimalSpan, error)
|
||||
GetTraceSpansByIDs(ctx context.Context, traceID string, start, end time.Time, spanIDs []string) ([]StorableSpan, error)
|
||||
}
|
||||
|
||||
@@ -103,12 +103,10 @@ type StorableSpan struct {
|
||||
StartTime time.Time `ch:"timestamp"`
|
||||
DurationNano uint64 `ch:"duration_nano"`
|
||||
SpanID string `ch:"span_id"`
|
||||
TraceID string `ch:"trace_id"`
|
||||
HasError bool `ch:"has_error"`
|
||||
Kind int8 `ch:"kind"`
|
||||
ServiceName string `ch:"resource_string_service$$name"`
|
||||
Name string `ch:"name"`
|
||||
References string `ch:"references"`
|
||||
AttributesString map[string]string `ch:"attributes_string"`
|
||||
AttributesNumber map[string]float64 `ch:"attributes_number"`
|
||||
AttributesBool map[string]bool `ch:"attributes_bool"`
|
||||
@@ -132,6 +130,32 @@ type StorableSpan struct {
|
||||
ResponseStatusCode string `ch:"response_status_code"`
|
||||
}
|
||||
|
||||
// MinimalSpan with only the fields needed to build the parent-child tree.
|
||||
type MinimalSpan struct {
|
||||
SpanID string `ch:"span_id"`
|
||||
ParentSpanID string `ch:"parent_span_id"`
|
||||
StartTime time.Time `ch:"timestamp"`
|
||||
DurationNano uint64 `ch:"duration_nano"`
|
||||
HasError bool `ch:"has_error"`
|
||||
ServiceName string `ch:"resource_string_service$$name"`
|
||||
}
|
||||
|
||||
func (item *MinimalSpan) ToWaterfallSpan(traceID string) *WaterfallSpan {
|
||||
return &WaterfallSpan{
|
||||
SpanID: item.SpanID,
|
||||
TraceID: traceID,
|
||||
ParentSpanID: item.ParentSpanID,
|
||||
TimeUnix: uint64(item.StartTime.UnixNano()),
|
||||
DurationNano: item.DurationNano,
|
||||
HasError: item.HasError,
|
||||
ServiceName: item.ServiceName,
|
||||
Resource: map[string]string{"service.name": item.ServiceName},
|
||||
Children: make([]*WaterfallSpan, 0),
|
||||
Attributes: make(map[string]any),
|
||||
Events: make([]Event, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// NewMissingWaterfallSpan creates a synthetic placeholder span for a parent that has no recorded data.
|
||||
func NewMissingWaterfallSpan(spanID, traceID string, timeUnixNano, durationNano uint64) *WaterfallSpan {
|
||||
return &WaterfallSpan{
|
||||
@@ -261,7 +285,7 @@ func (item *StorableSpan) UnmarshalledEvents() []Event {
|
||||
return events
|
||||
}
|
||||
|
||||
func (item *StorableSpan) ToWaterfallSpan() *WaterfallSpan {
|
||||
func (item *StorableSpan) ToWaterfallSpan(traceID string) *WaterfallSpan {
|
||||
resources := make(map[string]string)
|
||||
maps.Copy(resources, item.ResourcesString)
|
||||
|
||||
@@ -289,7 +313,7 @@ func (item *StorableSpan) ToWaterfallSpan() *WaterfallSpan {
|
||||
StatusCode: item.StatusCode,
|
||||
StatusCodeString: item.StatusCodeString,
|
||||
StatusMessage: item.StatusMessage,
|
||||
TraceID: item.TraceID,
|
||||
TraceID: traceID,
|
||||
TraceState: item.TraceState,
|
||||
Children: make([]*WaterfallSpan, 0),
|
||||
TimeUnix: uint64(item.StartTime.UnixNano()),
|
||||
@@ -297,6 +321,24 @@ func (item *StorableSpan) ToWaterfallSpan() *WaterfallSpan {
|
||||
}
|
||||
}
|
||||
|
||||
func EnrichSelectedSpans(window []*WaterfallSpan, fullSpans []StorableSpan) {
|
||||
fullByID := make(map[string]*StorableSpan, len(fullSpans))
|
||||
for i := range fullSpans {
|
||||
fullByID[fullSpans[i].SpanID] = &fullSpans[i]
|
||||
}
|
||||
for i, ws := range window {
|
||||
full, ok := fullByID[ws.SpanID]
|
||||
if !ok {
|
||||
continue // synthesized MissingSpan — keep empty shell
|
||||
}
|
||||
newWS := full.ToWaterfallSpan(ws.TraceID)
|
||||
newWS.Level = ws.Level
|
||||
newWS.HasChildren = ws.HasChildren
|
||||
newWS.SubTreeNodeCount = ws.SubTreeNodeCount
|
||||
window[i] = newWS
|
||||
}
|
||||
}
|
||||
|
||||
// getSpanIndex returns the index of matched span and -1 for no match.
|
||||
func getSpanIndex(spans []*WaterfallSpan, targetSpanID string) int {
|
||||
for i, s := range spans {
|
||||
|
||||
@@ -62,26 +62,24 @@ func NewWaterfallTrace(
|
||||
}
|
||||
}
|
||||
|
||||
func NewWaterfallTraceFromSpans(spans []StorableSpan) *WaterfallTrace {
|
||||
// NewWaterfallTraceFromSpans requires WaterfallSpan nodes with only below fields:
|
||||
// SpanID, ParentSpanID, TimeUnix, DurationNano, HasError, and ServiceName.
|
||||
func NewWaterfallTraceFromSpans(nodes []*WaterfallSpan) *WaterfallTrace {
|
||||
var (
|
||||
startTime, endTime, totalErrorSpans uint64
|
||||
spanIDToSpanNodeMap = make(map[string]*WaterfallSpan, len(spans))
|
||||
spanIDToSpanNodeMap = make(map[string]*WaterfallSpan, len(nodes))
|
||||
traceRoots []*WaterfallSpan
|
||||
hasMissingSpans bool
|
||||
)
|
||||
|
||||
for _, item := range spans {
|
||||
span := item.ToWaterfallSpan()
|
||||
startTimeUnixNano := uint64(item.StartTime.UnixNano())
|
||||
if startTime == 0 || startTimeUnixNano < startTime {
|
||||
startTime = startTimeUnixNano
|
||||
for _, span := range nodes {
|
||||
if startTime == 0 || span.TimeUnix < startTime {
|
||||
startTime = span.TimeUnix
|
||||
}
|
||||
endTime = max(endTime, startTimeUnixNano+span.DurationNano)
|
||||
|
||||
endTime = max(endTime, span.TimeUnix+span.DurationNano)
|
||||
if span.HasError {
|
||||
totalErrorSpans++
|
||||
}
|
||||
|
||||
spanIDToSpanNodeMap[span.SpanID] = span
|
||||
}
|
||||
|
||||
@@ -116,7 +114,7 @@ func NewWaterfallTraceFromSpans(spans []StorableSpan) *WaterfallTrace {
|
||||
return NewWaterfallTrace(
|
||||
startTime,
|
||||
endTime,
|
||||
uint64(len(spans)),
|
||||
uint64(len(nodes)),
|
||||
totalErrorSpans,
|
||||
spanIDToSpanNodeMap,
|
||||
traceRoots,
|
||||
|
||||
3
tests/fixtures/querier.py
vendored
3
tests/fixtures/querier.py
vendored
@@ -72,6 +72,7 @@ class TraceOperatorQuery:
|
||||
return_spans_from: str | None = None
|
||||
limit: int | None = None
|
||||
order: list[OrderBy] | None = None
|
||||
select_fields: list[TelemetryFieldKey] | None = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
spec: dict[str, Any] = {
|
||||
@@ -84,6 +85,8 @@ class TraceOperatorQuery:
|
||||
spec["limit"] = self.limit
|
||||
if self.order:
|
||||
spec["order"] = [o.to_dict() if hasattr(o, "to_dict") else o for o in self.order]
|
||||
if self.select_fields:
|
||||
spec["selectFields"] = [f.to_dict() for f in self.select_fields]
|
||||
return {"type": "builder_trace_operator", "spec": spec}
|
||||
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ from fixtures.querier import (
|
||||
index_series_by_label,
|
||||
make_query_request,
|
||||
)
|
||||
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
|
||||
|
||||
|
||||
def test_logs_list(
|
||||
@@ -2293,3 +2294,334 @@ def test_logs_formula_orderby_and_limit(
|
||||
assert len(f3_services) == 3, f"F3: expected 3 rows after limit, got {len(f3_services)}"
|
||||
assert f3_values == f4_values[:3], f"F3 values {f3_values} do not match F4[:3] values {f4_values[:3]}"
|
||||
assert set(f3_services) == set(f4_services[:3]), f"F3 services {f3_services} do not match F4[:3] services {f4_services[:3]}"
|
||||
|
||||
|
||||
def test_logs_list_filter_by_trace_id(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_logs: Callable[[list[Logs]], None],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Tests that filtering logs by trace_id uses the trace_summary lookup to
|
||||
narrow the query window before scanning the logs table:
|
||||
1. Returns the matching log (narrow window, single bucket).
|
||||
2. Does not return duplicate logs when the query window should span multiple
|
||||
exponential buckets (>1 h). But is clamped to the timerange of trace.
|
||||
3. Returns no results when the query window does not contain the trace.
|
||||
4. Logs carrying a trace_id whose trace is NOT in trace_summary (e.g.
|
||||
traces disabled) are still returned — the lookup miss must not
|
||||
short-circuit logs queries.
|
||||
"""
|
||||
target_trace_id = TraceIdGenerator.trace_id()
|
||||
orphan_trace_id = TraceIdGenerator.trace_id()
|
||||
target_root_span_id = TraceIdGenerator.span_id()
|
||||
target_child_span_id = TraceIdGenerator.span_id()
|
||||
orphan_span_id = TraceIdGenerator.span_id()
|
||||
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
|
||||
common_resources = {
|
||||
"deployment.environment": "production",
|
||||
"service.name": "logs-trace-filter-service",
|
||||
"cloud.provider": "integration",
|
||||
}
|
||||
|
||||
# Populate signoz_traces.distributed_trace_summary by inserting spans for
|
||||
# the target trace_id. trace_summary records min/max of span timestamps
|
||||
# (it ignores span duration), so two spans are inserted to give the trace
|
||||
# a non-trivial recorded window of [now-10s, now-5s].
|
||||
insert_traces(
|
||||
[
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_root_span_id,
|
||||
parent_span_id="",
|
||||
name="root-span",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources=common_resources,
|
||||
attributes={},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=5),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_child_span_id,
|
||||
parent_span_id=target_root_span_id,
|
||||
name="child-span",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources=common_resources,
|
||||
attributes={},
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
# Insert logs:
|
||||
# - one with the target trace_id, at a timestamp within the trace's
|
||||
# recorded window (now-10s..now-5s, padded ±1s).
|
||||
# - one with an orphan trace_id whose trace was never ingested — used to
|
||||
# verify the lookup miss does NOT short-circuit logs queries.
|
||||
insert_logs(
|
||||
[
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=7),
|
||||
resources=common_resources,
|
||||
attributes={"http.method": "GET"},
|
||||
body="log inside the target trace window",
|
||||
severity_text="INFO",
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_root_span_id,
|
||||
),
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=2),
|
||||
resources=common_resources,
|
||||
attributes={"http.method": "PUT"},
|
||||
body="log with a trace_id absent from trace_summary",
|
||||
severity_text="INFO",
|
||||
trace_id=orphan_trace_id,
|
||||
span_id=orphan_span_id,
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
def _query(start_ms: int, end_ms: int, trace_id: str) -> tuple[list, list[str]]:
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=start_ms,
|
||||
end_ms=end_ms,
|
||||
request_type="raw",
|
||||
queries=[
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "logs",
|
||||
"disabled": False,
|
||||
"limit": 100,
|
||||
"offset": 0,
|
||||
"filter": {"expression": f"trace_id = '{trace_id}'"},
|
||||
"order": [
|
||||
{"key": {"name": "timestamp"}, "direction": "desc"},
|
||||
{"key": {"name": "id"}, "direction": "desc"},
|
||||
],
|
||||
},
|
||||
}
|
||||
],
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
rows = response.json()["data"]["data"]["results"][0]["rows"] or []
|
||||
warning = (response.json().get("data") or {}).get("warning") or {}
|
||||
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
|
||||
return rows, messages
|
||||
|
||||
outside_range_msg = "lies outside the selected time range"
|
||||
|
||||
now_ms = int(now.timestamp() * 1000)
|
||||
|
||||
# --- Test 1: narrow window (single bucket, <1 h) ---
|
||||
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
|
||||
narrow_rows, narrow_warnings = _query(narrow_start_ms, now_ms, target_trace_id)
|
||||
|
||||
assert len(narrow_rows) == 1, f"Expected 1 log for trace_id filter (narrow window), got {len(narrow_rows)}"
|
||||
assert narrow_rows[0]["data"]["trace_id"] == target_trace_id
|
||||
assert narrow_rows[0]["data"]["span_id"] == target_root_span_id
|
||||
assert not any(outside_range_msg in m for m in narrow_warnings), f"Did not expect outside-range warning, got {narrow_warnings}"
|
||||
|
||||
# --- Test 2: wide window (>1 h, clamp to the timerange from trace_summary) ---
|
||||
# Should still return exactly one log — no duplicates from multi-bucket scan.
|
||||
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
|
||||
wide_rows, wide_warnings = _query(wide_start_ms, now_ms, target_trace_id)
|
||||
|
||||
assert len(wide_rows) == 1, f"Expected 1 log for trace_id filter (wide window, multi-bucket), got {len(wide_rows)} — possible duplicate-log regression"
|
||||
assert wide_rows[0]["data"]["trace_id"] == target_trace_id
|
||||
assert wide_rows[0]["data"]["span_id"] == target_root_span_id
|
||||
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
|
||||
|
||||
# --- Test 3: window that does not contain the trace returns no results + warning ---
|
||||
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
|
||||
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
|
||||
past_rows, past_warnings = _query(past_start_ms, past_end_ms, target_trace_id)
|
||||
|
||||
assert len(past_rows) == 0, f"Expected 0 logs for trace_id filter outside time window, got {len(past_rows)}"
|
||||
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
|
||||
|
||||
# --- Test 4: trace_id not present in trace_summary still returns logs (no warning) ---
|
||||
orphan_rows, orphan_warnings = _query(narrow_start_ms, now_ms, orphan_trace_id)
|
||||
|
||||
assert len(orphan_rows) == 1, f"Expected 1 log for orphan trace_id (no trace_summary entry), got {len(orphan_rows)} — logs query may have been incorrectly short-circuited"
|
||||
assert orphan_rows[0]["data"]["trace_id"] == orphan_trace_id
|
||||
assert not any(outside_range_msg in m for m in orphan_warnings), f"Did not expect outside-range warning for orphan trace_id, got {orphan_warnings}"
|
||||
|
||||
|
||||
def test_logs_aggregation_filter_by_trace_id(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_logs: Callable[[list[Logs]], None],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Tests that the trace_id time-range optimization also applies to
|
||||
non-window-list (time_series / aggregation) logs queries:
|
||||
1. Wide query window containing the trace returns the correct count.
|
||||
2. Query window outside the trace's time range short-circuits to an
|
||||
empty result.
|
||||
3. A trace_id with no row in trace_summary (e.g. traces disabled) still
|
||||
returns the matching logs — the lookup miss must not short-circuit
|
||||
logs aggregation queries.
|
||||
"""
|
||||
target_trace_id = TraceIdGenerator.trace_id()
|
||||
orphan_trace_id = TraceIdGenerator.trace_id()
|
||||
target_root_span_id = TraceIdGenerator.span_id()
|
||||
target_child_span_id = TraceIdGenerator.span_id()
|
||||
orphan_span_id = TraceIdGenerator.span_id()
|
||||
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
|
||||
common_resources = {
|
||||
"deployment.environment": "production",
|
||||
"service.name": "logs-trace-agg-service",
|
||||
"cloud.provider": "integration",
|
||||
}
|
||||
|
||||
# trace_summary records min/max of span timestamps (it ignores duration),
|
||||
# so insert two spans to give the trace a recorded window wide enough to
|
||||
# comfortably contain the log timestamps below.
|
||||
insert_traces(
|
||||
[
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_root_span_id,
|
||||
parent_span_id="",
|
||||
name="root-span",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources=common_resources,
|
||||
attributes={},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=5),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_child_span_id,
|
||||
parent_span_id=target_root_span_id,
|
||||
name="child-span",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources=common_resources,
|
||||
attributes={},
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
# Two logs for the target trace_id, both inside the recorded trace window.
|
||||
# One additional log carries an orphan trace_id with no row in
|
||||
# trace_summary — used to verify that the lookup miss does not
|
||||
# short-circuit logs aggregations.
|
||||
insert_logs(
|
||||
[
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=7),
|
||||
resources=common_resources,
|
||||
attributes={},
|
||||
body="log A inside trace window",
|
||||
severity_text="INFO",
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_root_span_id,
|
||||
),
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=6),
|
||||
resources=common_resources,
|
||||
attributes={},
|
||||
body="log B inside trace window",
|
||||
severity_text="INFO",
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_root_span_id,
|
||||
),
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=2),
|
||||
resources=common_resources,
|
||||
attributes={},
|
||||
body="log with a trace_id absent from trace_summary",
|
||||
severity_text="INFO",
|
||||
trace_id=orphan_trace_id,
|
||||
span_id=orphan_span_id,
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
def _count(start_ms: int, end_ms: int, trace_id: str) -> tuple[float, list[str]]:
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=start_ms,
|
||||
end_ms=end_ms,
|
||||
request_type="time_series",
|
||||
queries=[
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "logs",
|
||||
"stepInterval": 60,
|
||||
"disabled": False,
|
||||
"filter": {"expression": f"trace_id = '{trace_id}'"},
|
||||
"having": {"expression": ""},
|
||||
"aggregations": [{"expression": "count()"}],
|
||||
},
|
||||
}
|
||||
],
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
results = response.json()["data"]["data"]["results"]
|
||||
assert len(results) == 1
|
||||
warning = (response.json().get("data") or {}).get("warning") or {}
|
||||
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
|
||||
aggregations = results[0].get("aggregations") or []
|
||||
if not aggregations:
|
||||
return 0, messages
|
||||
series = aggregations[0].get("series") or []
|
||||
if not series:
|
||||
return 0, messages
|
||||
return sum(v["value"] for v in series[0]["values"]), messages
|
||||
|
||||
outside_range_msg = "lies outside the selected time range"
|
||||
|
||||
now_ms = int(now.timestamp() * 1000)
|
||||
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
|
||||
|
||||
# --- Test 1: wide window (>1 h) containing the trace returns 2 logs ---
|
||||
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
|
||||
wide_count, wide_warnings = _count(wide_start_ms, now_ms, target_trace_id)
|
||||
assert wide_count == 2, f"Expected count=2 for trace_id aggregation (wide window), got {wide_count}"
|
||||
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
|
||||
|
||||
# --- Test 2: window outside the trace short-circuits to empty + warning ---
|
||||
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
|
||||
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
|
||||
past_count, past_warnings = _count(past_start_ms, past_end_ms, target_trace_id)
|
||||
assert past_count == 0, f"Expected count=0 for trace_id aggregation outside time window, got {past_count}"
|
||||
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
|
||||
|
||||
# --- Test 3: trace_id not present in trace_summary still returns logs (no warning) ---
|
||||
orphan_count, orphan_warnings = _count(narrow_start_ms, now_ms, orphan_trace_id)
|
||||
assert orphan_count == 1, f"Expected count=1 for orphan trace_id aggregation, got {orphan_count} — query may have been incorrectly short-circuited"
|
||||
assert not any(outside_range_msg in m for m in orphan_warnings), f"Did not expect outside-range warning for orphan trace_id, got {orphan_warnings}"
|
||||
|
||||
@@ -2062,7 +2062,7 @@ def test_traces_list_filter_by_trace_id(
|
||||
|
||||
trace_filter = f"trace_id = '{target_trace_id}'"
|
||||
|
||||
def _query(start_ms: int, end_ms: int) -> list:
|
||||
def _query(start_ms: int, end_ms: int) -> tuple[list, list[str]]:
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
@@ -2096,30 +2096,157 @@ def test_traces_list_filter_by_trace_id(
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
return response.json()["data"]["data"]["results"][0]["rows"] or []
|
||||
rows = response.json()["data"]["data"]["results"][0]["rows"] or []
|
||||
warning = (response.json().get("data") or {}).get("warning") or {}
|
||||
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
|
||||
return rows, messages
|
||||
|
||||
outside_range_msg = "lies outside the selected time range"
|
||||
|
||||
now_ms = int(now.timestamp() * 1000)
|
||||
|
||||
# --- Test 1: narrow window (single bucket, <1 h) ---
|
||||
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
|
||||
narrow_rows = _query(narrow_start_ms, now_ms)
|
||||
narrow_rows, narrow_warnings = _query(narrow_start_ms, now_ms)
|
||||
|
||||
assert len(narrow_rows) == 1, f"Expected 1 span for trace_id filter (narrow window), got {len(narrow_rows)}"
|
||||
assert narrow_rows[0]["data"]["span_id"] == span_id_root
|
||||
assert narrow_rows[0]["data"]["trace_id"] == target_trace_id
|
||||
assert not any(outside_range_msg in m for m in narrow_warnings), f"Did not expect outside-range warning, got {narrow_warnings}"
|
||||
|
||||
# --- Test 2: wide window (>1 h, triggers multiple exponential buckets) ---
|
||||
# should just return 1 span, not duplicate
|
||||
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
|
||||
wide_rows = _query(wide_start_ms, now_ms)
|
||||
wide_rows, wide_warnings = _query(wide_start_ms, now_ms)
|
||||
|
||||
assert len(wide_rows) == 1, f"Expected 1 span for trace_id filter (wide window, multi-bucket), got {len(wide_rows)} — possible duplicate-span regression"
|
||||
assert wide_rows[0]["data"]["span_id"] == span_id_root
|
||||
assert wide_rows[0]["data"]["trace_id"] == target_trace_id
|
||||
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
|
||||
|
||||
# --- Test 3: window that does not contain the trace returns no results ---
|
||||
# --- Test 3: window that does not contain the trace returns no results + warning ---
|
||||
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
|
||||
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
|
||||
past_rows = _query(past_start_ms, past_end_ms)
|
||||
past_rows, past_warnings = _query(past_start_ms, past_end_ms)
|
||||
|
||||
assert len(past_rows) == 0, f"Expected 0 spans for trace_id filter outside time window, got {len(past_rows)}"
|
||||
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
|
||||
|
||||
|
||||
def test_traces_aggregation_filter_by_trace_id(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Tests that the trace_id time-range optimization also applies to
|
||||
non-window-list (time_series / aggregation) traces queries:
|
||||
1. Wide query window containing the trace returns the correct count.
|
||||
2. Query window outside the trace's time range short-circuits to empty.
|
||||
3. Filter referencing a trace_id with no row in trace_summary
|
||||
short-circuits to empty (trace_summary is authoritative for traces).
|
||||
"""
|
||||
target_trace_id = TraceIdGenerator.trace_id()
|
||||
target_root_span_id = TraceIdGenerator.span_id()
|
||||
target_child_span_id = TraceIdGenerator.span_id()
|
||||
missing_trace_id = TraceIdGenerator.trace_id()
|
||||
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
|
||||
common_resources = {
|
||||
"deployment.environment": "production",
|
||||
"service.name": "traces-agg-filter-service",
|
||||
"cloud.provider": "integration",
|
||||
}
|
||||
|
||||
insert_traces(
|
||||
[
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=5),
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_root_span_id,
|
||||
parent_span_id="",
|
||||
name="root-span",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources=common_resources,
|
||||
attributes={"http.request.method": "GET"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=9),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_child_span_id,
|
||||
parent_span_id=target_root_span_id,
|
||||
name="child-span",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources=common_resources,
|
||||
attributes={},
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
def _count(start_ms: int, end_ms: int, trace_id: str) -> tuple[float, list[str]]:
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=start_ms,
|
||||
end_ms=end_ms,
|
||||
request_type="time_series",
|
||||
queries=[
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"stepInterval": 60,
|
||||
"disabled": False,
|
||||
"filter": {"expression": f"trace_id = '{trace_id}'"},
|
||||
"aggregations": [{"expression": "count()"}],
|
||||
},
|
||||
}
|
||||
],
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
results = response.json()["data"]["data"]["results"]
|
||||
assert len(results) == 1
|
||||
warning = (response.json().get("data") or {}).get("warning") or {}
|
||||
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
|
||||
aggregations = results[0].get("aggregations") or []
|
||||
if not aggregations:
|
||||
return 0, messages
|
||||
series = aggregations[0].get("series") or []
|
||||
if not series:
|
||||
return 0, messages
|
||||
return sum(v["value"] for v in series[0]["values"]), messages
|
||||
|
||||
outside_range_msg = "lies outside the selected time range"
|
||||
|
||||
now_ms = int(now.timestamp() * 1000)
|
||||
|
||||
# --- Test 1: wide window (>1 h) containing the trace returns both spans ---
|
||||
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
|
||||
wide_count, wide_warnings = _count(wide_start_ms, now_ms, target_trace_id)
|
||||
assert wide_count == 2, f"Expected count=2 for trace_id aggregation (wide window), got {wide_count}"
|
||||
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
|
||||
|
||||
# --- Test 2: window outside the trace short-circuits to empty + warning ---
|
||||
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
|
||||
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
|
||||
past_count, past_warnings = _count(past_start_ms, past_end_ms, target_trace_id)
|
||||
assert past_count == 0, f"Expected count=0 for trace_id aggregation outside time window, got {past_count}"
|
||||
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
|
||||
|
||||
# --- Test 3: trace_id with no entry in trace_summary short-circuits (no warning) ---
|
||||
missing_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
|
||||
missing_count, missing_warnings = _count(missing_start_ms, now_ms, missing_trace_id)
|
||||
assert missing_count == 0, f"Expected count=0 for trace_id absent from trace_summary, got {missing_count}"
|
||||
assert not any(outside_range_msg in m for m in missing_warnings), f"Did not expect outside-range warning for missing trace_id, got {missing_warnings}"
|
||||
|
||||
436
tests/integration/tests/querier/15_trace_operator.py
Normal file
436
tests/integration/tests/querier/15_trace_operator.py
Normal file
@@ -0,0 +1,436 @@
|
||||
"""
|
||||
Integration tests for TraceOperatorQuery (builder_trace_operator) through the
|
||||
/api/v5/query_range endpoint.
|
||||
|
||||
Covers:
|
||||
1. Order-by variants (A -> B, A => B) with returnSpansFrom="A".
|
||||
Guards against the NOT_FOUND_COLUMN_IN_BLOCK regression where ordering by a
|
||||
column absent from an outer SELECT caused a query failure.
|
||||
2. Expression operators (=>, ->, &&, ||, A NOT B) with and without returnSpansFrom.
|
||||
|
||||
returnSpansFrom semantics
|
||||
--------------------------
|
||||
returnSpansFrom="" (default)
|
||||
The final rows come from the expression's root CTE. Only spans that
|
||||
directly satisfy the structural predicate are returned.
|
||||
|
||||
returnSpansFrom="A"
|
||||
The expression is still evaluated in full (the structural relationship
|
||||
must hold), but the final rows are drawn from the A sub-query CTE,
|
||||
filtered to traces that appeared in the expression result. Concretely:
|
||||
the query returns every A span whose trace_id belongs to a trace that
|
||||
matched the expression.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from http import HTTPStatus
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.querier import get_rows
|
||||
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
|
||||
|
||||
|
||||
def _names(response: requests.Response) -> set:
|
||||
return {r["data"]["name"] for r in get_rows(response)}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Dataset — 4 traces using real OTel semantic-convention attributes
|
||||
#
|
||||
# Filter A = "http.method EXISTS" (HTTP entry-point spans)
|
||||
# Filter B = "db.system = 'redis'" (direct Redis cache calls)
|
||||
# / "db.system = 'postgresql'" (deeper DB queries, for indirect tests)
|
||||
# / "messaging.system = 'kafka'" (async consumer, for OR tests)
|
||||
#
|
||||
# T1 checkout-svc [SERVER] POST /checkout (5s) ← structural root, http.method=POST
|
||||
# ├─ proxy-svc [SERVER] api-proxy (3s) ← http.method=POST; no db children
|
||||
# └─ [CLIENT] lookup-cart (redis)
|
||||
# └─ [CLIENT] check-inventory (postgresql)
|
||||
#
|
||||
# T2 catalog-svc [SERVER] GET /catalog (1s) ← http.method=GET
|
||||
# └─ [CLIENT] fetch-catalog (redis)
|
||||
# └─ [CLIENT] read-cache (postgresql)
|
||||
#
|
||||
# T3 standalone-svc [SERVER] standalone-server ← http.method=POST; no db/cache children
|
||||
# T4 isolated-svc [CONSUMER] isolated-worker ← messaging.system=kafka; no http.method → not in A
|
||||
#
|
||||
# T1 has TWO spans matching filter A (http.method EXISTS); returnSpansFrom changes what is returned:
|
||||
# default → only spans that directly satisfy the structural predicate
|
||||
# return_A → all matching A spans from traces where the predicate held
|
||||
#
|
||||
# Expression truth table:
|
||||
# A -> B (indirect) A=http.method EXISTS B=db.system='postgresql' T1✓ T2✓ T3✗ T4✗
|
||||
# A => B (direct) A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
|
||||
# A && B A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
|
||||
# A || B A=http.method EXISTS B=messaging.system='kafka' T1✓ T2✓ T3✓ T4✓
|
||||
# A NOT B A=http.method EXISTS B=db.system='redis' T1✗ T2✗ T3✓ T4✗
|
||||
#
|
||||
# Order-by cases (all use returnSpansFrom=A, 3 rows expected):
|
||||
# ob.indirect A->B order http.method DESC → POST(checkout+proxy), GET(catalog)
|
||||
# ob.duration A=>B order duration_nano DESC → POST/checkout(5s), api-proxy(3s), GET/catalog(1s)
|
||||
# ob.select A=>B order http.method DESC → POST, POST, GET
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"case",
|
||||
[
|
||||
# ── Order-by: http.method DESC, NOT in selectFields ──────────────────────
|
||||
# Guards against NOT_FOUND_COLUMN_IN_BLOCK: ordering by a column absent from
|
||||
# the outer SELECT used to cause a ClickHouse query failure.
|
||||
# returnSpansFrom="A" returns all T1 http.method spans: POST /checkout and api-proxy
|
||||
# (both http.method="POST", services checkout-svc and proxy-svc), plus
|
||||
# GET /catalog (http.method="GET") from T2.
|
||||
# The two POST spans are tied so their relative order is undefined; catalog-svc
|
||||
# (GET) is guaranteed to sort last.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'postgresql'",
|
||||
"expression": "A -> B",
|
||||
"return_spans_from": "A",
|
||||
"select_fields": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
|
||||
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
|
||||
"validate": lambda r: len(get_rows(r)) == 3 and {get_rows(r)[0]["data"]["service.name"], get_rows(r)[1]["data"]["service.name"]} == {"checkout-svc", "proxy-svc"} and get_rows(r)[2]["data"]["service.name"] == "catalog-svc",
|
||||
},
|
||||
id="ob.indirect.http_method_not_in_select",
|
||||
),
|
||||
# ── Order-by: duration_nano DESC, core span field ─────────────────────────
|
||||
# returnSpansFrom="A" includes api-proxy (3 s) in T1's result.
|
||||
# Order: POST /checkout (5 s) > api-proxy (3 s) > GET /catalog (1 s).
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A => B",
|
||||
"return_spans_from": "A",
|
||||
"order": [{"key": {"name": "duration_nano", "fieldContext": "span"}, "direction": "desc"}],
|
||||
"validate": lambda r: len(get_rows(r)) == 3 and get_rows(r)[0]["data"]["name"] == "POST /checkout" and get_rows(r)[1]["data"]["name"] == "api-proxy" and get_rows(r)[2]["data"]["name"] == "GET /catalog",
|
||||
},
|
||||
id="ob.duration.duration_nano_desc",
|
||||
),
|
||||
# ── Order-by: http.method DESC, IS in selectFields ────────────────────────
|
||||
# http.method is selected so it appears in each result row.
|
||||
# Both POST /checkout and api-proxy carry http.method="POST"; their relative
|
||||
# order is undefined. GET /catalog ("GET") always sorts last.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A => B",
|
||||
"return_spans_from": "A",
|
||||
"select_fields": [{"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}],
|
||||
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
|
||||
"validate": lambda r: len(get_rows(r)) == 3 and get_rows(r)[0]["data"]["http.method"] == "POST" and get_rows(r)[1]["data"]["http.method"] == "POST" and get_rows(r)[2]["data"]["http.method"] == "GET",
|
||||
},
|
||||
id="ob.select.http_method_in_select",
|
||||
),
|
||||
# ── A => B (direct child), returnSpansFrom="" ─────────────────────────────
|
||||
# POST /checkout directly parents lookup-cart (redis); api-proxy has no redis child.
|
||||
# T3 does not match (no redis descendant). Default returns only the satisfying A spans.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A => B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: len(get_rows(r)) == 2 and _names(r) == {"POST /checkout", "GET /catalog"},
|
||||
},
|
||||
id="ex.direct_child.default",
|
||||
),
|
||||
# ── A => B (direct child), returnSpansFrom="A" ────────────────────────────
|
||||
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A => B",
|
||||
"return_spans_from": "A",
|
||||
"validate": lambda r: len(get_rows(r)) == 3 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
|
||||
},
|
||||
id="ex.direct_child.return_A",
|
||||
),
|
||||
# ── A -> B (indirect descendant), returnSpansFrom="" ──────────────────────
|
||||
# POST /checkout is an ancestor of check-inventory (postgresql) via lookup-cart.
|
||||
# api-proxy has no postgresql descendants. T3 has no postgresql descendant.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'postgresql'",
|
||||
"expression": "A -> B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: len(get_rows(r)) == 2 and _names(r) == {"POST /checkout", "GET /catalog"},
|
||||
},
|
||||
id="ex.indirect_descendant.default",
|
||||
),
|
||||
# ── A -> B (indirect descendant), returnSpansFrom="A" ────────────────────
|
||||
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'postgresql'",
|
||||
"expression": "A -> B",
|
||||
"return_spans_from": "A",
|
||||
"validate": lambda r: len(get_rows(r)) == 3 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
|
||||
},
|
||||
id="ex.indirect_descendant.return_A",
|
||||
),
|
||||
# ── A && B (both present in same trace), returnSpansFrom="" ───────────────
|
||||
# T1 and T2 match (each trace has http.method spans AND redis spans); T3 does not.
|
||||
# A && B returns all A spans from matching traces — api-proxy is included
|
||||
# because it shares T1's trace_id with POST /checkout.
|
||||
# (return_A produces the same set by definition; no separate case needed.)
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A && B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: len(get_rows(r)) == 3 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
|
||||
},
|
||||
id="ex.and.default",
|
||||
),
|
||||
# ── A || B (either present), returnSpansFrom="" ───────────────────────────
|
||||
# T1, T2, T3 match via A (http.method spans); T4 matches via B (kafka span).
|
||||
# Default returns UNION of all A and B spans from matching traces.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "messaging.system = 'kafka'",
|
||||
"expression": "A || B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: len(get_rows(r)) == 5 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server", "isolated-worker"},
|
||||
},
|
||||
id="ex.or.default",
|
||||
),
|
||||
# ── A || B, returnSpansFrom="A" ───────────────────────────────────────────
|
||||
# All four traces match; only A spans are returned.
|
||||
# T4 has no http.method span, so it contributes nothing to A.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "messaging.system = 'kafka'",
|
||||
"expression": "A || B",
|
||||
"return_spans_from": "A",
|
||||
"validate": lambda r: len(get_rows(r)) == 4 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server"},
|
||||
},
|
||||
id="ex.or.return_A",
|
||||
),
|
||||
# ── A NOT B (A present, B absent from trace), returnSpansFrom="" ─────────
|
||||
# T1 and T2 do NOT match: their traces contain redis spans.
|
||||
# T3 MATCHES: has http.method span but no redis span in its trace.
|
||||
# T4 has no http.method span, so it cannot contribute an A span.
|
||||
# (return_A produces the same set; no separate case needed.)
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A NOT B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: len(get_rows(r)) == 1 and _names(r) == {"standalone-server"},
|
||||
},
|
||||
id="ex.not.default",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_trace_operator(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
case: dict,
|
||||
) -> None:
|
||||
t1_trace_id = TraceIdGenerator.trace_id()
|
||||
t1_checkout_span_id = TraceIdGenerator.span_id() # POST /checkout — structural root of T1
|
||||
t1_child_span_id = TraceIdGenerator.span_id() # lookup-cart
|
||||
|
||||
t2_trace_id = TraceIdGenerator.trace_id()
|
||||
t2_root_span_id = TraceIdGenerator.span_id()
|
||||
t2_child_span_id = TraceIdGenerator.span_id()
|
||||
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
|
||||
insert_traces(
|
||||
[
|
||||
# T1 — two http.method spans in the same trace, modelling a real proxy+service pair.
|
||||
# POST /checkout (checkout-svc) is the root (parent_span_id="").
|
||||
# api-proxy (proxy-svc) is a structural child of POST /checkout but also has
|
||||
# http.method set, so it matches filter A alongside POST /checkout.
|
||||
# Both carry http.method="POST" — they differ only in service.name.
|
||||
# This is what makes returnSpansFrom="" and returnSpansFrom="A" distinct:
|
||||
# default → only POST /checkout satisfies A => B or A -> B
|
||||
# return_A → api-proxy is pulled in too (all A spans from the matching trace)
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=5),
|
||||
trace_id=t1_trace_id,
|
||||
span_id=t1_checkout_span_id,
|
||||
parent_span_id="",
|
||||
name="POST /checkout",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "checkout-svc"},
|
||||
attributes={"http.method": "POST", "http.route": "/checkout"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=3),
|
||||
trace_id=t1_trace_id,
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id=t1_checkout_span_id,
|
||||
name="api-proxy",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "proxy-svc"},
|
||||
attributes={"http.method": "POST", "http.route": "/proxy"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=9),
|
||||
duration=timedelta(seconds=2),
|
||||
trace_id=t1_trace_id,
|
||||
span_id=t1_child_span_id,
|
||||
parent_span_id=t1_checkout_span_id,
|
||||
name="lookup-cart",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "checkout-svc"},
|
||||
attributes={"db.system": "redis", "db.operation": "GET"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=8),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=t1_trace_id,
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id=t1_child_span_id,
|
||||
name="check-inventory",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "checkout-svc"},
|
||||
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
|
||||
),
|
||||
# T2 — catalog-svc: GET /catalog (1 s root) → fetch-catalog (redis) → read-cache (postgresql)
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=t2_trace_id,
|
||||
span_id=t2_root_span_id,
|
||||
parent_span_id="",
|
||||
name="GET /catalog",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "catalog-svc"},
|
||||
attributes={"http.method": "GET", "http.route": "/catalog"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=9),
|
||||
duration=timedelta(seconds=2),
|
||||
trace_id=t2_trace_id,
|
||||
span_id=t2_child_span_id,
|
||||
parent_span_id=t2_root_span_id,
|
||||
name="fetch-catalog",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "catalog-svc"},
|
||||
attributes={"db.system": "redis", "db.operation": "GET"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=8),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=t2_trace_id,
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id=t2_child_span_id,
|
||||
name="read-cache",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "catalog-svc"},
|
||||
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
|
||||
),
|
||||
# T3 — standalone-svc: HTTP entry span with no downstream calls.
|
||||
# Fails A => B / A -> B / A && B (no redis/postgresql descendant).
|
||||
# Matches A NOT B (has http.method span, no redis child).
|
||||
# Contributes to A || B via A.
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=3),
|
||||
trace_id=TraceIdGenerator.trace_id(),
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id="",
|
||||
name="standalone-server",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "standalone-svc"},
|
||||
attributes={"http.method": "POST", "http.route": "/"},
|
||||
),
|
||||
# T4 — isolated-svc: Kafka consumer; no http.method so it never matches filter A.
|
||||
# Used only as the B side of A || B to prove the OR operator matches via B.
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=TraceIdGenerator.trace_id(),
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id="",
|
||||
name="isolated-worker",
|
||||
kind=TracesKind.SPAN_KIND_CONSUMER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "isolated-svc"},
|
||||
attributes={"messaging.system": "kafka", "messaging.destination": "orders"},
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
|
||||
spec: dict = {
|
||||
"name": "C",
|
||||
"expression": case["expression"],
|
||||
"returnSpansFrom": case.get("return_spans_from", ""),
|
||||
"limit": case.get("limit", 100),
|
||||
}
|
||||
if case.get("select_fields"):
|
||||
spec["selectFields"] = case["select_fields"]
|
||||
if case.get("order"):
|
||||
spec["order"] = case["order"]
|
||||
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
|
||||
timeout=5,
|
||||
headers={"authorization": f"Bearer {token}"},
|
||||
json={
|
||||
"schemaVersion": "v1",
|
||||
"start": start_ms,
|
||||
"end": end_ms,
|
||||
"requestType": "raw",
|
||||
"compositeQuery": {
|
||||
"queries": [
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {"name": "A", "signal": "traces", "filter": {"expression": case["filter_a"]}, "limit": 100},
|
||||
},
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {"name": "B", "signal": "traces", "filter": {"expression": case["filter_b"]}, "limit": 100},
|
||||
},
|
||||
{"type": "builder_trace_operator", "spec": spec},
|
||||
]
|
||||
},
|
||||
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
|
||||
},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK, f"HTTP {response.status_code}: {response.text}"
|
||||
assert case["validate"](response), f"validation failed: {response.json()}"
|
||||
Reference in New Issue
Block a user