mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-19 00:10:32 +01:00
Compare commits
28 Commits
ns/fg-sele
...
issue_4203
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
13dec174bf | ||
|
|
9ee57c0950 | ||
|
|
33df48c822 | ||
|
|
af117374c8 | ||
|
|
ba4cef67ac | ||
|
|
f0c33a6734 | ||
|
|
e897f4866a | ||
|
|
282b6fdef1 | ||
|
|
9b64bb2fc0 | ||
|
|
b818ff5fc4 | ||
|
|
e7d729ab5d | ||
|
|
ed812ad1c8 | ||
|
|
3b82c2ce43 | ||
|
|
214980ddad | ||
|
|
a7b69a2678 | ||
|
|
73c82f50a9 | ||
|
|
2593c5eb91 | ||
|
|
b6b2d36baa | ||
|
|
a444a039f9 | ||
|
|
bfb050ec17 | ||
|
|
ff3e87f70c | ||
|
|
9ac02ebe00 | ||
|
|
fbdd0bebbc | ||
|
|
b2245b48fe | ||
|
|
87e654fc73 | ||
|
|
0ee31ce440 | ||
|
|
63e681b87b | ||
|
|
28375c8c1e |
@@ -265,6 +265,15 @@ func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: This should move to readAsRaw function in consume.go but for now we are keeping it here since it's only relevant for traces
|
||||
if q.spec.Signal == telemetrytypes.SignalTraces {
|
||||
if raw, ok := payload.(*qbtypes.RawData); ok {
|
||||
for _, rr := range raw.Rows {
|
||||
mergeSpanAttributeColumns(rr.Data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &qbtypes.Result{
|
||||
Type: q.kind,
|
||||
Value: payload,
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/spantypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
@@ -431,6 +432,53 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// mergeSpanAttributeColumns merges (attributes_string, attributes_number, attributes_bool, resources_string) into
|
||||
// unified "attributes" and "resource" keys, and parses the stringified `events`
|
||||
// and `links` columns into structured slices. Raw DB columns are removed.
|
||||
func mergeSpanAttributeColumns(data map[string]any) {
|
||||
attrStr, hasStr := data["attributes_string"]
|
||||
attrNum, hasNum := data["attributes_number"]
|
||||
attrBool, hasBool := data["attributes_bool"]
|
||||
// todo(nitya): move to resource json
|
||||
resStr, hasRes := data["resources_string"]
|
||||
if hasStr || hasNum || hasBool || hasRes {
|
||||
attributes := make(map[string]any)
|
||||
if m, ok := attrStr.(map[string]string); ok {
|
||||
for k, v := range m {
|
||||
attributes[k] = v
|
||||
}
|
||||
}
|
||||
if m, ok := attrNum.(map[string]float64); ok {
|
||||
for k, v := range m {
|
||||
attributes[k] = v
|
||||
}
|
||||
}
|
||||
if m, ok := attrBool.(map[string]bool); ok {
|
||||
for k, v := range m {
|
||||
attributes[k] = v
|
||||
}
|
||||
}
|
||||
delete(data, "attributes_string")
|
||||
delete(data, "attributes_number")
|
||||
delete(data, "attributes_bool")
|
||||
data["attributes"] = attributes
|
||||
|
||||
resource := map[string]string{}
|
||||
if m, ok := resStr.(map[string]string); ok {
|
||||
resource = m
|
||||
}
|
||||
data["resource"] = resource
|
||||
delete(data, "resources_string")
|
||||
}
|
||||
|
||||
if raw, ok := data["events"]; ok {
|
||||
data["events"] = spantypes.ParseEvents(raw)
|
||||
}
|
||||
if raw, ok := data["links"]; ok {
|
||||
data["links"] = spantypes.ParseLinks(raw)
|
||||
}
|
||||
}
|
||||
|
||||
// numericAsFloat converts numeric types to float64 efficiently.
|
||||
func numericAsFloat(v any) float64 {
|
||||
switch x := v.(type) {
|
||||
|
||||
91
pkg/querier/consume_test.go
Normal file
91
pkg/querier/consume_test.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package querier
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/spantypes"
|
||||
)
|
||||
|
||||
func TestMergeSpanAttributeColumns_ParsesEventsAndLinks(t *testing.T) {
|
||||
data := map[string]any{
|
||||
"attributes_string": map[string]string{"http.method": "GET"},
|
||||
"attributes_number": map[string]float64{"http.status_code": 200},
|
||||
"attributes_bool": map[string]bool{"is_root": true},
|
||||
"resources_string": map[string]string{"service.name": "api"},
|
||||
"events": []string{
|
||||
`{"name":"request_received","timeUnixNano":1778489782759245000,"attributeMap":{"http.method":"GET","http.route":"/api/chat"}}`,
|
||||
`{"name":"cache_lookup","timeUnixNano":1778489782811697000,"attributeMap":{"cache.hit":"true","cache.key":"user:123:prompt"}}`,
|
||||
},
|
||||
"links": `[{"traceId":"abc","spanId":"123","refType":"CHILD_OF"},{"traceId":"def","spanId":"456","refType":"FOLLOWS_FROM"}]`,
|
||||
}
|
||||
|
||||
mergeSpanAttributeColumns(data)
|
||||
|
||||
attrs, ok := data["attributes"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("expected attributes to be map[string]any, got %T", data["attributes"])
|
||||
}
|
||||
if attrs["http.method"] != "GET" || attrs["http.status_code"] != float64(200) || attrs["is_root"] != true {
|
||||
t.Fatalf("attributes not merged correctly: %#v", attrs)
|
||||
}
|
||||
|
||||
res, ok := data["resource"].(map[string]string)
|
||||
if !ok || res["service.name"] != "api" {
|
||||
t.Fatalf("resource not set correctly: %#v", data["resource"])
|
||||
}
|
||||
|
||||
for _, removed := range []string{"attributes_string", "attributes_number", "attributes_bool", "resources_string"} {
|
||||
if _, present := data[removed]; present {
|
||||
t.Fatalf("expected %s to be removed", removed)
|
||||
}
|
||||
}
|
||||
|
||||
events, ok := data["events"].([]spantypes.Event)
|
||||
if !ok {
|
||||
t.Fatalf("expected events to be []spantypes.Event, got %T", data["events"])
|
||||
}
|
||||
wantEvents := []spantypes.Event{
|
||||
{
|
||||
Name: "request_received",
|
||||
TimeUnixNano: 1778489782759245000,
|
||||
Attributes: map[string]any{"http.method": "GET", "http.route": "/api/chat"},
|
||||
},
|
||||
{
|
||||
Name: "cache_lookup",
|
||||
TimeUnixNano: 1778489782811697000,
|
||||
Attributes: map[string]any{"cache.hit": "true", "cache.key": "user:123:prompt"},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(events, wantEvents) {
|
||||
t.Fatalf("events parsed incorrectly:\n got: %#v\nwant: %#v", events, wantEvents)
|
||||
}
|
||||
|
||||
links, ok := data["links"].([]spantypes.Link)
|
||||
if !ok {
|
||||
t.Fatalf("expected links to be []spantypes.Link, got %T", data["links"])
|
||||
}
|
||||
wantLinks := []spantypes.Link{
|
||||
{TraceID: "abc", SpanID: "123"},
|
||||
{TraceID: "def", SpanID: "456"},
|
||||
}
|
||||
if !reflect.DeepEqual(links, wantLinks) {
|
||||
t.Fatalf("links parsed incorrectly:\n got: %#v\nwant: %#v", links, wantLinks)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeSpanAttributeColumns_EmptyEventsAndLinks(t *testing.T) {
|
||||
data := map[string]any{
|
||||
"events": []string{},
|
||||
"links": "[]",
|
||||
}
|
||||
|
||||
mergeSpanAttributeColumns(data)
|
||||
|
||||
if events, ok := data["events"].([]spantypes.Event); !ok || len(events) != 0 {
|
||||
t.Fatalf("expected empty []spantypes.Event, got %#v", data["events"])
|
||||
}
|
||||
if links, ok := data["links"].([]spantypes.Link); !ok || len(links) != 0 {
|
||||
t.Fatalf("expected empty []spantypes.Link, got %#v", data["links"])
|
||||
}
|
||||
}
|
||||
@@ -85,6 +85,13 @@ func (q *traceOperatorQuery) executeWithContext(ctx context.Context, query strin
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: This should move to readAsRaw function in consume.go but for now we can keep it here since it's only relevant for traces
|
||||
if raw, ok := payload.(*qbtypes.RawData); ok {
|
||||
for _, rr := range raw.Rows {
|
||||
mergeSpanAttributeColumns(rr.Data)
|
||||
}
|
||||
}
|
||||
|
||||
return &qbtypes.Result{
|
||||
Type: q.kind,
|
||||
Value: payload,
|
||||
|
||||
@@ -1156,11 +1156,9 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
r.logger.Info("cache miss for getFlamegraphSpansForTrace", "traceID", traceID)
|
||||
|
||||
selectCols := "timestamp, duration_nano, span_id, trace_id, has_error, links as references, resource_string_service$$name, name, events"
|
||||
selectFieldCols := req.GetSelectedFieldsSourceColumns()
|
||||
if len(selectFieldCols) > 0 {
|
||||
selectCols = fmt.Sprintf("%s, %s", selectCols, strings.Join(selectFieldCols, ", "))
|
||||
if len(req.SelectFields) > 0 {
|
||||
selectCols += ", attributes_string, attributes_number, attributes_bool, resources_string"
|
||||
}
|
||||
|
||||
flamegraphQuery := fmt.Sprintf("SELECT %s FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", selectCols, r.TraceDB, r.traceTableName)
|
||||
|
||||
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, flamegraphQuery)
|
||||
|
||||
@@ -346,45 +346,6 @@ type GetFlamegraphSpansForTraceParams struct {
|
||||
SelectFields []telemetrytypes.TelemetryFieldKey `json:"selectFields"`
|
||||
}
|
||||
|
||||
func (r *GetFlamegraphSpansForTraceParams) GetSelectedFieldsSourceColumns() []string {
|
||||
var needsAttrString, needsAttrNumber, needsAttrBool, needsResourceMap bool
|
||||
for _, f := range r.SelectFields {
|
||||
switch f.FieldContext {
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
switch f.FieldDataType {
|
||||
case telemetrytypes.FieldDataTypeString:
|
||||
needsAttrString = true
|
||||
case telemetrytypes.FieldDataTypeFloat64, telemetrytypes.FieldDataTypeNumber, telemetrytypes.FieldDataTypeInt64:
|
||||
needsAttrNumber = true
|
||||
case telemetrytypes.FieldDataTypeBool:
|
||||
needsAttrBool = true
|
||||
default:
|
||||
// Unknown type: AttributeValue searches all three maps, so we need all.
|
||||
needsAttrString = true
|
||||
needsAttrNumber = true
|
||||
needsAttrBool = true
|
||||
}
|
||||
case telemetrytypes.FieldContextResource:
|
||||
needsResourceMap = true
|
||||
}
|
||||
}
|
||||
|
||||
var cols []string
|
||||
if needsAttrString {
|
||||
cols = append(cols, "attributes_string")
|
||||
}
|
||||
if needsAttrNumber {
|
||||
cols = append(cols, "attributes_number")
|
||||
}
|
||||
if needsAttrBool {
|
||||
cols = append(cols, "attributes_bool")
|
||||
}
|
||||
if needsResourceMap {
|
||||
cols = append(cols, "resources_string")
|
||||
}
|
||||
return cols
|
||||
}
|
||||
|
||||
type SpanFilterParams struct {
|
||||
TraceID []string `json:"traceID"`
|
||||
Status []string `json:"status"`
|
||||
|
||||
@@ -1,6 +1,50 @@
|
||||
package telemetrytraces
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
// Internal Columns.
|
||||
SpanTimestampBucketStartColumn = "ts_bucket_start"
|
||||
SpanResourceFingerPrintColumn = "resource_fingerprint"
|
||||
|
||||
// Intrinsic Columns.
|
||||
SpanTimestampColumn = "timestamp"
|
||||
SpanTraceIDColumn = "trace_id"
|
||||
SpanSpanIDColumn = "span_id"
|
||||
SpanTraceStateColumn = "trace_state"
|
||||
SpanParentSpanIDColumn = "parent_span_id"
|
||||
SpanFlagsColumn = "flags"
|
||||
SpanNameColumn = "name"
|
||||
SpanKindColumn = "kind"
|
||||
SpanKindStringColumn = "kind_string"
|
||||
SpanDurationNanoColumn = "duration_nano"
|
||||
SpanStatusCodeColumn = "status_code"
|
||||
SpanStatusMessageColumn = "status_message"
|
||||
SpanStatusCodeStringColumn = "status_code_string"
|
||||
SpanEventsColumn = "events"
|
||||
SpanLinksColumn = "links"
|
||||
|
||||
// Calculated Columns.
|
||||
SpanResponseStatusCodeColumn = "response_status_code"
|
||||
SpanExternalHTTPURLColumn = "external_http_url"
|
||||
SpanHTTPURLColumn = "http_url"
|
||||
SpanExternalHTTPMethodColumn = "external_http_method"
|
||||
SpanHTTPMethodColumn = "http_method"
|
||||
SpanHTTPHostColumn = "http_host"
|
||||
SpanDBNameColumn = "db_name"
|
||||
SpanDBOperationColumn = "db_operation"
|
||||
SpanHasErrorColumn = "has_error"
|
||||
SpanIsRemoteColumn = "is_remote"
|
||||
|
||||
// Contextual Columns.
|
||||
SpanAttributesStringColumn = "attributes_string"
|
||||
SpanAttributesNumberColumn = "attributes_number"
|
||||
SpanAttributesBoolColumn = "attributes_bool"
|
||||
SpanResourcesStringColumn = "resources_string"
|
||||
)
|
||||
|
||||
var (
|
||||
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
|
||||
@@ -334,6 +378,51 @@ var (
|
||||
SpanSearchScopeRoot = "isroot"
|
||||
SpanSearchScopeEntryPoint = "isentrypoint"
|
||||
|
||||
// IntrinsicSpanFields lists the intrinsic span columns, in the order they
|
||||
// should appear when a raw query expands its SelectFields.
|
||||
IntrinsicSpanFields = []telemetrytypes.TelemetryFieldKey{
|
||||
{Name: SpanTimestampColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanTraceIDColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanSpanIDColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanTraceStateColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanParentSpanIDColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanFlagsColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanNameColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanKindColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanKindStringColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanDurationNanoColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanStatusCodeColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanStatusMessageColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanStatusCodeStringColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanEventsColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanLinksColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
}
|
||||
|
||||
// CalculatedSpanFields lists the calculated/derived span columns, in the
|
||||
// order they should appear when a raw query expands its SelectFields.
|
||||
CalculatedSpanFields = []telemetrytypes.TelemetryFieldKey{
|
||||
{Name: SpanResponseStatusCodeColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanExternalHTTPURLColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanHTTPURLColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanExternalHTTPMethodColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanHTTPMethodColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanHTTPHostColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanDBNameColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanDBOperationColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanHasErrorColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanIsRemoteColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
}
|
||||
|
||||
// ContextualSpanColumns lists the typed attribute and resource columns
|
||||
// selected raw (rather than via ColumnExpressionFor) so that consume.go
|
||||
// can merge them into unified "attributes" and "resource" maps.
|
||||
ContextualSpanColumns = []string{
|
||||
SpanAttributesStringColumn,
|
||||
SpanAttributesNumberColumn,
|
||||
SpanAttributesBoolColumn,
|
||||
SpanResourcesStringColumn,
|
||||
}
|
||||
|
||||
DefaultFields = map[string]telemetrytypes.TelemetryFieldKey{
|
||||
"timestamp": {
|
||||
Name: "timestamp",
|
||||
|
||||
@@ -78,6 +78,17 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
// Query like `attribute.attribute_string:string` should resolve to `attributes_string['attribute_string']`.
|
||||
name: "Attribute key whose name collides with contextual map column resolves as a map lookup",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: SpanAttributesStringColumn,
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
expectedResult: "attributes_string['attributes_string']",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Non-existent column",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
@@ -16,7 +15,6 @@ import (
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -89,40 +87,13 @@ func (b *traceQueryStatementBuilder) Build(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
/*
|
||||
Adding a tech debt note here:
|
||||
This piece of code is a hot fix and should be removed once we close issue: engineering-pod/issues/3622
|
||||
*/
|
||||
/*
|
||||
-------------------------------- Start of tech debt ----------------------------
|
||||
*/
|
||||
isSelectFieldsEmpty := false
|
||||
if requestType == qbtypes.RequestTypeRaw {
|
||||
|
||||
selectedFields := query.SelectFields
|
||||
|
||||
if len(selectedFields) == 0 {
|
||||
sortedKeys := maps.Keys(DefaultFields)
|
||||
slices.Sort(sortedKeys)
|
||||
for _, key := range sortedKeys {
|
||||
selectedFields = append(selectedFields, DefaultFields[key])
|
||||
}
|
||||
query.SelectFields = selectedFields
|
||||
}
|
||||
|
||||
selectFieldKeys := []string{}
|
||||
for _, field := range selectedFields {
|
||||
selectFieldKeys = append(selectFieldKeys, field.Name)
|
||||
}
|
||||
|
||||
for _, x := range []string{"timestamp", "span_id", "trace_id"} {
|
||||
if !slices.Contains(selectFieldKeys, x) {
|
||||
query.SelectFields = append(query.SelectFields, DefaultFields[x])
|
||||
}
|
||||
}
|
||||
isSelectFieldsEmpty = len(query.SelectFields) == 0
|
||||
// we are expanding here to ensure that all the conflicts are taken care in adjustKeys
|
||||
// i.e if there is a conflict we strip away context of the key in adjustKeys
|
||||
query = b.expandRawSelectFields(query)
|
||||
}
|
||||
/*
|
||||
-------------------------------- End of tech debt ----------------------------
|
||||
*/
|
||||
|
||||
query = b.adjustKeys(ctx, keys, query, requestType)
|
||||
|
||||
@@ -131,7 +102,7 @@ func (b *traceQueryStatementBuilder) Build(
|
||||
|
||||
switch requestType {
|
||||
case qbtypes.RequestTypeRaw:
|
||||
return b.buildListQuery(ctx, q, query, start, end, keys, variables)
|
||||
return b.buildListQuery(ctx, q, query, start, end, keys, variables, isSelectFieldsEmpty)
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
|
||||
case qbtypes.RequestTypeScalar:
|
||||
@@ -295,6 +266,7 @@ func (b *traceQueryStatementBuilder) buildListQuery(
|
||||
start, end uint64,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
variables map[string]qbtypes.VariableItem,
|
||||
isSelectFieldsEmpty bool,
|
||||
) (*qbtypes.Statement, error) {
|
||||
|
||||
var (
|
||||
@@ -309,7 +281,6 @@ func (b *traceQueryStatementBuilder) buildListQuery(
|
||||
cteArgs = append(cteArgs, args)
|
||||
}
|
||||
|
||||
// TODO: should we deprecate `SelectFields` and return everything from a span like we do for logs?
|
||||
for _, field := range query.SelectFields {
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &field, keys)
|
||||
if err != nil {
|
||||
@@ -318,6 +289,12 @@ func (b *traceQueryStatementBuilder) buildListQuery(
|
||||
sb.SelectMore(colExpr)
|
||||
}
|
||||
|
||||
if isSelectFieldsEmpty {
|
||||
for _, col := range ContextualSpanColumns {
|
||||
sb.SelectMore(col)
|
||||
}
|
||||
}
|
||||
|
||||
// From table
|
||||
sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName))
|
||||
|
||||
@@ -844,3 +821,30 @@ func (b *traceQueryStatementBuilder) buildResourceFilterCTE(
|
||||
variables,
|
||||
)
|
||||
}
|
||||
|
||||
// expandRawSelectFields populates SelectFields for raw (list view) queries.
|
||||
// It must be called before adjustKeys so that normalization runs over the full set.
|
||||
func (b *traceQueryStatementBuilder) expandRawSelectFields(query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation] {
|
||||
if len(query.SelectFields) == 0 {
|
||||
selectFields := make([]telemetrytypes.TelemetryFieldKey, 0, len(IntrinsicSpanFields)+len(CalculatedSpanFields))
|
||||
selectFields = append(selectFields, IntrinsicSpanFields...)
|
||||
selectFields = append(selectFields, CalculatedSpanFields...)
|
||||
query.SelectFields = selectFields
|
||||
return query
|
||||
}
|
||||
|
||||
selectFields := []telemetrytypes.TelemetryFieldKey{
|
||||
{Name: SpanTimestampColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanTraceIDColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
{Name: SpanSpanIDColumn, FieldContext: telemetrytypes.FieldContextSpan},
|
||||
}
|
||||
for _, field := range query.SelectFields {
|
||||
// TODO(tvats): If a user specifies attribute.timestamp in the select fields, this loop will basically ignore it, as we already added a field by default. This can be fixed once we close https://github.com/SigNoz/engineering-pod/issues/3693
|
||||
if field.Name == SpanTimestampColumn || field.Name == SpanTraceIDColumn || field.Name == SpanSpanIDColumn {
|
||||
continue
|
||||
}
|
||||
selectFields = append(selectFields, field)
|
||||
}
|
||||
query.SelectFields = selectFields
|
||||
return query
|
||||
}
|
||||
|
||||
@@ -439,7 +439,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT name AS `name`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, duration_nano AS `duration_nano`, `attribute_number_cart$$items_count` AS `cart.items_count`, timestamp AS `timestamp`, span_id AS `span_id`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp AS `timestamp`, trace_id AS `trace_id`, span_id AS `span_id`, name AS `name`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, duration_nano AS `duration_nano`, `attribute_number_cart$$items_count` AS `cart.items_count` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -468,7 +468,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp AS `timestamp`, trace_id AS `trace_id`, span_id AS `span_id`, trace_state AS `trace_state`, parent_span_id AS `parent_span_id`, flags AS `flags`, name AS `name`, kind AS `kind`, kind_string AS `kind_string`, duration_nano AS `duration_nano`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -512,7 +512,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT name AS `name`, resource_string_service$$name AS `serviceName`, duration_nano AS `durationNano`, http_method AS `httpMethod`, response_status_code AS `responseStatusCode`, timestamp AS `timestamp`, span_id AS `span_id`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp AS `timestamp`, trace_id AS `trace_id`, span_id AS `span_id`, name AS `name`, resource_string_service$$name AS `serviceName`, duration_nano AS `durationNano`, http_method AS `httpMethod`, response_status_code AS `responseStatusCode` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -556,7 +556,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT name AS `name`, resource_string_service$$name AS `serviceName`, duration_nano AS `durationNano`, http_method AS `httpMethod`, multiIf(toString(`attribute_string_mixed$$materialization$$key`) != '', toString(`attribute_string_mixed$$materialization$$key`), toString(multiIf(resource.`mixed.materialization.key` IS NOT NULL, resource.`mixed.materialization.key`::String, mapContains(resources_string, 'mixed.materialization.key'), resources_string['mixed.materialization.key'], NULL)) != '', toString(multiIf(resource.`mixed.materialization.key` IS NOT NULL, resource.`mixed.materialization.key`::String, mapContains(resources_string, 'mixed.materialization.key'), resources_string['mixed.materialization.key'], NULL)), NULL) AS `mixed.materialization.key`, timestamp AS `timestamp`, span_id AS `span_id`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp AS `timestamp`, trace_id AS `trace_id`, span_id AS `span_id`, name AS `name`, resource_string_service$$name AS `serviceName`, duration_nano AS `durationNano`, http_method AS `httpMethod`, multiIf(toString(`attribute_string_mixed$$materialization$$key`) != '', toString(`attribute_string_mixed$$materialization$$key`), toString(multiIf(resource.`mixed.materialization.key` IS NOT NULL, resource.`mixed.materialization.key`::String, mapContains(resources_string, 'mixed.materialization.key'), resources_string['mixed.materialization.key'], NULL)) != '', toString(multiIf(resource.`mixed.materialization.key` IS NOT NULL, resource.`mixed.materialization.key`::String, mapContains(resources_string, 'mixed.materialization.key'), resources_string['mixed.materialization.key'], NULL)), NULL) AS `mixed.materialization.key` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -601,7 +601,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT name AS `name`, resource_string_service$$name AS `serviceName`, duration_nano AS `durationNano`, http_method AS `httpMethod`, `attribute_string_mixed$$materialization$$key` AS `mixed.materialization.key`, timestamp AS `timestamp`, span_id AS `span_id`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp AS `timestamp`, trace_id AS `trace_id`, span_id AS `span_id`, name AS `name`, resource_string_service$$name AS `serviceName`, duration_nano AS `durationNano`, http_method AS `httpMethod`, `attribute_string_mixed$$materialization$$key` AS `mixed.materialization.key` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -711,7 +711,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "SELECT timestamp AS `timestamp`, trace_id AS `trace_id`, span_id AS `span_id`, trace_state AS `trace_state`, parent_span_id AS `parent_span_id`, flags AS `flags`, name AS `name`, kind AS `kind`, kind_string AS `kind_string`, duration_nano AS `duration_nano`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -744,7 +744,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
}},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
|
||||
Query: "SELECT timestamp AS `timestamp`, trace_id AS `trace_id`, span_id AS `span_id`, trace_state AS `trace_state`, parent_span_id AS `parent_span_id`, flags AS `flags`, name AS `name`, kind AS `kind`, kind_string AS `kind_string`, duration_nano AS `duration_nano`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
|
||||
@@ -398,9 +398,23 @@ func (b *traceOperatorCTEBuilder) buildNotCTE(leftCTE, rightCTE string) (string,
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildFinalQuery(ctx context.Context, selectFromCTE string, requestType qbtypes.RequestType) (*qbtypes.Statement, error) {
|
||||
// Mirror statement_builder.go::Build: for raw queries, empty selectFields
|
||||
// expands to the full intrinsic + calculated set, and the list query also
|
||||
// pulls in the contextual columns so the consume layer can merge them
|
||||
// into unified attributes/resource (and parse events/links).
|
||||
isSelectFieldsEmpty := false
|
||||
if requestType == qbtypes.RequestTypeRaw {
|
||||
isSelectFieldsEmpty = len(b.operator.SelectFields) == 0
|
||||
if isSelectFieldsEmpty {
|
||||
b.operator.SelectFields = make([]telemetrytypes.TelemetryFieldKey, 0, len(IntrinsicSpanFields)+len(CalculatedSpanFields))
|
||||
b.operator.SelectFields = append(b.operator.SelectFields, IntrinsicSpanFields...)
|
||||
b.operator.SelectFields = append(b.operator.SelectFields, CalculatedSpanFields...)
|
||||
}
|
||||
}
|
||||
|
||||
switch requestType {
|
||||
case qbtypes.RequestTypeRaw:
|
||||
return b.buildListQuery(ctx, selectFromCTE)
|
||||
return b.buildListQuery(ctx, selectFromCTE, isSelectFieldsEmpty)
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
return b.buildTimeSeriesQuery(ctx, selectFromCTE)
|
||||
case qbtypes.RequestTypeTrace:
|
||||
@@ -412,10 +426,11 @@ func (b *traceOperatorCTEBuilder) buildFinalQuery(ctx context.Context, selectFro
|
||||
}
|
||||
}
|
||||
|
||||
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
|
||||
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string, isSelectFieldsEmpty bool) (*qbtypes.Statement, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
// Select core fields
|
||||
// Select core fields. These are always present so the trace operator
|
||||
// response shape is stable regardless of user-supplied selectFields.
|
||||
sb.Select(
|
||||
"timestamp",
|
||||
"trace_id",
|
||||
@@ -465,6 +480,12 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
|
||||
selectedFields[field.Name] = true
|
||||
}
|
||||
|
||||
if isSelectFieldsEmpty {
|
||||
for _, col := range ContextualSpanColumns {
|
||||
sb.SelectMore(col)
|
||||
}
|
||||
}
|
||||
|
||||
sb.From(selectFromCTE)
|
||||
|
||||
// Add order by support using ColumnExpressionFor
|
||||
|
||||
@@ -104,7 +104,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_INDIR_DESC_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, trace_state AS `trace_state`, flags AS `flags`, kind AS `kind`, kind_string AS `kind_string`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM A_INDIR_DESC_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 5},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -141,7 +141,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_AND_B AS (SELECT l.* FROM A AS l INNER JOIN B AS r ON l.trace_id = r.trace_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_AND_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_AND_B AS (SELECT l.* FROM A AS l INNER JOIN B AS r ON l.trace_id = r.trace_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, trace_state AS `trace_state`, flags AS `flags`, kind AS `kind`, kind_string AS `kind_string`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM A_AND_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 15},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -178,7 +178,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_OR_B AS (SELECT * FROM A UNION DISTINCT SELECT * FROM B) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_OR_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_OR_B AS (SELECT * FROM A UNION DISTINCT SELECT * FROM B) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, trace_state AS `trace_state`, flags AS `flags`, kind AS `kind`, kind_string AS `kind_string`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM A_OR_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 20},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -215,7 +215,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_not_B AS (SELECT l.* FROM A AS l WHERE l.trace_id GLOBAL NOT IN (SELECT DISTINCT trace_id FROM B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_not_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_not_B AS (SELECT l.* FROM A AS l WHERE l.trace_id GLOBAL NOT IN (SELECT DISTINCT trace_id FROM B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, trace_state AS `trace_state`, flags AS `flags`, kind AS `kind`, kind_string AS `kind_string`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM A_not_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -380,7 +380,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_D AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), D AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_D) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), C_DIR_DESC_D AS (SELECT p.* FROM C AS p INNER JOIN D AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id), A_DIR_DESC_B_AND_C_DIR_DESC_D AS (SELECT l.* FROM A_DIR_DESC_B AS l INNER JOIN C_DIR_DESC_D AS r ON l.trace_id = r.trace_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_DIR_DESC_B_AND_C_DIR_DESC_D ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_D AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), D AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_D) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), C_DIR_DESC_D AS (SELECT p.* FROM C AS p INNER JOIN D AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id), A_DIR_DESC_B_AND_C_DIR_DESC_D AS (SELECT l.* FROM A_DIR_DESC_B AS l INNER JOIN C_DIR_DESC_D AS r ON l.trace_id = r.trace_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, trace_state AS `trace_state`, flags AS `flags`, kind AS `kind`, kind_string AS `kind_string`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM A_DIR_DESC_B_AND_C_DIR_DESC_D ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "auth", "%service.name%", "%service.name\":\"auth%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 5},
|
||||
},
|
||||
expectedErr: nil,
|
||||
|
||||
@@ -236,6 +236,7 @@ type RawStream struct {
|
||||
Error chan error
|
||||
}
|
||||
|
||||
|
||||
func roundToNonZeroDecimals(val float64, n int) float64 {
|
||||
if val == 0 || math.IsNaN(val) || math.IsInf(val, 0) {
|
||||
return val
|
||||
|
||||
59
pkg/types/spantypes/event.go
Normal file
59
pkg/types/spantypes/event.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package spantypes
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
type Event struct {
|
||||
Name string `json:"name"`
|
||||
TimeUnixNano uint64 `json:"timeUnixNano"`
|
||||
Attributes map[string]any `json:"attributes,omitempty"`
|
||||
}
|
||||
|
||||
// Link is the response shape for a span link.
|
||||
// The refType field is intentionally not decoded; it's a Jaeger-era
|
||||
// concept that OTel doesn't model, so we drop it on the way out.
|
||||
type Link struct {
|
||||
TraceID string `json:"traceId,omitempty"`
|
||||
SpanID string `json:"spanId,omitempty"`
|
||||
}
|
||||
|
||||
// dbEvent matches the JSON object stored in the ClickHouse `events`
|
||||
// Array(String) column.
|
||||
type dbEvent struct {
|
||||
Name string `json:"name"`
|
||||
TimeUnixNano uint64 `json:"timeUnixNano"`
|
||||
AttributeMap map[string]any `json:"attributeMap"`
|
||||
}
|
||||
|
||||
// ParseEvents column (Array(String) of JSON-encoded events) into a slice of Event values.
|
||||
// Malformed entries are skipped.
|
||||
func ParseEvents(raw any) []Event {
|
||||
strs, ok := raw.([]string)
|
||||
if !ok {
|
||||
return []Event{}
|
||||
}
|
||||
events := make([]Event, 0, len(strs))
|
||||
for _, s := range strs {
|
||||
var e dbEvent
|
||||
if err := json.Unmarshal([]byte(s), &e); err != nil {
|
||||
continue
|
||||
}
|
||||
events = append(events, Event{
|
||||
Name: e.Name,
|
||||
TimeUnixNano: e.TimeUnixNano,
|
||||
Attributes: e.AttributeMap,
|
||||
})
|
||||
}
|
||||
return events
|
||||
}
|
||||
|
||||
func ParseLinks(raw any) []Link {
|
||||
s, ok := raw.(string)
|
||||
if !ok || s == "" {
|
||||
return []Link{}
|
||||
}
|
||||
var links []Link
|
||||
if err := json.Unmarshal([]byte(s), &links); err != nil {
|
||||
return []Link{}
|
||||
}
|
||||
return links
|
||||
}
|
||||
41
tests/fixtures/traces.py
vendored
41
tests/fixtures/traces.py
vendored
@@ -236,8 +236,9 @@ class Traces(ABC):
|
||||
attributes_number: dict[str, np.float64]
|
||||
attributes_bool: dict[str, bool]
|
||||
resources_string: dict[str, str]
|
||||
events: list[str]
|
||||
links: str
|
||||
# Accepting parsed events and links, but will be stored as list[str], str in db
|
||||
events: list[dict[str, Any]]
|
||||
links: list[dict[str, Any]]
|
||||
response_status_code: str
|
||||
external_http_url: str
|
||||
http_url: str
|
||||
@@ -423,10 +424,17 @@ class Traces(ABC):
|
||||
)
|
||||
)
|
||||
|
||||
# Process events and derive error events
|
||||
# Process events and derive error events. self.events holds the parsed
|
||||
# response shape; np_arr() encodes back to the DB format on insert.
|
||||
self.events = []
|
||||
for event in events:
|
||||
self.events.append(json.dumps([event.name, event.time_unix_nano, event.attribute_map]))
|
||||
self.events.append(
|
||||
{
|
||||
"name": event.name,
|
||||
"timeUnixNano": int(event.time_unix_nano),
|
||||
"attributes": dict(event.attribute_map),
|
||||
}
|
||||
)
|
||||
|
||||
# Create error events for exception events (following Go exporter logic)
|
||||
if event.name == "exception":
|
||||
@@ -448,7 +456,26 @@ class Traces(ABC):
|
||||
),
|
||||
)
|
||||
|
||||
self.links = json.dumps([link.__dict__() for link in links_copy], separators=(",", ":"))
|
||||
# self.links holds the parsed response shape (trace_id/span_id only;
|
||||
# ref_type is dropped to match the API). np_arr() re-encodes for DB insert.
|
||||
self.links = [{"traceId": link.trace_id, "spanId": link.span_id} for link in links_copy]
|
||||
self._links_db = json.dumps(
|
||||
[link.__dict__() for link in links_copy],
|
||||
separators=(",", ":"),
|
||||
)
|
||||
# DB shape per event: {"name", "timeUnixNano", "attributeMap"}. Must match
|
||||
# what the consume-layer parser in pkg/types/spantypes expects.
|
||||
self._events_db = [
|
||||
json.dumps(
|
||||
{
|
||||
"name": event.name,
|
||||
"timeUnixNano": int(event.time_unix_nano),
|
||||
"attributeMap": dict(event.attribute_map),
|
||||
},
|
||||
separators=(",", ":"),
|
||||
)
|
||||
for event in events
|
||||
]
|
||||
|
||||
# Initialize resource
|
||||
self.resource = []
|
||||
@@ -563,8 +590,8 @@ class Traces(ABC):
|
||||
self.attributes_number,
|
||||
self.attributes_bool,
|
||||
self.resources_string,
|
||||
self.events,
|
||||
self.links,
|
||||
self._events_db,
|
||||
self._links_db,
|
||||
self.response_status_code,
|
||||
self.external_http_url,
|
||||
self.http_url,
|
||||
|
||||
@@ -17,7 +17,51 @@ from fixtures.querier import (
|
||||
index_series_by_label,
|
||||
make_query_request,
|
||||
)
|
||||
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
|
||||
from fixtures.traces import (
|
||||
TraceIdGenerator,
|
||||
Traces,
|
||||
TracesEvent,
|
||||
TracesKind,
|
||||
TracesLink,
|
||||
TracesRefType,
|
||||
TracesStatusCode,
|
||||
)
|
||||
|
||||
# All keys returned by the trace list endpoint when selectFields is empty:
|
||||
# every intrinsic and calculated column, plus the merged `attributes` and
|
||||
# `resource` maps that wrap the contextual columns in the response layer.
|
||||
ALL_SELECT_FIELDS = [
|
||||
# all intrinsic columns
|
||||
"timestamp",
|
||||
"trace_id",
|
||||
"span_id",
|
||||
"trace_state",
|
||||
"parent_span_id",
|
||||
"flags",
|
||||
"name",
|
||||
"kind",
|
||||
"kind_string",
|
||||
"duration_nano",
|
||||
"status_code",
|
||||
"status_message",
|
||||
"status_code_string",
|
||||
"events",
|
||||
"links",
|
||||
# all calculated columns
|
||||
"response_status_code",
|
||||
"external_http_url",
|
||||
"http_url",
|
||||
"external_http_method",
|
||||
"http_method",
|
||||
"http_host",
|
||||
"db_name",
|
||||
"db_operation",
|
||||
"has_error",
|
||||
"is_remote",
|
||||
# all contextual columns (merged in response layer)
|
||||
"attributes",
|
||||
"resource",
|
||||
]
|
||||
|
||||
|
||||
def test_traces_list(
|
||||
@@ -473,7 +517,9 @@ def test_traces_list(
|
||||
@pytest.mark.parametrize(
|
||||
"payload,status_code,results",
|
||||
[
|
||||
# Case 1: order by timestamp field which there in attributes as well
|
||||
# Case 1: order by timestamp; empty selectFields returns the full
|
||||
# response shape (all intrinsic + calculated columns plus the merged
|
||||
# `attributes` and `resource` maps). x[3] (topic-service) is latest.
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
@@ -487,19 +533,42 @@ def test_traces_list(
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
x[3].duration_nano,
|
||||
{
|
||||
**x[3].attribute_string,
|
||||
**x[3].attributes_number,
|
||||
**x[3].attributes_bool,
|
||||
}, # attributes
|
||||
x[3].db_name,
|
||||
x[3].db_operation,
|
||||
int(x[3].duration_nano),
|
||||
x[3].events,
|
||||
x[3].external_http_method,
|
||||
x[3].external_http_url,
|
||||
int(x[3].flags),
|
||||
x[3].has_error,
|
||||
x[3].http_host,
|
||||
x[3].http_method,
|
||||
x[3].http_url,
|
||||
x[3].is_remote,
|
||||
int(x[3].kind),
|
||||
x[3].kind_string,
|
||||
x[3].links,
|
||||
x[3].name,
|
||||
x[3].parent_span_id,
|
||||
x[3].resources_string,
|
||||
x[3].response_status_code,
|
||||
x[3].service_name,
|
||||
x[3].span_id,
|
||||
int(x[3].status_code),
|
||||
x[3].status_code_string,
|
||||
x[3].status_message,
|
||||
format_timestamp(x[3].timestamp),
|
||||
x[3].trace_id,
|
||||
x[3].trace_state,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 2: order by attribute timestamp field which is there in attributes as well
|
||||
# This should break but it doesn't because attribute.timestamp gets adjusted to timestamp
|
||||
# because of default trace.timestamp gets added by default and bug in field mapper picks
|
||||
# instrinsic field
|
||||
# Case 2: order by attribute.timestamp. The key resolves to the
|
||||
# intrinsic span.timestamp column, so the latest span (x[3]) is
|
||||
# returned with the same full response shape as Case 1.
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
@@ -513,13 +582,37 @@ def test_traces_list(
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
x[3].duration_nano,
|
||||
{
|
||||
**x[3].attribute_string,
|
||||
**x[3].attributes_number,
|
||||
**x[3].attributes_bool,
|
||||
}, # attributes
|
||||
x[3].db_name,
|
||||
x[3].db_operation,
|
||||
int(x[3].duration_nano),
|
||||
x[3].events,
|
||||
x[3].external_http_method,
|
||||
x[3].external_http_url,
|
||||
int(x[3].flags),
|
||||
x[3].has_error,
|
||||
x[3].http_host,
|
||||
x[3].http_method,
|
||||
x[3].http_url,
|
||||
x[3].is_remote,
|
||||
int(x[3].kind),
|
||||
x[3].kind_string,
|
||||
x[3].links,
|
||||
x[3].name,
|
||||
x[3].parent_span_id,
|
||||
x[3].resources_string,
|
||||
x[3].response_status_code,
|
||||
x[3].service_name,
|
||||
x[3].span_id,
|
||||
int(x[3].status_code),
|
||||
x[3].status_code_string,
|
||||
x[3].status_message,
|
||||
format_timestamp(x[3].timestamp),
|
||||
x[3].trace_id,
|
||||
x[3].trace_state,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 3: select timestamp with empty order by
|
||||
@@ -542,7 +635,7 @@ def test_traces_list(
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 4: select attribute.timestamp with empty order by
|
||||
# This doesn't return any data because of where_clause using aliased timestamp
|
||||
# This returns the one span which has attribute.timestamp
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
@@ -556,7 +649,11 @@ def test_traces_list(
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [], # type: Callable[[List[Traces]], List[Any]]
|
||||
lambda x: [
|
||||
x[0].span_id,
|
||||
format_timestamp(x[0].timestamp),
|
||||
x[0].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 5: select timestamp with timestamp order by
|
||||
pytest.param(
|
||||
@@ -693,6 +790,159 @@ def test_traces_list_with_corrupt_data(
|
||||
assert data[key] == value
|
||||
|
||||
|
||||
def _verify_events_links_full(rows: list[dict], traces: list[Traces]) -> None:
|
||||
"""Empty-selectFields case: events/links arrive parsed into structured objects.
|
||||
Every row's events/links should match the fixture's stored parsed shape
|
||||
(the fixture's `.events`/`.links` mirror the API response shape directly).
|
||||
"""
|
||||
for row, trace in zip(rows, traces, strict=True):
|
||||
assert row["data"]["events"] == trace.events
|
||||
assert row["data"]["links"] == trace.links
|
||||
# Jaeger-era `refType` is dropped at the consume layer.
|
||||
for link in row["data"]["links"]:
|
||||
assert "refType" not in link
|
||||
|
||||
|
||||
def _verify_events_links_skip(rows: list[dict], traces: list[Traces]) -> None:
|
||||
"""Projected-selectFields case: nothing to verify beyond the key set."""
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"select_fields,status_code,expected_keys,verify_values",
|
||||
[
|
||||
pytest.param(
|
||||
[],
|
||||
HTTPStatus.OK,
|
||||
ALL_SELECT_FIELDS,
|
||||
_verify_events_links_full,
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{"name": "service.name"},
|
||||
],
|
||||
HTTPStatus.OK,
|
||||
["timestamp", "trace_id", "span_id", "service.name"],
|
||||
_verify_events_links_skip,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_traces_list_with_select_fields(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
select_fields: list[dict],
|
||||
status_code: HTTPStatus,
|
||||
expected_keys: list[str],
|
||||
verify_values: Callable[[list[dict], list[Traces]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Setup:
|
||||
Insert a root span with no events/links and a child span carrying two
|
||||
events and one user-supplied link.
|
||||
|
||||
Tests:
|
||||
1. Empty select fields should return all the fields, and the `events` /
|
||||
`links` columns should arrive parsed into structured objects (events
|
||||
carry `attributes`, links carry only `traceId`/`spanId` — refType is
|
||||
dropped at the consume layer).
|
||||
2. Non-empty select field should return the select field along with
|
||||
timestamp, trace_id and span_id.
|
||||
"""
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
|
||||
parent_trace_id = TraceIdGenerator.trace_id()
|
||||
parent_span_id = TraceIdGenerator.span_id()
|
||||
child_span_id = TraceIdGenerator.span_id()
|
||||
linked_trace_id = TraceIdGenerator.trace_id()
|
||||
linked_span_id = TraceIdGenerator.span_id()
|
||||
|
||||
event_one = TracesEvent(
|
||||
name="request_received",
|
||||
timestamp=now - timedelta(seconds=3, microseconds=500_000),
|
||||
attribute_map={"http.method": "GET", "http.route": "/api/chat"},
|
||||
)
|
||||
event_two = TracesEvent(
|
||||
name="cache_lookup",
|
||||
timestamp=now - timedelta(seconds=3, microseconds=400_000),
|
||||
attribute_map={"cache.hit": "true", "cache.key": "user:123:prompt"},
|
||||
)
|
||||
user_link = TracesLink(
|
||||
trace_id=linked_trace_id,
|
||||
span_id=linked_span_id,
|
||||
ref_type=TracesRefType.REF_TYPE_FOLLOWS_FROM,
|
||||
)
|
||||
|
||||
traces = [
|
||||
# Root span: no events, no links. Verifies the empty-case parsed shape.
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=4),
|
||||
duration=timedelta(seconds=3),
|
||||
trace_id=parent_trace_id,
|
||||
span_id=parent_span_id,
|
||||
parent_span_id="",
|
||||
name="root span",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
resources={"service.name": "events-links-service"},
|
||||
attributes={"http.request.method": "GET"},
|
||||
),
|
||||
# Child span: two events + one user-supplied link. The fixture
|
||||
# auto-inserts a CHILD_OF link for the parent, so the parsed response
|
||||
# contains two links total — the auto-inserted one first.
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=3),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=parent_trace_id,
|
||||
span_id=child_span_id,
|
||||
parent_span_id=parent_span_id,
|
||||
name="child span",
|
||||
kind=TracesKind.SPAN_KIND_INTERNAL,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
resources={"service.name": "events-links-service"},
|
||||
attributes={"http.request.method": "GET"},
|
||||
events=[event_one, event_two],
|
||||
links=[user_link],
|
||||
),
|
||||
]
|
||||
|
||||
insert_traces(traces)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
payload = {
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"filter": {"expression": "resource.service.name = 'events-links-service'"},
|
||||
"selectFields": select_fields,
|
||||
"order": [{"key": {"name": "timestamp"}, "direction": "asc"}],
|
||||
"limit": 10,
|
||||
},
|
||||
}
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=5)).timestamp() * 1000),
|
||||
end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
|
||||
request_type="raw",
|
||||
queries=[payload],
|
||||
)
|
||||
assert response.status_code == status_code
|
||||
|
||||
if response.status_code != HTTPStatus.OK:
|
||||
return
|
||||
|
||||
rows = response.json()["data"]["data"]["results"][0]["rows"]
|
||||
assert len(rows) == 2
|
||||
for row in rows:
|
||||
assert set(row["data"].keys()) == set(expected_keys)
|
||||
|
||||
verify_values(rows, traces)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_by,aggregation_alias,expected_status",
|
||||
[
|
||||
@@ -2123,3 +2373,178 @@ def test_traces_list_filter_by_trace_id(
|
||||
past_rows = _query(past_start_ms, past_end_ms)
|
||||
|
||||
assert len(past_rows) == 0, f"Expected 0 spans for trace_id filter outside time window, got {len(past_rows)}"
|
||||
|
||||
|
||||
# Hardcoded core columns the trace_operator buildListQuery always projects,
|
||||
# in addition to any user-supplied selectFields.
|
||||
TRACE_OPERATOR_CORE_FIELDS = [
|
||||
"timestamp",
|
||||
"trace_id",
|
||||
"span_id",
|
||||
"name",
|
||||
"duration_nano",
|
||||
"parent_span_id",
|
||||
]
|
||||
|
||||
|
||||
def _verify_full_expansion(rows: list[dict], parent_trace: Traces) -> None:
|
||||
"""Empty-selectFields case: every column from the builder_query parity set
|
||||
arrives, and events/links are parsed into structured form (refType is
|
||||
dropped at the consume layer).
|
||||
"""
|
||||
assert len(rows) == 1
|
||||
parent_row = rows[0]["data"]
|
||||
assert set(parent_row.keys()) == set(ALL_SELECT_FIELDS)
|
||||
assert parent_row["events"] == parent_trace.events
|
||||
assert parent_row["links"] == parent_trace.links
|
||||
for link in parent_row["links"]:
|
||||
assert "refType" not in link
|
||||
|
||||
|
||||
def _verify_explicit_projection(rows: list[dict], parent_trace: Traces) -> None: # pylint: disable=unused-argument
|
||||
"""Explicit-selectFields case: only the 6 hardcoded core fields plus the
|
||||
user-supplied resource.service.name come back. Contextual columns
|
||||
(events/links/attributes/resource) and the rest of the intrinsics never
|
||||
appear because the consume-layer merge isn't triggered.
|
||||
"""
|
||||
assert len(rows) == 1
|
||||
parent_row = rows[0]["data"]
|
||||
assert set(parent_row.keys()) == set(TRACE_OPERATOR_CORE_FIELDS + ["service.name"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"select_fields,verify_values",
|
||||
[
|
||||
pytest.param([], _verify_full_expansion, id="empty-select-fields"),
|
||||
pytest.param(
|
||||
[{"name": "service.name", "fieldContext": "resource"}],
|
||||
_verify_explicit_projection,
|
||||
id="explicit-service-name",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_trace_operator_select_fields(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
select_fields: list[dict[str, Any]],
|
||||
verify_values: Callable[[list[dict], Traces], None],
|
||||
) -> None:
|
||||
"""
|
||||
Setup:
|
||||
Insert a parent (operation.type = 'parent') with one event and one
|
||||
user-supplied link, plus a child span (operation.type = 'child').
|
||||
|
||||
Tests:
|
||||
1. With selectFields=[], the `A => B` trace_operator returns every column
|
||||
in ALL_SELECT_FIELDS, mirroring the builder_query path. Events arrive
|
||||
as {name, timeUnixNano, attributes} and links as {traceId, spanId}
|
||||
with refType dropped at the consume layer.
|
||||
2. With an explicit selectFields=[{"name": "service.name"}], only the 6
|
||||
hardcoded core columns plus service.name come back — no auto-expansion
|
||||
to the full set.
|
||||
|
||||
See:
|
||||
- pkg/telemetrytraces/trace_operator_cte_builder.go::buildFinalQuery for
|
||||
the expansion gate.
|
||||
- pkg/telemetrytraces/trace_operator_cte_builder.go::buildListQuery for
|
||||
the per-row SELECT.
|
||||
"""
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
|
||||
trace_id = TraceIdGenerator.trace_id()
|
||||
parent_span_id = TraceIdGenerator.span_id()
|
||||
child_span_id = TraceIdGenerator.span_id()
|
||||
|
||||
parent_event = TracesEvent(
|
||||
name="request_received",
|
||||
timestamp=now - timedelta(seconds=4, microseconds=500_000),
|
||||
attribute_map={"http.method": "GET"},
|
||||
)
|
||||
linked_trace_id = TraceIdGenerator.trace_id()
|
||||
linked_span_id = TraceIdGenerator.span_id()
|
||||
user_link = TracesLink(
|
||||
trace_id=linked_trace_id,
|
||||
span_id=linked_span_id,
|
||||
ref_type=TracesRefType.REF_TYPE_FOLLOWS_FROM,
|
||||
)
|
||||
|
||||
parent_trace = Traces(
|
||||
timestamp=now - timedelta(seconds=5),
|
||||
duration=timedelta(seconds=4),
|
||||
trace_id=trace_id,
|
||||
span_id=parent_span_id,
|
||||
parent_span_id="",
|
||||
name="parent-operation",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
resources={"service.name": "trace-operator-query"},
|
||||
attributes={"operation.type": "parent"},
|
||||
events=[parent_event],
|
||||
links=[user_link],
|
||||
)
|
||||
child_trace = Traces(
|
||||
timestamp=now - timedelta(seconds=4),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=trace_id,
|
||||
span_id=child_span_id,
|
||||
parent_span_id=parent_span_id,
|
||||
name="child-operation",
|
||||
kind=TracesKind.SPAN_KIND_INTERNAL,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
resources={"service.name": "trace-operator-query"},
|
||||
attributes={"operation.type": "child"},
|
||||
)
|
||||
insert_traces([parent_trace, child_trace])
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
operator_spec: dict[str, Any] = {
|
||||
"name": "C",
|
||||
"expression": "A => B",
|
||||
"limit": 10,
|
||||
"order": [{"key": {"name": "timestamp"}, "direction": "asc"}],
|
||||
}
|
||||
if select_fields:
|
||||
operator_spec["selectFields"] = select_fields
|
||||
|
||||
queries = [
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"filter": {"expression": "operation.type = 'parent'"},
|
||||
"limit": 100,
|
||||
"disabled": True,
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "B",
|
||||
"signal": "traces",
|
||||
"filter": {"expression": "operation.type = 'child'"},
|
||||
"limit": 100,
|
||||
"disabled": True,
|
||||
},
|
||||
},
|
||||
{"type": "builder_trace_operator", "spec": operator_spec},
|
||||
]
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=5)).timestamp() * 1000),
|
||||
end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
|
||||
request_type="raw",
|
||||
queries=queries,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
results = response.json()["data"]["data"]["results"]
|
||||
trace_operator_result = find_named_result(results, "C")
|
||||
rows = trace_operator_result["rows"]
|
||||
verify_values(rows, parent_trace)
|
||||
|
||||
Reference in New Issue
Block a user