Compare commits

...

6 Commits

Author  SHA1  Message  Date
Tushar Vats  4ca1f5537b  feat: added integration tests  2026-01-15 15:34:52 +05:30
Tushar Vats  a3a0761068  fix: improve order by  2026-01-13 20:35:04 +05:30
Tushar Vats  2de1ec3e68  fix: type handling logic  2026-01-13 18:16:05 +05:30
Tushar Vats  fb4adbdb83  fix: updated unit tests  2026-01-13 17:41:19 +05:30
Tushar Vats  6f3bf60a71  fix: added correct open api spec  2026-01-13 14:22:14 +05:30
Tushar Vats  b6a3eaa3ad  feat: added trace export; feat: added types for export; feat: added support for complex queries  2026-01-13 14:01:40 +05:30
14 changed files with 2584 additions and 30 deletions

View File

@@ -607,6 +607,34 @@ paths:
summary: Update auth domain
tags:
- authdomains
/api/v1/export_raw_data:
get:
deprecated: false
description: This endpoint allows exporting raw data for traces and logs
operationId: HandleExportRawData
responses:
"200":
content:
application/json:
schema:
type: string
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
summary: Export raw data
tags:
- logs
- traces
/api/v1/getResetPasswordToken/{id}:
get:
deprecated: false
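The new export_raw_data operation above is a plain authenticated GET. Below is a minimal Go sketch of calling it, assuming a local instance on port 8080 and a bearer token; the host, token, time range, and limit are placeholder values, and the same parameters appear in the handler documentation and integration tests further down in this diff.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Placeholder values: adjust host, token, and nanosecond time range for your setup.
	params := url.Values{}
	params.Set("source", "logs")
	params.Set("format", "csv")
	params.Set("start", "1693612800000000000")
	params.Set("end", "1693699199000000000")
	params.Set("limit", "100")

	req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/api/v1/export_raw_data?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer <token>")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status)
	fmt.Println(len(body), "bytes of", resp.Header.Get("Content-Type"))
}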

View File

@@ -16,6 +16,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/promote"
"github.com/SigNoz/signoz/pkg/modules/rawdataexport"
"github.com/SigNoz/signoz/pkg/modules/session"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/types"
@@ -39,6 +40,7 @@ type provider struct {
dashboardModule dashboard.Module
dashboardHandler dashboard.Handler
metricsExplorerHandler metricsexplorer.Handler
rawDataExportHandler rawdataexport.Handler
}
func NewFactory(
@@ -55,9 +57,10 @@ func NewFactory(
dashboardModule dashboard.Module,
dashboardHandler dashboard.Handler,
metricsExplorerHandler metricsexplorer.Handler,
rawDataExportHandler rawdataexport.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(ctx, providerSettings, config, orgGetter, authz, orgHandler, userHandler, sessionHandler, authDomainHandler, preferenceHandler, globalHandler, promoteHandler, flaggerHandler, dashboardModule, dashboardHandler, metricsExplorerHandler)
return newProvider(ctx, providerSettings, config, orgGetter, authz, orgHandler, userHandler, sessionHandler, authDomainHandler, preferenceHandler, globalHandler, promoteHandler, flaggerHandler, dashboardModule, dashboardHandler, metricsExplorerHandler, rawDataExportHandler)
})
}
@@ -78,6 +81,7 @@ func newProvider(
dashboardModule dashboard.Module,
dashboardHandler dashboard.Handler,
metricsExplorerHandler metricsexplorer.Handler,
rawDataExportHandler rawdataexport.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
@@ -97,6 +101,7 @@ func newProvider(
dashboardModule: dashboardModule,
dashboardHandler: dashboardHandler,
metricsExplorerHandler: metricsExplorerHandler,
rawDataExportHandler: rawDataExportHandler,
}
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -153,6 +158,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addRawDataExportRoutes(router); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,28 @@
package signozapiserver
import (
"net/http"
"github.com/SigNoz/signoz/pkg/http/handler"
"github.com/SigNoz/signoz/pkg/types"
"github.com/gorilla/mux"
)
func (provider *provider) addRawDataExportRoutes(router *mux.Router) error {
if err := router.Handle("/api/v1/export_raw_data", handler.New(provider.authZ.ViewAccess(provider.rawDataExportHandler.ExportRawData), handler.OpenAPIDef{
ID: "HandleExportRawData",
Tags: []string{"logs", "traces"},
Summary: "Export raw data",
Description: "This endpoint allows exporting raw data for traces and logs",
Request: new(types.ExportRawDataQueryParams),
RequestContentType: "application/json",
Response: nil,
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest},
})).Methods(http.MethodGet).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/rawdataexport"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types/authtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -60,6 +61,13 @@ func NewHandler(module rawdataexport.Module) rawdataexport.Handler {
// Direction: "asc" or "desc"
// Default: ["timestamp:desc", "id:desc"]
//
// - composite_query (optional): Advanced query specification as JSON-encoded QueryEnvelope array
// When provided, this overrides filter, columns, order_by, and limit parameters
// Format: JSON array of QueryEnvelope objects
// Each QueryEnvelope must have a valid "type" field (e.g., "builder_query", "builder_trace_operator")
// Supported types for traces: "builder_query", "builder_trace_operator"
// Note: Limits in composite queries are validated and cannot exceed MaxExportRowCountLimit
//
// Response Headers:
// - Content-Type: "text/csv" or "application/x-ndjson"
// - Content-Encoding: "gzip" (handled by HTTP middleware)
@@ -86,6 +94,11 @@ func NewHandler(module rawdataexport.Module) rawdataexport.Handler {
// Export with filter and ordering:
// GET /api/v1/export_raw_data?start=1693612800000000000&end=1693699199000000000
// &filter=severity="error"&order_by=timestamp:desc&limit=1000
//
// Export with composite query (advanced):
// GET /api/v1/export_raw_data?source=traces&start=1693612800000000000&end=1693699199000000000
// &composite_query={"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000}}
// &composite_query={"type":"builder_trace_operator","spec":{"name":"B","expression":"join","limit":500}}
func (handler *handler) ExportRawData(rw http.ResponseWriter, r *http.Request) {
source, err := getExportQuerySource(r.URL.Query())
if err != nil {
@@ -105,12 +118,169 @@ func (handler *handler) ExportRawData(rw http.ResponseWriter, r *http.Request) {
}
}
func (handler *handler) exportMetrics(rw http.ResponseWriter, r *http.Request) {
func (handler *handler) exportMetrics(rw http.ResponseWriter, _ *http.Request) {
render.Error(rw, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "metrics export is not yet supported"))
}
func (handler *handler) exportTraces(rw http.ResponseWriter, r *http.Request) {
render.Error(rw, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "traces export is not yet supported"))
// Set up response headers
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Vary", "Accept-Encoding") // Indicate that response varies based on Accept-Encoding
rw.Header().Set("Access-Control-Expose-Headers", "Content-Disposition, X-Response-Complete")
rw.Header().Set("Trailer", "X-Response-Complete")
rw.Header().Set("Transfer-Encoding", "chunked")
queryParams := r.URL.Query()
startTime, endTime, err := getExportQueryTimeRange(queryParams)
if err != nil {
render.Error(rw, err)
return
}
format, err := getExportQueryFormat(queryParams)
if err != nil {
render.Error(rw, err)
return
}
// Set appropriate content type and filename
filename := fmt.Sprintf("data_exported_%s.%s", time.Now().Format("2006-01-02_150405"), format)
rw.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filename))
queryRangeRequest := qbtypes.QueryRangeRequest{
Start: startTime,
End: endTime,
RequestType: qbtypes.RequestTypeRaw,
CompositeQuery: qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{},
},
}
compositeQueries, err := getCompositeQueriesFromQueryParams(queryParams)
if err != nil {
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query is malformed: %v", err))
return
}
if len(compositeQueries) == 0 {
// If no composite queries are specified, add a default one
limit, err := getExportQueryLimit(queryParams)
if err != nil {
render.Error(rw, err)
return
}
filterExpression := queryParams.Get("filter")
orderByExpression, err := getExportQueryOrderByTraces(queryParams)
if err != nil {
render.Error(rw, err)
return
}
columns := getExportQueryColumns(queryParams)
spec := qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
SelectFields: columns,
Filter: &qbtypes.Filter{
Expression: filterExpression,
},
Limit: limit,
Order: orderByExpression,
}
compositeQueries = append(compositeQueries, qbtypes.QueryEnvelope{
Type: qbtypes.QueryTypeBuilder,
Spec: spec,
})
}
for i := range compositeQueries {
// Ensure each query envelope has a supported type
if compositeQueries[i].Type != qbtypes.QueryTypeBuilder && compositeQueries[i].Type != qbtypes.QueryTypeTraceOperator {
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d has unsupported type: %s", i, compositeQueries[i].Type.StringValue()))
return
}
limit := DefaultExportRowCountLimit
switch compositeQueries[i].Type {
case qbtypes.QueryTypeBuilder, qbtypes.QueryTypeTraceOperator:
switch spec := compositeQueries[i].Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if spec.Limit == 0 {
spec.Limit = limit
} else if spec.Limit > MaxExportRowCountLimit {
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d limit cannot be more than %d", i, MaxExportRowCountLimit))
return
}
compositeQueries[i].Spec = spec
case qbtypes.QueryBuilderTraceOperator:
if spec.Limit == 0 {
spec.Limit = limit
} else if spec.Limit > MaxExportRowCountLimit {
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d limit cannot be more than %d", i, MaxExportRowCountLimit))
return
}
compositeQueries[i].Spec = spec
default:
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d has invalid spec type for builder query", i))
return
}
default:
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d has unsupported type: %s", i, compositeQueries[i].Type.StringValue()))
return
}
}
queryRangeRequest.CompositeQuery.Queries = compositeQueries
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "orgID is invalid"))
return
}
// Closing doneChan signals the export module to stop sending data
doneChan := make(chan any)
defer close(doneChan)
rowChan, errChan := handler.module.ExportRawData(r.Context(), orgID, &queryRangeRequest, doneChan)
var isComplete bool
switch format {
case "csv", "":
rw.Header().Set("Content-Type", "text/csv")
csvWriter := csv.NewWriter(rw)
isComplete, err = handler.exportRawDataCSV(rowChan, errChan, csvWriter)
if err != nil {
render.Error(rw, err)
return
}
csvWriter.Flush()
case "jsonl":
rw.Header().Set("Content-Type", "application/x-ndjson")
isComplete, err = handler.exportRawDataJSONL(rowChan, errChan, rw)
if err != nil {
render.Error(rw, err)
return
}
default:
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid format: must be csv or jsonl"))
return
}
rw.Header().Set("X-Response-Complete", strconv.FormatBool(isComplete))
}
func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
@@ -147,7 +317,7 @@ func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
filterExpression := queryParams.Get("filter")
orderByExpression, err := getExportQueryOrderBy(queryParams)
orderByExpression, err := getExportQueryOrderByLogs(queryParams)
if err != nil {
render.Error(rw, err)
return
@@ -206,7 +376,7 @@ func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
case "csv", "":
rw.Header().Set("Content-Type", "text/csv")
csvWriter := csv.NewWriter(rw)
isComplete, err = handler.exportLogsCSV(rowChan, errChan, csvWriter)
isComplete, err = handler.exportRawDataCSV(rowChan, errChan, csvWriter)
if err != nil {
render.Error(rw, err)
return
@@ -214,7 +384,7 @@ func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
csvWriter.Flush()
case "jsonl":
rw.Header().Set("Content-Type", "application/x-ndjson")
isComplete, err = handler.exportLogsJSONL(rowChan, errChan, rw)
isComplete, err = handler.exportRawDataJSONL(rowChan, errChan, rw)
if err != nil {
render.Error(rw, err)
return
@@ -227,7 +397,8 @@ func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("X-Response-Complete", strconv.FormatBool(isComplete))
}
func (handler *handler) exportLogsCSV(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, csvWriter *csv.Writer) (bool, error) {
// exportRawDataCSV is a generic CSV export function that works with any raw data (logs, traces, etc.)
func (handler *handler) exportRawDataCSV(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, csvWriter *csv.Writer) (bool, error) {
var header []string
headerToIndexMapping := make(map[string]int, len(header))
@@ -268,8 +439,8 @@ func (handler *handler) exportLogsCSV(rowChan <-chan *qbtypes.RawRow, errChan <-
}
}
func (handler *handler) exportLogsJSONL(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, writer io.Writer) (bool, error) {
// exportRawDataJSONL is a generic JSONL export function that works with any raw data (logs, traces, etc.)
func (handler *handler) exportRawDataJSONL(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, writer io.Writer) (bool, error) {
totalBytes := uint64(0)
for {
select {
@@ -306,7 +477,7 @@ func getExportQuerySource(queryParams url.Values) (string, error) {
case "metrics":
return "metrics", errors.NewInvalidInputf(errors.CodeInvalidInput, "metrics export not yet supported")
case "traces":
return "traces", errors.NewInvalidInputf(errors.CodeInvalidInput, "traces export not yet supported")
return "traces", nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid source: must be logs, metrics or traces")
}
@@ -474,7 +645,7 @@ func getExportQueryOrderBy(queryParams url.Values) ([]qbtypes.OrderBy, error) {
orderByParam = strings.TrimSpace(orderByParam)
if orderByParam == "" {
return telemetrylogs.DefaultLogsV2SortingOrder, nil
return []qbtypes.OrderBy{}, nil
}
parts := strings.Split(orderByParam, ":")
@@ -492,25 +663,78 @@ func getExportQueryOrderBy(queryParams url.Values) ([]qbtypes.OrderBy, error) {
orderByKey := telemetrytypes.GetFieldKeyFromKeyText(column)
orderBy := []qbtypes.OrderBy{
return []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: orderByKey,
},
Direction: orderDirection,
},
}, nil
}
func getExportQueryOrderByLogs(queryParams url.Values) ([]qbtypes.OrderBy, error) {
orderBy, err := getExportQueryOrderBy(queryParams)
if err != nil {
return nil, err
}
if len(orderBy) == 0 {
// Default sorting for logs: timestamp desc, id desc
return telemetrylogs.DefaultLogsV2SortingOrder, nil
}
// If we are ordering by the timestamp column, also order by the ID column
if orderByKey.Name == telemetrylogs.LogsV2TimestampColumn {
if orderBy[0].Key.Name == telemetrylogs.DefaultLogsV2SortingOrder[0].Key.Name {
orderBy = append(orderBy, qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: telemetrylogs.LogsV2IDColumn,
},
TelemetryFieldKey: telemetrylogs.DefaultLogsV2SortingOrder[1].Key.TelemetryFieldKey,
},
Direction: orderDirection,
Direction: orderBy[0].Direction,
})
}
return orderBy, nil
}
// getExportQueryOrderByTraces parses the "order_by" query parameters for traces and returns a slice of OrderBy structs.
// Each "order_by" parameter should be in the format "column:direction"
// Each "column" should be a valid telemetry field key in the format "context.field:type" or "context.field" or "field"
func getExportQueryOrderByTraces(queryParams url.Values) ([]qbtypes.OrderBy, error) {
orderBy, err := getExportQueryOrderBy(queryParams)
if err != nil {
return nil, err
}
if len(orderBy) == 0 {
// Default sorting for traces: timestamp desc, span_id desc
return telemetrytraces.DefaultTracesSortingOrder, nil
}
// If we are ordering by the timestamp column, also order by the span ID column
if orderBy[0].Key.Name == telemetrytraces.DefaultTracesSortingOrder[0].Key.Name {
orderBy = append(orderBy, qbtypes.OrderBy{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytraces.DefaultTracesSortingOrder[1].Key.TelemetryFieldKey,
},
Direction: orderBy[0].Direction,
})
}
return orderBy, nil
}
func getCompositeQueriesFromQueryParams(queryParams url.Values) ([]qbtypes.QueryEnvelope, error) {
// Check if composite_query parameter exists in query params
compositeQueryParams := queryParams["composite_query"]
// Try to unmarshal the composite_query JSON parameter
var queries []qbtypes.QueryEnvelope
for _, compositeQueryParam := range compositeQueryParams {
var query qbtypes.QueryEnvelope
if err := json.Unmarshal([]byte(compositeQueryParam), &query); err != nil {
// If unmarshaling fails, return the error so the handler can reject the malformed composite query
return nil, err
}
queries = append(queries, query)
}
return queries, nil
}
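For the advanced path, each composite_query value is one JSON-encoded QueryEnvelope, decoded by getCompositeQueriesFromQueryParams above. A sketch of building such a URL from plain maps, assuming the same envelope shapes as the GET examples in the handler doc comment (signal/name/limit for builder_query, name/expression/limit for builder_trace_operator); the time range is a placeholder.

package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

func main() {
	// Each composite_query value is one JSON-encoded QueryEnvelope; the field
	// names mirror the GET examples in the handler doc comment and are only a
	// sketch of the full querybuildertypesv5 schema.
	envelopes := []map[string]any{
		{"type": "builder_query", "spec": map[string]any{"signal": "traces", "name": "A", "limit": 1000}},
		{"type": "builder_trace_operator", "spec": map[string]any{"name": "B", "expression": "join", "limit": 500}},
	}

	params := url.Values{}
	params.Set("source", "traces")
	params.Set("format", "jsonl")
	params.Set("start", "1693612800000000000") // placeholder nanosecond timestamps
	params.Set("end", "1693699199000000000")
	for _, e := range envelopes {
		b, err := json.Marshal(e)
		if err != nil {
			panic(err)
		}
		// Repeated composite_query parameters are decoded into separate envelopes.
		params.Add("composite_query", string(b))
	}
	fmt.Println("/api/v1/export_raw_data?" + params.Encode())
}

Repeated composite_query parameters become separate envelopes, and per-query limits are capped at MaxExportRowCountLimit by the handler.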

View File

@@ -1,6 +1,7 @@
package implrawdataexport
import (
"fmt"
"net/url"
"strconv"
"testing"
@@ -37,10 +38,10 @@ func TestGetExportQuerySource(t *testing.T) {
expectedError: true,
},
{
name: "traces source - not supported",
name: "traces source - supported",
queryParams: url.Values{"source": {"traces"}},
expectedSource: "traces",
expectedError: true,
expectedError: false,
},
{
name: "invalid source",
@@ -318,7 +319,7 @@ func TestGetExportQueryColumns(t *testing.T) {
}
}
func TestGetExportQueryOrderBy(t *testing.T) {
func TestGetExportQueryOrderByLogs(t *testing.T) {
tests := []struct {
name string
queryParams url.Values
@@ -505,7 +506,7 @@ func TestGetExportQueryOrderBy(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
order, err := getExportQueryOrderBy(tt.queryParams)
order, err := getExportQueryOrderByLogs(tt.queryParams)
if tt.expectedError {
assert.Error(t, err)
} else {
@@ -561,3 +562,468 @@ func TestConstructCSVRecordFromQueryResponse(t *testing.T) {
assert.Equal(t, "INFO", record[2])
assert.Equal(t, "test-id", record[3])
}
func TestGetExportQueryOrderByTraces(t *testing.T) {
tests := []struct {
name string
queryParams url.Values
expectedOrder []qbtypes.OrderBy
expectedError bool
}{
{
name: "no order specified - should use default (timestamp:desc, span_id:desc)",
queryParams: url.Values{},
expectedOrder: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
},
},
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "span_id",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
},
},
expectedError: false,
},
{
name: "order by timestamp asc - should also add span_id asc",
queryParams: url.Values{
"order_by": {"timestamp:asc"},
},
expectedOrder: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionAsc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
},
},
},
{
Direction: qbtypes.OrderDirectionAsc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "span_id",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
},
},
expectedError: false,
},
{
name: "order by timestamp desc - should also add span_id desc",
queryParams: url.Values{
"order_by": {"timestamp:desc"},
},
expectedOrder: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
},
},
},
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "span_id",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
},
},
expectedError: false,
},
{
name: "order by non-timestamp field - should not add span_id",
queryParams: url.Values{
"order_by": {"duration:desc"},
},
expectedOrder: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "duration",
},
},
},
},
expectedError: false,
},
{
name: "order by attribute with type",
queryParams: url.Values{
"order_by": {"attribute.http.status_code:number:asc"},
},
expectedOrder: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionAsc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "http.status_code",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
},
},
},
expectedError: false,
},
{
name: "order by resource with type",
queryParams: url.Values{
"order_by": {"resource.service.name:string:desc"},
},
expectedOrder: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
},
},
expectedError: false,
},
{
name: "order by attribute without type (dot notation)",
queryParams: url.Values{
"order_by": {"attribute.http.method:asc"},
},
expectedOrder: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionAsc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "http.method",
FieldContext: telemetrytypes.FieldContextAttribute,
},
},
},
},
expectedError: false,
},
{
name: "invalid format - missing direction",
queryParams: url.Values{
"order_by": {"timestamp"},
},
expectedOrder: nil,
expectedError: true,
},
{
name: "invalid direction",
queryParams: url.Values{
"order_by": {"timestamp:invalid"},
},
expectedOrder: nil,
expectedError: true,
},
{
name: "empty order_by value - should use default",
queryParams: url.Values{
"order_by": {""},
},
expectedOrder: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
},
},
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "span_id",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
},
},
expectedError: false,
},
{
name: "whitespace only order_by value - should use default",
queryParams: url.Values{
"order_by": {" "},
},
expectedOrder: []qbtypes.OrderBy{
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
},
},
{
Direction: qbtypes.OrderDirectionDesc,
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "span_id",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
},
},
expectedError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
order, err := getExportQueryOrderByTraces(tt.queryParams)
if tt.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, len(tt.expectedOrder), len(order))
for i, expectedOrd := range tt.expectedOrder {
assert.Equal(t, expectedOrd, order[i])
}
}
})
}
}
func TestGetCompositeQueriesFromQueryParams(t *testing.T) {
tests := []struct {
name string
queryParams url.Values
expectedQueries []qbtypes.QueryEnvelope
expectError bool
}{
{
name: "no composite_query parameter - should return empty slice",
queryParams: url.Values{},
expectedQueries: nil,
expectError: false,
},
{
name: "single valid builder query",
queryParams: url.Values{
"composite_query": {`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000}}`},
},
expectedQueries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
Name: "A",
Limit: 1000,
},
},
},
expectError: false,
},
{
name: "single valid trace_operator query",
queryParams: url.Values{
"composite_query": {`{"type":"builder_trace_operator","spec":{"name":"B","expression":"join","limit":500}}`},
},
expectedQueries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeTraceOperator,
Spec: qbtypes.QueryBuilderTraceOperator{
Name: "B",
Expression: "join",
Limit: 500,
},
},
},
expectError: false,
},
{
name: "multiple composite queries",
queryParams: url.Values{
"composite_query": {
`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000}}`,
`{"type":"builder_trace_operator","spec":{"name":"B","expression":"join","limit":500}}`,
},
},
expectedQueries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
Name: "A",
Limit: 1000,
},
},
{
Type: qbtypes.QueryTypeTraceOperator,
Spec: qbtypes.QueryBuilderTraceOperator{
Name: "B",
Expression: "join",
Limit: 500,
},
},
},
expectError: false,
},
{
name: "invalid JSON - should return error",
queryParams: url.Values{
"composite_query": {`{invalid json`},
},
expectedQueries: nil,
expectError: true,
},
{
name: "empty composite_query value - should return error",
queryParams: url.Values{
"composite_query": {""},
},
expectedQueries: nil,
expectError: true,
},
{
name: "valid and invalid queries mixed - should return error on first invalid",
queryParams: url.Values{
"composite_query": {
`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000}}`,
`{invalid}`,
},
},
expectedQueries: nil,
expectError: true,
},
{
name: "query without type field - should return error",
queryParams: url.Values{
"composite_query": {`{"spec":{"signal":"traces","name":"A","limit":1000}}`},
},
expectedQueries: nil,
expectError: true,
},
{
name: "query with complex spec including filter",
queryParams: url.Values{
"composite_query": {`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000,"filter":{"expression":"status=error"}}}`},
},
expectedQueries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
Name: "A",
Limit: 1000,
Filter: &qbtypes.Filter{
Expression: "status=error",
},
},
},
},
expectError: false,
},
{
name: "query with order by clause",
queryParams: url.Values{
"composite_query": {`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000,"order":[{"key":{"name":"timestamp"},"direction":"desc"},{"key":{"name":"span_id"},"direction":"desc"}]}}`},
},
expectedQueries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
Name: "A",
Limit: 1000,
Order: []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "span_id",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
},
},
},
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
queries, err := getCompositeQueriesFromQueryParams(tt.queryParams)
if tt.expectError {
assert.Error(t, err, "expected an error")
assert.Nil(t, queries, "queries should be nil when error occurs")
} else {
assert.NoError(t, err, "expected no error")
if tt.expectedQueries == nil {
assert.Nil(t, queries, "expected nil queries")
} else {
assert.NotNil(t, queries, "expected non-nil queries")
assert.Equal(t, len(tt.expectedQueries), len(queries), "number of queries mismatch")
if len(queries) == len(tt.expectedQueries) {
for idx := range len(tt.expectedQueries) {
assert.Equal(t, tt.expectedQueries[idx].Type, queries[idx].Type, "query type mismatch at index "+strconv.Itoa(idx))
// Validate spec type matches if expected spec is not nil
if tt.expectedQueries[idx].Spec != nil {
assert.NotNil(t, queries[idx].Spec, "spec should not be nil at index "+strconv.Itoa(idx))
// Check the type of spec matches expected
expectedSpecType := fmt.Sprintf("%T", tt.expectedQueries[idx].Spec)
actualSpecType := fmt.Sprintf("%T", queries[idx].Spec)
assert.Equal(t, expectedSpecType, actualSpecType, "spec type mismatch at index "+strconv.Itoa(idx))
}
}
}
}
}
})
}
}

View File

@@ -23,8 +23,21 @@ func NewModule(querier querier.Querier) rawdataexport.Module {
func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, doneChan chan any) (chan *qbtypes.RawRow, chan error) {
spec := rangeRequest.CompositeQuery.Queries[0].Spec.(qbtypes.QueryBuilderQuery[qbtypes.LogAggregation])
rowCountLimit := spec.Limit
isTraceOperatorQueryPresent := false
for _, query := range rangeRequest.CompositeQuery.Queries {
if _, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
isTraceOperatorQueryPresent = true
break
}
}
if isTraceOperatorQueryPresent {
return exportRawDataForTraceOperatorQuery(m.querier, ctx, orgID, rangeRequest, doneChan)
}
return exportRawDataForQueryBuilderQueries(m.querier, ctx, orgID, rangeRequest, doneChan)
}
func exportRawDataForQueryBuilderQueries(querier querier.Querier, ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, doneChan chan any) (chan *qbtypes.RawRow, chan error) {
rowChan := make(chan *qbtypes.RawRow, 1)
errChan := make(chan error, 1)
@@ -38,15 +51,124 @@ func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequ
defer close(errChan)
defer close(rowChan)
compositeQueries := rangeRequest.CompositeQuery.Queries
appendQueryName := len(compositeQueries) > 1
for _, query := range compositeQueries {
dupRangeRequest := rangeRequest.Copy()
dupRangeRequest.CompositeQuery.Queries = []qbtypes.QueryEnvelope{query}
switch query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
exportRawDataForQueryBuilderQuery[qbtypes.LogAggregation](querier, contextWithTimeout, orgID, &dupRangeRequest, rowChan, errChan, doneChan, appendQueryName)
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
exportRawDataForQueryBuilderQuery[qbtypes.MetricAggregation](querier, contextWithTimeout, orgID, &dupRangeRequest, rowChan, errChan, doneChan, appendQueryName)
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
exportRawDataForQueryBuilderQuery[qbtypes.TraceAggregation](querier, contextWithTimeout, orgID, &dupRangeRequest, rowChan, errChan, doneChan, appendQueryName)
default:
errChan <- errors.NewInternalf(errors.CodeInternal, "unsupported query spec type: %T", query.Spec)
return
}
}
}()
return rowChan, errChan
}
func exportRawDataForQueryBuilderQuery[T any](querier querier.Querier, ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, rowChan chan *qbtypes.RawRow, errChan chan error, doneChan chan any, appendQueryName bool) {
spec, ok := rangeRequest.CompositeQuery.Queries[0].Spec.(qbtypes.QueryBuilderQuery[T])
if !ok {
errChan <- errors.NewInternalf(errors.CodeInternal, "invalid spec type for query builder query")
return
}
rowCountLimit := spec.Limit
rowCount := 0
for rowCount < rowCountLimit {
spec.Limit = min(ChunkSize, rowCountLimit-rowCount)
spec.Offset = rowCount
rangeRequest.CompositeQuery.Queries[0].Spec = spec
response, err := querier.QueryRange(ctx, orgID, rangeRequest)
if err != nil {
errChan <- err
return
}
newRowsCount := 0
for _, result := range response.Data.Results {
resultData, ok := result.(*qbtypes.RawData)
if !ok {
errChan <- errors.NewInternalf(errors.CodeInternal, "expected RawData, got %T", result)
return
}
newRowsCount += len(resultData.Rows)
for _, row := range resultData.Rows {
if appendQueryName {
row.Data["__query_name"] = spec.Name
}
select {
case rowChan <- row:
case <-doneChan:
return
case <-ctx.Done():
errChan <- ctx.Err()
return
}
}
}
// Break if we did not receive any new rows
if newRowsCount == 0 {
return
}
rowCount += newRowsCount
}
}
func exportRawDataForTraceOperatorQuery(querier querier.Querier, ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, doneChan chan any) (chan *qbtypes.RawRow, chan error) {
traceOperatorIndex := 0
for i, query := range rangeRequest.CompositeQuery.Queries {
if _, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
traceOperatorIndex = i
break
}
}
rowChan := make(chan *qbtypes.RawRow, 1)
errChan := make(chan error, 1)
go func() {
// Set clickhouse max threads
ctx := ctxtypes.SetClickhouseMaxThreads(ctx, ClickhouseExportRawDataMaxThreads)
// Set clickhouse timeout
contextWithTimeout, cancel := context.WithTimeout(ctx, ClickhouseExportRawDataTimeout)
defer cancel()
defer close(errChan)
defer close(rowChan)
spec, ok := rangeRequest.CompositeQuery.Queries[traceOperatorIndex].Spec.(qbtypes.QueryBuilderTraceOperator)
if !ok {
errChan <- errors.NewInternalf(errors.CodeInternal, "invalid spec type for query builder trace operator")
return
}
rowCountLimit := spec.Limit
rowCount := 0
for rowCount < rowCountLimit {
spec.Limit = min(ChunkSize, rowCountLimit-rowCount)
spec.Offset = rowCount
rangeRequest.CompositeQuery.Queries[traceOperatorIndex].Spec = spec
rangeRequest.CompositeQuery.Queries[0].Spec = spec
response, err := m.querier.QueryRange(contextWithTimeout, orgID, rangeRequest)
response, err := querier.QueryRange(contextWithTimeout, orgID, rangeRequest)
if err != nil {
errChan <- err
return
@@ -71,7 +193,6 @@ func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequ
return
}
}
}
// Break if we did not receive any new rows
@@ -80,7 +201,6 @@ func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequ
}
rowCount += newRowsCount
}
}()
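ExportRawData hands back a row channel and an error channel and stops early when the caller closes doneChan (the handler does this with defer close(doneChan)). Below is a minimal consumption sketch of that channel protocol, using a stand-in Row type instead of *qbtypes.RawRow so it stays self-contained; the early-stop threshold is an illustrative parameter.

package main

import "fmt"

// Row is a stand-in for *qbtypes.RawRow used only to keep this sketch self-contained.
type Row struct{ Data map[string]any }

// drain reads rows until rowChan closes, surfaces the first error from errChan,
// and closes doneChan when it stops so the producing goroutine can exit early.
func drain(rowChan <-chan *Row, errChan <-chan error, doneChan chan<- any, maxRows int) error {
	defer close(doneChan) // tells the export goroutine to stop sending
	count := 0
	for {
		select {
		case row, ok := <-rowChan:
			if !ok {
				return nil // export finished
			}
			fmt.Println(row.Data)
			count++
			if count >= maxRows {
				return nil // stop early; doneChan is closed by the deferred call
			}
		case err, ok := <-errChan:
			if ok && err != nil {
				return err
			}
			if !ok {
				errChan = nil // stop selecting on a closed error channel
			}
		}
	}
}

func main() {
	rowChan := make(chan *Row, 1)
	errChan := make(chan error, 1)
	doneChan := make(chan any)
	go func() {
		defer close(rowChan)
		defer close(errChan)
		rowChan <- &Row{Data: map[string]any{"body": "example"}}
	}()
	if err := drain(rowChan, errChan, doneChan, 10); err != nil {
		fmt.Println("export failed:", err)
	}
}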

View File

@@ -593,7 +593,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
})).Methods(http.MethodGet)
// Export
router.HandleFunc("/api/v1/export_raw_data", am.ViewAccess(aH.Signoz.Handlers.RawDataExport.ExportRawData)).Methods(http.MethodGet)
// router.HandleFunc("/api/v1/export_raw_data", am.ViewAccess(aH.Signoz.Handlers.RawDataExport.ExportRawData)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/span_percentile", am.ViewAccess(aH.Signoz.Handlers.SpanPercentile.GetSpanPercentileDetails)).Methods(http.MethodPost)

View File

@@ -18,6 +18,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/promote"
"github.com/SigNoz/signoz/pkg/modules/rawdataexport"
"github.com/SigNoz/signoz/pkg/modules/session"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
@@ -47,6 +48,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ dashboard.Module }{},
struct{ dashboard.Handler }{},
struct{ metricsexplorer.Handler }{},
struct{ rawdataexport.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -247,6 +247,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
modules.Dashboard,
handlers.Dashboard,
handlers.MetricsExplorer,
handlers.RawDataExport,
),
)
}

View File

@@ -1,6 +1,9 @@
package telemetrytraces
import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
import (
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
var (
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
@@ -373,4 +376,19 @@ var (
FieldDataType: telemetrytypes.FieldDataTypeString,
},
}
DefaultTracesSortingOrder = []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: DefaultFields["timestamp"],
},
Direction: qbtypes.OrderDirectionDesc,
},
{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: DefaultFields["span_id"],
},
Direction: qbtypes.OrderDirectionDesc,
},
}
)

View File

@@ -2,6 +2,7 @@ package querybuildertypesv5
import (
"encoding/json"
"reflect"
"strings"
"github.com/SigNoz/govaluate"
@@ -450,3 +451,74 @@ func (r *QueryRangeRequest) GetQueriesSupportingZeroDefault() map[string]bool {
return canDefaultZero
}
func (r *QueryRangeRequest) Copy() QueryRangeRequest {
c := QueryRangeRequest{
SchemaVersion: r.SchemaVersion,
Start: r.Start,
End: r.End,
RequestType: r.RequestType,
NoCache: r.NoCache,
FormatOptions: r.FormatOptions,
Variables: make(map[string]VariableItem),
CompositeQuery: CompositeQuery{},
}
for k, v := range r.Variables {
c.Variables[k] = v
}
c.CompositeQuery.Queries = make([]QueryEnvelope, len(r.CompositeQuery.Queries))
for i, q := range r.CompositeQuery.Queries {
if q.Spec != nil {
switch spec := q.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
c.CompositeQuery.Queries[i] = QueryEnvelope{
Type: q.Type,
Spec: spec.Copy(),
}
case QueryBuilderQuery[LogAggregation]:
c.CompositeQuery.Queries[i] = QueryEnvelope{
Type: q.Type,
Spec: spec.Copy(),
}
case QueryBuilderQuery[MetricAggregation]:
c.CompositeQuery.Queries[i] = QueryEnvelope{
Type: q.Type,
Spec: spec.Copy(),
}
case QueryBuilderFormula:
c.CompositeQuery.Queries[i] = QueryEnvelope{
Type: q.Type,
Spec: spec.Copy(),
}
case QueryBuilderJoin:
c.CompositeQuery.Queries[i] = QueryEnvelope{
Type: q.Type,
Spec: spec.Copy(),
}
case QueryBuilderTraceOperator:
c.CompositeQuery.Queries[i] = QueryEnvelope{
Type: q.Type,
Spec: spec.Copy(),
}
default:
// Check if spec implements a Copy method
specValue := reflect.ValueOf(q.Spec)
copyMethod := specValue.MethodByName("Copy")
if copyMethod.IsValid() && copyMethod.Type().NumIn() == 0 && copyMethod.Type().NumOut() == 1 {
// Call Copy method and use the result
result := copyMethod.Call(nil)
c.CompositeQuery.Queries[i] = QueryEnvelope{
Type: q.Type,
Spec: result[0].Interface(),
}
} else {
// Fallback to shallow copy
c.CompositeQuery.Queries[i] = q
}
}
}
}
return c
}

View File

@@ -0,0 +1,39 @@
package types
import (
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
// ExportRawDataQueryParams represents the query parameters for the export raw data endpoint
type ExportRawDataQueryParams struct {
// Source specifies the type of data to export: "logs", "metrics", or "traces"
Source string `json:"source" schema:"source"`
// Format specifies the output format: "csv" or "jsonl"
Format string `json:"format" schema:"format"`
// Start is the start time for the query (Unix timestamp in nanoseconds)
Start uint64 `json:"start" schema:"start"`
// End is the end time for the query (Unix timestamp in nanoseconds)
End uint64 `json:"end" schema:"end"`
// Limit specifies the maximum number of rows to export
Limit int `json:"limit" schema:"limit"`
// Filter is a filter expression to apply to the query
Filter string `json:"filter" schema:"filter"`
// Columns specifies the columns to include in the export
// Format: ["context.field:type", "context.field", "field"]
Columns []string `json:"columns" schema:"columns"`
// OrderBy specifies the sorting order
// Format: "column:direction" or "context.field:type:direction"
// Direction can be "asc" or "desc"
OrderBy string `json:"order_by" schema:"order_by"`
// CompositeQuery is an advanced query specification as JSON-encoded QueryEnvelope array
// When provided, this overrides filter, columns, order_by, and limit parameters
CompositeQuery []qbtypes.QueryEnvelope `json:"composite_query" schema:"composite_query"`
}
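The comments on ExportRawDataQueryParams describe the simple (non-composite) parameter shapes. A small sketch of encoding them, assuming the column and order_by key conventions exercised by the order-by tests above (e.g. resource.service.name:string); all values are placeholders.

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Simple-path parameters: filter, order_by, and repeated columns values.
	params := url.Values{}
	params.Set("source", "logs")
	params.Set("format", "jsonl")
	params.Set("start", "1693612800000000000") // placeholder nanosecond timestamps
	params.Set("end", "1693699199000000000")
	params.Set("limit", "500")
	params.Set("filter", "severity_text = 'ERROR'")
	params.Set("order_by", "timestamp:desc")
	for _, c := range []string{"timestamp", "severity_text", "body", "resource.service.name:string"} {
		params.Add("columns", c) // each column is passed as its own columns parameter
	}
	fmt.Println("/api/v1/export_raw_data?" + params.Encode())
}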

View File

@@ -0,0 +1,620 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List
from urllib.parse import urlencode
import csv
import io
import json
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logs import Logs
def test_export_logs_csv(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Setup:
Insert 3 logs with different severity levels and attributes.
Tests:
1. Export logs as CSV format
2. Verify CSV structure and content
3. Validate headers are present
4. Check log data is correctly formatted
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=10),
body="Application started successfully",
severity_text="INFO",
resources={
"service.name": "api-service",
"deployment.environment": "production",
"host.name": "server-01",
},
attributes={
"http.method": "GET",
"http.status_code": 200,
"user.id": "user123",
},
),
Logs(
timestamp=now - timedelta(seconds=8),
body="Connection to database failed",
severity_text="ERROR",
resources={
"service.name": "api-service",
"deployment.environment": "production",
"host.name": "server-01",
},
attributes={
"error.type": "ConnectionError",
"db.name": "production_db",
},
),
Logs(
timestamp=now - timedelta(seconds=5),
body="Request processed",
severity_text="DEBUG",
resources={
"service.name": "worker-service",
"deployment.environment": "production",
"host.name": "server-02",
},
attributes={
"request.id": "req-456",
"duration_ms": 150.5,
},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
params = {
"start": start_ns,
"end": end_ns,
"format": "csv",
"source": "logs",
}
# Export logs as CSV
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "text/csv"
assert "attachment" in response.headers.get("Content-Disposition", "")
# Parse CSV content
csv_content = response.text
csv_reader = csv.DictReader(io.StringIO(csv_content))
rows = list(csv_reader)
assert len(rows) == 3, f"Expected 3 rows, got {len(rows)}"
# Verify log bodies are present in the exported data
bodies = [row.get("body") for row in rows]
assert "Application started successfully" in bodies
assert "Connection to database failed" in bodies
assert "Request processed" in bodies
# Verify severity levels
severities = [row.get("severity_text") for row in rows]
assert "INFO" in severities
assert "ERROR" in severities
assert "DEBUG" in severities
def test_export_logs_jsonl(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Setup:
Insert 2 logs with different attributes.
Tests:
1. Export logs as JSONL format
2. Verify JSONL structure and content
3. Check each line is valid JSON
4. Validate log data is correctly formatted
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=10),
body="User logged in",
severity_text="INFO",
resources={
"service.name": "auth-service",
"deployment.environment": "staging",
},
attributes={
"user.email": "test@example.com",
"session.id": "sess-789",
},
),
Logs(
timestamp=now - timedelta(seconds=5),
body="Payment processed successfully",
severity_text="INFO",
resources={
"service.name": "payment-service",
"deployment.environment": "staging",
},
attributes={
"transaction.id": "txn-123",
"amount": 99.99,
},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
params = {
"start": start_ns,
"end": end_ns,
"format": "jsonl",
"source": "logs",
}
# Export logs as JSONL
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "application/x-ndjson"
assert "attachment" in response.headers.get("Content-Disposition", "")
# Parse JSONL content
jsonl_lines = response.text.strip().split("\n")
assert len(jsonl_lines) == 2, f"Expected 2 lines, got {len(jsonl_lines)}"
# Verify each line is valid JSON
json_objects = []
for line in jsonl_lines:
obj = json.loads(line)
json_objects.append(obj)
assert "id" in obj
assert "timestamp" in obj
assert "body" in obj
assert "severity_text" in obj
# Verify log bodies
bodies = [obj.get("body") for obj in json_objects]
assert "User logged in" in bodies
assert "Payment processed successfully" in bodies
def test_export_logs_with_filter(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Setup:
Insert logs with different severity levels.
Tests:
1. Export logs with filter applied
2. Verify only filtered logs are returned
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=10),
body="Info message",
severity_text="INFO",
resources={
"service.name": "test-service",
},
attributes={},
),
Logs(
timestamp=now - timedelta(seconds=8),
body="Error message",
severity_text="ERROR",
resources={
"service.name": "test-service",
},
attributes={},
),
Logs(
timestamp=now - timedelta(seconds=5),
body="Another error message",
severity_text="ERROR",
resources={
"service.name": "test-service",
},
attributes={},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
params = {
"start": start_ns,
"end": end_ns,
"format": "jsonl",
"source": "logs",
"filter": "severity_text = 'ERROR'",
}
# Export logs with filter
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "application/x-ndjson"
# Parse JSONL content
jsonl_lines = response.text.strip().split("\n")
assert len(jsonl_lines) == 2, f"Expected 2 lines (filtered), got {len(jsonl_lines)}"
# Verify only ERROR logs are returned
for line in jsonl_lines:
obj = json.loads(line)
assert obj["severity_text"] == "ERROR"
assert "error message" in obj["body"].lower()
def test_export_logs_with_limit(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Setup:
Insert 5 logs.
Tests:
1. Export logs with limit applied
2. Verify only limited number of logs are returned
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
logs = []
for i in range(5):
logs.append(
Logs(
timestamp=now - timedelta(seconds=i),
body=f"Log message {i}",
severity_text="INFO",
resources={
"service.name": "test-service",
},
attributes={
"index": i,
},
)
)
insert_logs(logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
params = {
"start": start_ns,
"end": end_ns,
"format": "csv",
"source": "logs",
"limit": 3,
}
# Export logs with limit
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "text/csv"
# Parse CSV content
csv_content = response.text
csv_reader = csv.DictReader(io.StringIO(csv_content))
rows = list(csv_reader)
assert len(rows) == 3, f"Expected 3 rows (limited), got {len(rows)}"
def test_export_logs_with_columns(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Setup:
Insert logs with various attributes.
Tests:
1. Export logs with specific columns
2. Verify only specified columns are returned
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=10),
body="Test log message",
severity_text="INFO",
resources={
"service.name": "test-service",
"deployment.environment": "production",
},
attributes={
"http.method": "GET",
"http.status_code": 200,
},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
# Request only specific columns
params = {
"start": start_ns,
"end": end_ns,
"format": "csv",
"source": "logs",
"columns": ["timestamp", "severity_text", "body"],
}
# Export logs with specific columns
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params, doseq=True)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "text/csv"
# Parse CSV content
csv_content = response.text
csv_reader = csv.DictReader(io.StringIO(csv_content))
rows = list(csv_reader)
assert len(rows) == 1
# Verify the specified columns are present
row = rows[0]
assert "timestamp" in row
assert "severity_text" in row
assert "body" in row
assert row["severity_text"] == "INFO"
assert row["body"] == "Test log message"
def test_export_logs_with_order_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Setup:
Insert logs at different timestamps.
Tests:
1. Export logs with ascending timestamp order
2. Verify logs are returned in correct order
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=10),
body="First log",
severity_text="INFO",
resources={
"service.name": "test-service",
},
attributes={},
),
Logs(
timestamp=now - timedelta(seconds=5),
body="Second log",
severity_text="INFO",
resources={
"service.name": "test-service",
},
attributes={},
),
Logs(
timestamp=now - timedelta(seconds=1),
body="Third log",
severity_text="INFO",
resources={
"service.name": "test-service",
},
attributes={},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
params = {
"start": start_ns,
"end": end_ns,
"format": "jsonl",
"source": "logs",
"order_by": "timestamp:asc",
}
# Export logs with ascending order
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "application/x-ndjson"
# Parse JSONL content
jsonl_lines = response.text.strip().split("\n")
assert len(jsonl_lines) == 3
# Verify order - first log should be "First log" (oldest)
json_objects = [json.loads(line) for line in jsonl_lines]
assert json_objects[0]["body"] == "First log"
assert json_objects[1]["body"] == "Second log"
assert json_objects[2]["body"] == "Third log"
def test_export_logs_with_complex_filter(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Setup:
Insert logs with various service names and severity levels.
Tests:
1. Export logs with complex filter (multiple conditions)
2. Verify only logs matching all conditions are returned
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=10),
body="API error occurred",
severity_text="ERROR",
resources={
"service.name": "api-service",
},
attributes={},
),
Logs(
timestamp=now - timedelta(seconds=8),
body="Worker info message",
severity_text="INFO",
resources={
"service.name": "worker-service",
},
attributes={},
),
Logs(
timestamp=now - timedelta(seconds=5),
body="API info message",
severity_text="INFO",
resources={
"service.name": "api-service",
},
attributes={},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
# Filter for api-service AND ERROR severity
params = {
"start": start_ns,
"end": end_ns,
"format": "jsonl",
"source": "logs",
"filter": "service.name = 'api-service' AND severity_text = 'ERROR'",
}
# Export logs with complex filter
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "application/x-ndjson"
# Parse JSONL content
jsonl_lines = response.text.strip().split("\n")
assert len(jsonl_lines) == 1, f"Expected 1 line (complex filter), got {len(jsonl_lines)}"
# Verify the filtered log
filtered_obj = json.loads(jsonl_lines[0])
assert filtered_obj["body"] == "API error occurred"
assert filtered_obj["severity_text"] == "ERROR"

View File

@@ -0,0 +1,927 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List
from urllib.parse import urlencode
import csv
import io
import json
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
def test_export_traces_csv(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Setup:
Insert 3 traces with different attributes.
Tests:
1. Export traces as CSV format
2. Verify CSV structure and content
3. Validate headers are present
4. Check trace data is correctly formatted
"""
http_service_trace_id = TraceIdGenerator.trace_id()
http_service_span_id = TraceIdGenerator.span_id()
http_service_db_span_id = TraceIdGenerator.span_id()
topic_service_trace_id = TraceIdGenerator.trace_id()
topic_service_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=4),
duration=timedelta(seconds=3),
trace_id=http_service_trace_id,
span_id=http_service_span_id,
parent_span_id="",
name="POST /integration",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "http-service",
"os.type": "linux",
"host.name": "linux-000",
},
attributes={
"net.transport": "IP.TCP",
"http.scheme": "http",
"http.user_agent": "Integration Test",
"http.request.method": "POST",
"http.response.status_code": "200",
},
),
Traces(
timestamp=now - timedelta(seconds=3.5),
duration=timedelta(seconds=0.5),
trace_id=http_service_trace_id,
span_id=http_service_db_span_id,
parent_span_id=http_service_span_id,
name="SELECT",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "http-service",
"os.type": "linux",
"host.name": "linux-000",
},
attributes={
"db.name": "integration",
"db.operation": "SELECT",
"db.statement": "SELECT * FROM integration",
},
),
Traces(
timestamp=now - timedelta(seconds=1),
duration=timedelta(seconds=2),
trace_id=topic_service_trace_id,
span_id=topic_service_span_id,
parent_span_id="",
name="topic publish",
kind=TracesKind.SPAN_KIND_PRODUCER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "topic-service",
"os.type": "linux",
"host.name": "linux-001",
},
attributes={
"message.type": "SENT",
"messaging.operation": "publish",
"messaging.message.id": "001",
},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
# Build composite query for traces export
composite_query = {
"type": "builder_query",
"spec": {
"signal": "traces",
"name": "A",
"limit": 1000
}
}
params = {
"start": start_ns,
"end": end_ns,
"format": "csv",
"source": "traces",
"composite_query": json.dumps(composite_query)
}
# Export traces as CSV
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "text/csv"
assert "attachment" in response.headers.get("Content-Disposition", "")
# Parse CSV content
csv_content = response.text
csv_reader = csv.DictReader(io.StringIO(csv_content))
rows = list(csv_reader)
assert len(rows) == 3, f"Expected 3 rows, got {len(rows)}"
# Verify trace IDs are present in the exported data
trace_ids = [row.get("trace_id") for row in rows]
assert http_service_trace_id in trace_ids
assert topic_service_trace_id in trace_ids
# Verify span names are present
span_names = [row.get("name") for row in rows]
assert "POST /integration" in span_names
assert "SELECT" in span_names
assert "topic publish" in span_names
def test_export_traces_jsonl(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Setup:
Insert 2 traces with different attributes.
Tests:
1. Export traces as JSONL format
2. Verify JSONL structure and content
3. Check each line is valid JSON
4. Validate trace data is correctly formatted
"""
http_service_trace_id = TraceIdGenerator.trace_id()
http_service_span_id = TraceIdGenerator.span_id()
topic_service_trace_id = TraceIdGenerator.trace_id()
topic_service_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=4),
duration=timedelta(seconds=3),
trace_id=http_service_trace_id,
span_id=http_service_span_id,
parent_span_id="",
name="POST /api/test",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "api-service",
"deployment.environment": "staging",
},
attributes={
"http.request.method": "POST",
"http.response.status_code": "201",
},
),
Traces(
timestamp=now - timedelta(seconds=2),
duration=timedelta(seconds=1),
trace_id=topic_service_trace_id,
span_id=topic_service_span_id,
parent_span_id="",
name="queue.process",
kind=TracesKind.SPAN_KIND_CONSUMER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "queue-service",
"deployment.environment": "staging",
},
attributes={
"messaging.operation": "process",
"messaging.system": "rabbitmq",
},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
# Build composite query for traces export
composite_query = {
"type": "builder_query",
"spec": {
"signal": "traces",
"name": "A",
"limit": 1000
}
}
params = {
"start": start_ns,
"end": end_ns,
"format": "jsonl",
"source": "traces",
"composite_query": json.dumps(composite_query)
}
# Export traces as JSONL
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "application/x-ndjson"
assert "attachment" in response.headers.get("Content-Disposition", "")
# Parse JSONL content
jsonl_lines = response.text.strip().split("\n")
assert len(jsonl_lines) == 2, f"Expected 2 lines, got {len(jsonl_lines)}"
# Verify each line is valid JSON
json_objects = []
for line in jsonl_lines:
obj = json.loads(line)
json_objects.append(obj)
assert "trace_id" in obj
assert "span_id" in obj
assert "name" in obj
# Verify trace IDs are present
trace_ids = [obj.get("trace_id") for obj in json_objects]
assert http_service_trace_id in trace_ids
assert topic_service_trace_id in trace_ids
# Verify span names are present
span_names = [obj.get("name") for obj in json_objects]
assert "POST /api/test" in span_names
assert "queue.process" in span_names
def test_export_traces_with_filter(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Setup:
Insert traces with different service names.
Tests:
1. Export traces with filter applied
2. Verify only filtered traces are returned
"""
service_a_trace_id = TraceIdGenerator.trace_id()
service_a_span_id = TraceIdGenerator.span_id()
service_b_trace_id = TraceIdGenerator.trace_id()
service_b_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=4),
duration=timedelta(seconds=1),
trace_id=service_a_trace_id,
span_id=service_a_span_id,
parent_span_id="",
name="operation-a",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "service-a",
},
attributes={},
),
Traces(
timestamp=now - timedelta(seconds=2),
duration=timedelta(seconds=1),
trace_id=service_b_trace_id,
span_id=service_b_span_id,
parent_span_id="",
name="operation-b",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "service-b",
},
attributes={},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
# Build composite query with filter for service-a only
composite_query = {
"type": "builder_query",
"spec": {
"signal": "traces",
"name": "A",
"limit": 1000,
"filter": {
"expression": "service.name = 'service-a'"
}
}
}
params = {
"start": start_ns,
"end": end_ns,
"format": "jsonl",
"source": "traces",
"composite_query": json.dumps(composite_query)
}
# Export traces with filter
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "application/x-ndjson"
# Parse JSONL content
jsonl_lines = response.text.strip().split("\n")
assert len(jsonl_lines) == 1, f"Expected 1 line (filtered), got {len(jsonl_lines)}"
# Verify the filtered trace
filtered_obj = json.loads(jsonl_lines[0])
assert filtered_obj["trace_id"] == service_a_trace_id
assert filtered_obj["name"] == "operation-a"
def test_export_traces_with_limit(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Setup:
Insert 5 traces.
Tests:
1. Export traces with limit applied
2. Verify only limited number of traces are returned
"""
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
traces = []
for i in range(5):
traces.append(
Traces(
timestamp=now - timedelta(seconds=i),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name=f"operation-{i}",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "test-service",
},
attributes={},
)
)
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
# Build composite query with limit of 3
composite_query = {
"type": "builder_query",
"spec": {
"signal": "traces",
"name": "A",
"limit": 3
}
}
params = {
"start": start_ns,
"end": end_ns,
"format": "csv",
"source": "traces",
"composite_query": json.dumps(composite_query)
}
# Export traces with limit
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "text/csv"
# Parse CSV content
csv_content = response.text
csv_reader = csv.DictReader(io.StringIO(csv_content))
rows = list(csv_reader)
assert len(rows) == 3, f"Expected 3 rows (limited), got {len(rows)}"
def test_export_traces_with_multiple_composite_queries(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Setup:
Insert traces with different service names and attributes.
Tests:
1. Export traces using multiple composite queries
2. Verify all queries are executed and results are combined
"""
service_a_trace_id = TraceIdGenerator.trace_id()
service_a_span_id = TraceIdGenerator.span_id()
service_b_trace_id = TraceIdGenerator.trace_id()
service_b_span_id = TraceIdGenerator.span_id()
service_c_trace_id = TraceIdGenerator.trace_id()
service_c_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=service_a_trace_id,
span_id=service_a_span_id,
parent_span_id="",
name="operation-a",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "service-a",
},
attributes={
"http.status_code": "200",
},
),
Traces(
timestamp=now - timedelta(seconds=8),
duration=timedelta(seconds=1),
trace_id=service_b_trace_id,
span_id=service_b_span_id,
parent_span_id="",
name="operation-b",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_ERROR,
status_message="",
resources={
"service.name": "service-b",
},
attributes={
"http.status_code": "500",
},
),
Traces(
timestamp=now - timedelta(seconds=5),
duration=timedelta(seconds=1),
trace_id=service_c_trace_id,
span_id=service_c_span_id,
parent_span_id="",
name="operation-c",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "service-c",
},
attributes={
"http.status_code": "200",
},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
# Build first composite query - filter for service-a
composite_query_1 = {
"type": "builder_query",
"spec": {
"signal": "traces",
"name": "A",
"limit": 1000,
"filter": {
"expression": "service.name = 'service-a'"
}
}
}
# Build second composite query - filter for service-b
composite_query_2 = {
"type": "builder_query",
"spec": {
"signal": "traces",
"name": "B",
"limit": 1000,
"filter": {
"expression": "service.name = 'service-b'"
}
}
}
# Multiple queries are supported: each query is executed and all results are combined
params = {
"start": start_ns,
"end": end_ns,
"format": "jsonl",
"source": "traces",
}
# Add both composite queries as separate parameters
url = signoz.self.host_configs["8080"].get("/api/v1/export_raw_data")
url += f"?{urlencode(params)}"
url += f"&composite_query={quote(json.dumps(composite_query_1))}"
url += f"&composite_query={quote(json.dumps(composite_query_2))}"
# Export traces with multiple composite queries
response = requests.get(
url,
timeout=30,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "application/x-ndjson"
# Parse JSONL content
# With multiple queries, we should get results from both queries
jsonl_lines = response.text.strip().split("\n")
assert len(jsonl_lines) >= 1, f"Expected at least 1 line, got {len(jsonl_lines)}"
# Verify the result
json_objects = [json.loads(line) for line in jsonl_lines]
# Check that we got results
assert len(json_objects) > 0
# Multiple queries return combined results, so the output should contain traces
# from service-a and/or service-b
trace_ids_in_results = [obj.get("trace_id") for obj in json_objects]
assert service_a_trace_id in trace_ids_in_results or service_b_trace_id in trace_ids_in_results
# Count how many from each service we got
service_a_count = trace_ids_in_results.count(service_a_trace_id)
service_b_count = trace_ids_in_results.count(service_b_trace_id)
# At least one query should have returned results
assert service_a_count > 0 or service_b_count > 0
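# --- Illustrative sketch, not part of this PR ---
# Repeated `composite_query` parameters can also be encoded by handing urlencode
# a list of (key, value) pairs instead of concatenating quoted strings by hand.
# `build_export_url` is a hypothetical helper, not part of this change:
def build_export_url(base_url, params, composite_queries):
    pairs = list(params.items()) + [
        ("composite_query", json.dumps(query)) for query in composite_queries
    ]
    return f"{base_url}?{urlencode(pairs)}"
# Usage:
# url = build_export_url(
#     signoz.self.host_configs["8080"].get("/api/v1/export_raw_data"),
#     {"start": start_ns, "end": end_ns, "format": "jsonl", "source": "traces"},
#     [composite_query_1, composite_query_2],
# )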
def test_export_traces_with_composite_query_trace_operator(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Setup:
Insert multiple traces with parent-child relationships.
Tests:
1. Export traces using trace operator in composite query
2. Verify trace operator query works correctly
"""
parent_trace_id = TraceIdGenerator.trace_id()
parent_span_id = TraceIdGenerator.span_id()
child_span_id_1 = TraceIdGenerator.span_id()
child_span_id_2 = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=5),
trace_id=parent_trace_id,
span_id=parent_span_id,
parent_span_id="",
name="parent-operation",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "parent-service",
},
attributes={
"operation.type": "parent",
},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=2),
trace_id=parent_trace_id,
span_id=child_span_id_1,
parent_span_id=parent_span_id,
name="child-operation-1",
kind=TracesKind.SPAN_KIND_INTERNAL,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "parent-service",
},
attributes={
"operation.type": "child",
},
),
Traces(
timestamp=now - timedelta(seconds=7),
duration=timedelta(seconds=1),
trace_id=parent_trace_id,
span_id=child_span_id_2,
parent_span_id=parent_span_id,
name="child-operation-2",
kind=TracesKind.SPAN_KIND_INTERNAL,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "parent-service",
},
attributes={
"operation.type": "child",
},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
# Build first composite query - filter for parent spans (operation.type = 'parent')
composite_query_1 = {
"type": "builder_query",
"spec": {
"signal": "traces",
"name": "A",
"limit": 1000,
"filter": {
"expression": "operation.type = 'parent'"
}
}
}
# Build second composite query - filter for child spans (operation.type = 'child')
composite_query_2 = {
"type": "builder_query",
"spec": {
"signal": "traces",
"name": "B",
"limit": 1000,
"filter": {
"expression": "operation.type = 'child'"
}
}
}
# Build composite query using trace operator
composite_query = {
"type": "builder_trace_operator",
"spec": {
"name": "C",
"expression": "A => B",
"limit": 1000
}
}
params = {
"start": start_ns,
"end": end_ns,
"format": "jsonl",
"source": "traces",
}
# Add all three composite queries as separate parameters
url = signoz.self.host_configs["8080"].get("/api/v1/export_raw_data")
url += f"?{urlencode(params)}"
url += f"&composite_query={quote(json.dumps(composite_query_1))}"
url += f"&composite_query={quote(json.dumps(composite_query_2))}"
url += f"&composite_query={quote(json.dumps(composite_query))}"
# Export traces using trace operator
response = requests.get(
url,
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "application/x-ndjson"
# Parse JSONL content
jsonl_lines = response.text.strip().split("\n")
assert len(jsonl_lines) == 2, f"Expected 2 lines, got {len(jsonl_lines)}"
# Verify all traces are from the same trace
json_objects = [json.loads(line) for line in jsonl_lines]
trace_ids = [obj.get("trace_id") for obj in json_objects]
assert all(tid == parent_trace_id for tid in trace_ids)
# Verify span names
span_names = [obj.get("name") for obj in json_objects]
assert "parent-operation" in span_names
def test_export_traces_with_select_fields(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Setup:
Insert traces with various attributes.
Tests:
1. Export traces with specific select fields
2. Verify the selected fields are returned in the output
"""
trace_id = TraceIdGenerator.trace_id()
span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=2),
trace_id=trace_id,
span_id=span_id,
parent_span_id="",
name="test-operation",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"service.name": "test-service",
"deployment.environment": "production",
"host.name": "server-01",
},
attributes={
"http.method": "POST",
"http.status_code": "201",
"user.id": "user123",
},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Calculate timestamps in nanoseconds
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
end_ns = int(now.timestamp() * 1e9)
# Build composite query with specific select fields
composite_query = {
"type": "builder_query",
"spec": {
"signal": "traces",
"name": "A",
"limit": 1000,
"selectFields": [
{
"name": "trace_id",
"fieldDataType": "string",
"fieldContext": "span",
"signal": "traces"
},
{
"name": "span_id",
"fieldDataType": "string",
"fieldContext": "span",
"signal": "traces"
},
{
"name": "name",
"fieldDataType": "string",
"fieldContext": "span",
"signal": "traces"
},
{
"name": "service.name",
"fieldDataType": "string",
"fieldContext": "resource",
"signal": "traces"
}
]
}
}
params = {
"start": start_ns,
"end": end_ns,
"format": "jsonl",
"source": "traces",
"composite_query": json.dumps(composite_query)
}
# Export traces with select fields
response = requests.get(
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
timeout=10,
headers={
"authorization": f"Bearer {token}",
},
)
assert response.status_code == HTTPStatus.OK
assert response.headers["Content-Type"] == "application/x-ndjson"
# Parse JSONL content
jsonl_lines = response.text.strip().split("\n")
assert len(jsonl_lines) == 1
# Verify the selected fields are present
result = json.loads(jsonl_lines[0])
assert "trace_id" in result
assert "span_id" in result
assert "name" in result
# Verify values
assert result["trace_id"] == trace_id
assert result["span_id"] == span_id
assert result["name"] == "test-operation"