Mirror of https://github.com/SigNoz/signoz.git (synced 2026-02-09 03:02:20 +00:00)

Compare commits: 6 commits (test/uplot ... tvats-expo)

Commits: 4ca1f5537b, a3a0761068, 2de1ec3e68, fb4adbdb83, 6f3bf60a71, b6a3eaa3ad
@@ -607,6 +607,34 @@ paths:
      summary: Update auth domain
      tags:
        - authdomains
  /api/v1/export_raw_data:
    get:
      deprecated: false
      description: This endpoint allows exporting raw data for traces and logs
      operationId: HandleExportRawData
      responses:
        "200":
          content:
            application/json:
              schema:
                type: string
          description: OK
        "400":
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/RenderErrorResponse'
          description: Bad Request
        "500":
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/RenderErrorResponse'
          description: Internal Server Error
      summary: Export raw data
      tags:
        - logs
        - traces
  /api/v1/getResetPasswordToken/{id}:
    get:
      deprecated: false
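For quick reference, a minimal client-side sketch of calling the endpoint described above; the host, port, and token are placeholders, and start/end are Unix timestamps in nanoseconds.

```go
package main

import (
    "fmt"
    "io"
    "net/http"
    "net/url"
    "os"
)

func main() {
    // Hypothetical deployment address; adjust to a real SigNoz instance.
    base := "http://localhost:8080/api/v1/export_raw_data"

    q := url.Values{}
    q.Set("source", "logs")
    q.Set("format", "csv")
    q.Set("start", "1693612800000000000") // Unix nanoseconds
    q.Set("end", "1693616400000000000")
    q.Set("limit", "1000")

    req, err := http.NewRequest(http.MethodGet, base+"?"+q.Encode(), nil)
    if err != nil {
        panic(err)
    }
    req.Header.Set("Authorization", "Bearer "+os.Getenv("SIGNOZ_TOKEN"))

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // Stream the export straight to disk.
    out, err := os.Create("exported_logs.csv")
    if err != nil {
        panic(err)
    }
    defer out.Close()

    n, _ := io.Copy(out, resp.Body)
    fmt.Println("status:", resp.Status, "bytes:", n)
}
```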
@@ -16,6 +16,7 @@ import (
    "github.com/SigNoz/signoz/pkg/modules/organization"
    "github.com/SigNoz/signoz/pkg/modules/preference"
    "github.com/SigNoz/signoz/pkg/modules/promote"
    "github.com/SigNoz/signoz/pkg/modules/rawdataexport"
    "github.com/SigNoz/signoz/pkg/modules/session"
    "github.com/SigNoz/signoz/pkg/modules/user"
    "github.com/SigNoz/signoz/pkg/types"
@@ -39,6 +40,7 @@ type provider struct {
    dashboardModule        dashboard.Module
    dashboardHandler       dashboard.Handler
    metricsExplorerHandler metricsexplorer.Handler
    rawDataExportHandler   rawdataexport.Handler
}

func NewFactory(
@@ -55,9 +57,10 @@ func NewFactory(
    dashboardModule dashboard.Module,
    dashboardHandler dashboard.Handler,
    metricsExplorerHandler metricsexplorer.Handler,
    rawDataExportHandler rawdataexport.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
    return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
        return newProvider(ctx, providerSettings, config, orgGetter, authz, orgHandler, userHandler, sessionHandler, authDomainHandler, preferenceHandler, globalHandler, promoteHandler, flaggerHandler, dashboardModule, dashboardHandler, metricsExplorerHandler)
        return newProvider(ctx, providerSettings, config, orgGetter, authz, orgHandler, userHandler, sessionHandler, authDomainHandler, preferenceHandler, globalHandler, promoteHandler, flaggerHandler, dashboardModule, dashboardHandler, metricsExplorerHandler, rawDataExportHandler)
    })
}

@@ -78,6 +81,7 @@ func newProvider(
    dashboardModule dashboard.Module,
    dashboardHandler dashboard.Handler,
    metricsExplorerHandler metricsexplorer.Handler,
    rawDataExportHandler rawdataexport.Handler,
) (apiserver.APIServer, error) {
    settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
    router := mux.NewRouter().UseEncodedPath()
@@ -97,6 +101,7 @@ func newProvider(
        dashboardModule:        dashboardModule,
        dashboardHandler:       dashboardHandler,
        metricsExplorerHandler: metricsExplorerHandler,
        rawDataExportHandler:   rawDataExportHandler,
    }

    provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -153,6 +158,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
        return err
    }

    if err := provider.addRawDataExportRoutes(router); err != nil {
        return err
    }

    return nil
}

pkg/apiserver/signozapiserver/rawDataExport.go (new file, 28 lines)
@@ -0,0 +1,28 @@
package signozapiserver

import (
    "net/http"

    "github.com/SigNoz/signoz/pkg/http/handler"
    "github.com/SigNoz/signoz/pkg/types"
    "github.com/gorilla/mux"
)

func (provider *provider) addRawDataExportRoutes(router *mux.Router) error {
    if err := router.Handle("/api/v1/export_raw_data", handler.New(provider.authZ.ViewAccess(provider.rawDataExportHandler.ExportRawData), handler.OpenAPIDef{
        ID:                  "HandleExportRawData",
        Tags:                []string{"logs", "traces"},
        Summary:             "Export raw data",
        Description:         "This endpoint allows exporting raw data for traces and logs",
        Request:             new(types.ExportRawDataQueryParams),
        RequestContentType:  "application/json",
        Response:            nil,
        ResponseContentType: "application/json",
        SuccessStatusCode:   http.StatusOK,
        ErrorStatusCodes:    []int{http.StatusBadRequest},
    })).Methods(http.MethodGet).GetError(); err != nil {
        return err
    }

    return nil
}
@@ -17,6 +17,7 @@ import (
    "github.com/SigNoz/signoz/pkg/http/render"
    "github.com/SigNoz/signoz/pkg/modules/rawdataexport"
    "github.com/SigNoz/signoz/pkg/telemetrylogs"
    "github.com/SigNoz/signoz/pkg/telemetrytraces"
    "github.com/SigNoz/signoz/pkg/types/authtypes"
    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -60,6 +61,13 @@ func NewHandler(module rawdataexport.Module) rawdataexport.Handler {
//     Direction: "asc" or "desc"
//     Default: ["timestamp:desc", "id:desc"]
//
// - composite_query (optional): Advanced query specification as JSON-encoded QueryEnvelope array
//     When provided, this overrides filter, columns, order_by, and limit parameters
//     Format: JSON array of QueryEnvelope objects
//     Each QueryEnvelope must have a valid "type" field (e.g., "builder_query", "builder_trace_operator")
//     Supported types for traces: "builder_query", "builder_trace_operator"
//     Note: Limits in composite queries are validated and cannot exceed MAX_EXPORT_ROW_COUNT_LIMIT
//
// Response Headers:
// - Content-Type: "text/csv" or "application/x-ndjson"
// - Content-Encoding: "gzip" (handled by HTTP middleware)
@@ -86,6 +94,11 @@ func NewHandler(module rawdataexport.Module) rawdataexport.Handler {
// Export with filter and ordering:
//   GET /api/v1/export_raw_data?start=1693612800000000000&end=1693699199000000000
//     &filter=severity="error"&order_by=timestamp:desc&limit=1000
//
// Export with composite query (advanced):
//   GET /api/v1/export_raw_data?source=traces&start=1693612800000000000&end=1693699199000000000
//     &composite_query={"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000}}
//     &composite_query={"type":"builder_trace_operator","spec":{"name":"B","expression":"join","limit":500}}
func (handler *handler) ExportRawData(rw http.ResponseWriter, r *http.Request) {
    source, err := getExportQuerySource(r.URL.Query())
    if err != nil {
@@ -105,12 +118,169 @@ func (handler *handler) ExportRawData(rw http.ResponseWriter, r *http.Request) {
    }
}

func (handler *handler) exportMetrics(rw http.ResponseWriter, r *http.Request) {
func (handler *handler) exportMetrics(rw http.ResponseWriter, _ *http.Request) {
    render.Error(rw, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "metrics export is not yet supported"))
}

func (handler *handler) exportTraces(rw http.ResponseWriter, r *http.Request) {
    render.Error(rw, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "traces export is not yet supported"))
    // Set up response headers
    rw.Header().Set("Cache-Control", "no-cache")
    rw.Header().Set("Vary", "Accept-Encoding") // Indicate that response varies based on Accept-Encoding
    rw.Header().Set("Access-Control-Expose-Headers", "Content-Disposition, X-Response-Complete")
    rw.Header().Set("Trailer", "X-Response-Complete")
    rw.Header().Set("Transfer-Encoding", "chunked")

    queryParams := r.URL.Query()

    startTime, endTime, err := getExportQueryTimeRange(queryParams)
    if err != nil {
        render.Error(rw, err)
        return
    }

    format, err := getExportQueryFormat(queryParams)
    if err != nil {
        render.Error(rw, err)
        return
    }

    // Set appropriate content type and filename
    filename := fmt.Sprintf("data_exported_%s.%s", time.Now().Format("2006-01-02_150405"), format)
    rw.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filename))

    queryRangeRequest := qbtypes.QueryRangeRequest{
        Start:       startTime,
        End:         endTime,
        RequestType: qbtypes.RequestTypeRaw,
        CompositeQuery: qbtypes.CompositeQuery{
            Queries: []qbtypes.QueryEnvelope{},
        },
    }

    compositeQueries, err := getCompositeQueriesFromQueryParams(queryParams)
    if err != nil {
        render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query is malformed: %v", err))
        return
    }
    if len(compositeQueries) == 0 {
        // If no composite queries specified, add a default one

        limit, err := getExportQueryLimit(queryParams)
        if err != nil {
            render.Error(rw, err)
            return
        }

        filterExpression := queryParams.Get("filter")

        orderByExpression, err := getExportQueryOrderByTraces(queryParams)
        if err != nil {
            render.Error(rw, err)
            return
        }

        columns := getExportQueryColumns(queryParams)

        spec := qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
            Signal:       telemetrytypes.SignalTraces,
            SelectFields: columns,
            Filter: &qbtypes.Filter{
                Expression: filterExpression,
            },
            Limit: limit,
            Order: orderByExpression,
        }

        compositeQueries = append(compositeQueries, qbtypes.QueryEnvelope{
            Type: qbtypes.QueryTypeBuilder,
            Spec: spec,
        })
    }
    for i := range compositeQueries {
        // Ensure each query envelope has the correct type
        if compositeQueries[i].Type != qbtypes.QueryTypeBuilder && compositeQueries[i].Type != qbtypes.QueryTypeTraceOperator {
            render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d has unsupported type: %s", i, compositeQueries[i].Type.StringValue()))
            return
        }

        limit := DefaultExportRowCountLimit

        switch compositeQueries[i].Type {
        case qbtypes.QueryTypeBuilder, qbtypes.QueryTypeTraceOperator:
            switch spec := compositeQueries[i].Spec.(type) {
            case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
                if spec.Limit == 0 {
                    spec.Limit = limit
                } else if spec.Limit > MaxExportRowCountLimit {
                    render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d limit cannot be more than %d", i, MaxExportRowCountLimit))
                    return
                }
                compositeQueries[i].Spec = spec
            case qbtypes.QueryBuilderTraceOperator:
                if spec.Limit == 0 {
                    spec.Limit = limit
                } else if spec.Limit > MaxExportRowCountLimit {
                    render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d limit cannot be more than %d", i, MaxExportRowCountLimit))
                    return
                }
                compositeQueries[i].Spec = spec

            default:
                render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d has invalid spec type for builder query", i))
                return
            }

        default:
            render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "composite query %d has unsupported type: %s", i, compositeQueries[i].Type.StringValue()))
            return
        }

    }

    queryRangeRequest.CompositeQuery.Queries = compositeQueries

    claims, err := authtypes.ClaimsFromContext(r.Context())
    if err != nil {
        render.Error(rw, err)
        return
    }

    orgID, err := valuer.NewUUID(claims.OrgID)
    if err != nil {
        render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "orgID is invalid"))
        return
    }

    // This will signal Export module to stop sending data
    doneChan := make(chan any)
    defer close(doneChan)
    rowChan, errChan := handler.module.ExportRawData(r.Context(), orgID, &queryRangeRequest, doneChan)

    var isComplete bool

    switch format {
    case "csv", "":
        rw.Header().Set("Content-Type", "text/csv")
        csvWriter := csv.NewWriter(rw)
        isComplete, err = handler.exportRawDataCSV(rowChan, errChan, csvWriter)
        if err != nil {
            render.Error(rw, err)
            return
        }
        csvWriter.Flush()
    case "jsonl":
        rw.Header().Set("Content-Type", "application/x-ndjson")
        isComplete, err = handler.exportRawDataJSONL(rowChan, errChan, rw)
        if err != nil {
            render.Error(rw, err)
            return
        }
    default:
        render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid format: must be csv or jsonl"))
        return
    }

    rw.Header().Set("X-Response-Complete", strconv.FormatBool(isComplete))
}
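Because the handler above declares X-Response-Complete as a trailer and only sets its value after the body has been streamed, a client has to drain the response before it can tell whether the export finished cleanly. A minimal sketch, assuming the request is built as in the earlier example:

```go
package main

import (
    "io"
    "log"
    "net/http"
)

// drainAndCheck reads the streamed export to completion and then inspects the
// X-Response-Complete trailer that the handler above declares and sets.
func drainAndCheck(req *http.Request) {
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    // The trailer map is only populated once the body has been fully read.
    if _, err := io.Copy(io.Discard, resp.Body); err != nil {
        log.Fatal(err)
    }

    log.Printf("export complete: %s", resp.Trailer.Get("X-Response-Complete"))
}
```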

func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
@@ -147,7 +317,7 @@ func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {

    filterExpression := queryParams.Get("filter")

    orderByExpression, err := getExportQueryOrderBy(queryParams)
    orderByExpression, err := getExportQueryOrderByLogs(queryParams)
    if err != nil {
        render.Error(rw, err)
        return
@@ -206,7 +376,7 @@ func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
    case "csv", "":
        rw.Header().Set("Content-Type", "text/csv")
        csvWriter := csv.NewWriter(rw)
        isComplete, err = handler.exportLogsCSV(rowChan, errChan, csvWriter)
        isComplete, err = handler.exportRawDataCSV(rowChan, errChan, csvWriter)
        if err != nil {
            render.Error(rw, err)
            return
@@ -214,7 +384,7 @@ func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
        csvWriter.Flush()
    case "jsonl":
        rw.Header().Set("Content-Type", "application/x-ndjson")
        isComplete, err = handler.exportLogsJSONL(rowChan, errChan, rw)
        isComplete, err = handler.exportRawDataJSONL(rowChan, errChan, rw)
        if err != nil {
            render.Error(rw, err)
            return
@@ -227,7 +397,8 @@ func (handler *handler) exportLogs(rw http.ResponseWriter, r *http.Request) {
    rw.Header().Set("X-Response-Complete", strconv.FormatBool(isComplete))
}

func (handler *handler) exportLogsCSV(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, csvWriter *csv.Writer) (bool, error) {
// exportRawDataCSV is a generic CSV export function that works with any raw data (logs, traces, etc.)
func (handler *handler) exportRawDataCSV(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, csvWriter *csv.Writer) (bool, error) {
    var header []string

    headerToIndexMapping := make(map[string]int, len(header))
@@ -268,8 +439,8 @@ func (handler *handler) exportLogsCSV(rowChan <-chan *qbtypes.RawRow, errChan <-
    }
}

func (handler *handler) exportLogsJSONL(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, writer io.Writer) (bool, error) {

// exportRawDataJSONL is a generic JSONL export function that works with any raw data (logs, traces, etc.)
func (handler *handler) exportRawDataJSONL(rowChan <-chan *qbtypes.RawRow, errChan <-chan error, writer io.Writer) (bool, error) {
    totalBytes := uint64(0)
    for {
        select {
@@ -306,7 +477,7 @@ func getExportQuerySource(queryParams url.Values) (string, error) {
    case "metrics":
        return "metrics", errors.NewInvalidInputf(errors.CodeInvalidInput, "metrics export not yet supported")
    case "traces":
        return "traces", errors.NewInvalidInputf(errors.CodeInvalidInput, "traces export not yet supported")
        return "traces", nil
    default:
        return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid source: must be logs, metrics or traces")
    }
@@ -474,7 +645,7 @@ func getExportQueryOrderBy(queryParams url.Values) ([]qbtypes.OrderBy, error) {

    orderByParam = strings.TrimSpace(orderByParam)
    if orderByParam == "" {
        return telemetrylogs.DefaultLogsV2SortingOrder, nil
        return []qbtypes.OrderBy{}, nil
    }

    parts := strings.Split(orderByParam, ":")
@@ -492,25 +663,78 @@

    orderByKey := telemetrytypes.GetFieldKeyFromKeyText(column)

    orderBy := []qbtypes.OrderBy{
    return []qbtypes.OrderBy{
        {
            Key: qbtypes.OrderByKey{
                TelemetryFieldKey: orderByKey,
            },
            Direction: orderDirection,
        },
    }, nil
}

func getExportQueryOrderByLogs(queryParams url.Values) ([]qbtypes.OrderBy, error) {
    orderBy, err := getExportQueryOrderBy(queryParams)
    if err != nil {
        return nil, err
    }

    if len(orderBy) == 0 {
        // Default sorting for logs: timestamp desc, id desc
        return telemetrylogs.DefaultLogsV2SortingOrder, nil
    }
    // If we are ordering by the timestamp column, also order by the ID column
    if orderByKey.Name == telemetrylogs.LogsV2TimestampColumn {
    if orderBy[0].Key.Name == telemetrylogs.DefaultLogsV2SortingOrder[0].Key.Name {
        orderBy = append(orderBy, qbtypes.OrderBy{
            Key: qbtypes.OrderByKey{
                TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
                    Name: telemetrylogs.LogsV2IDColumn,
                },
                TelemetryFieldKey: telemetrylogs.DefaultLogsV2SortingOrder[1].Key.TelemetryFieldKey,
            },
            Direction: orderDirection,
            Direction: orderBy[0].Direction,
        })
    }
    return orderBy, nil
}

// getExportQueryOrderByTraces parses the "order_by" query parameters for traces and returns a slice of OrderBy structs.
// Each "order_by" parameter should be in the format "column:direction"
// Each "column" should be a valid telemetry field key in the format "context.field:type" or "context.field" or "field"
func getExportQueryOrderByTraces(queryParams url.Values) ([]qbtypes.OrderBy, error) {
    orderBy, err := getExportQueryOrderBy(queryParams)
    if err != nil {
        return nil, err
    }

    if len(orderBy) == 0 {
        // Default sorting for traces: timestamp desc, span_id desc
        return telemetrytraces.DefaultTracesSortingOrder, nil
    }

    // If we are ordering by the timestamp column, also order by the span ID column
    if orderBy[0].Key.Name == telemetrytraces.DefaultTracesSortingOrder[0].Key.Name {
        orderBy = append(orderBy, qbtypes.OrderBy{
            Key: qbtypes.OrderByKey{
                TelemetryFieldKey: telemetrytraces.DefaultTracesSortingOrder[1].Key.TelemetryFieldKey,
            },
            Direction: orderBy[0].Direction,
        })
    }
    return orderBy, nil
}

func getCompositeQueriesFromQueryParams(queryParams url.Values) ([]qbtypes.QueryEnvelope, error) {
    // Check if composite_query parameter exists in query params
    compositeQueryParams := queryParams["composite_query"]

    // Try to unmarshal the composite_query JSON parameter
    var queries []qbtypes.QueryEnvelope
    for _, compositeQueryParam := range compositeQueryParams {
        var query qbtypes.QueryEnvelope
        if err := json.Unmarshal([]byte(compositeQueryParam), &query); err != nil {
            // If unmarshaling fails, return the error so the caller can reject the request
            return nil, err
        }
        queries = append(queries, query)
    }

    return queries, nil
}

@@ -1,6 +1,7 @@
package implrawdataexport

import (
    "fmt"
    "net/url"
    "strconv"
    "testing"
@@ -37,10 +38,10 @@ func TestGetExportQuerySource(t *testing.T) {
            expectedError: true,
        },
        {
            name:           "traces source - not supported",
            name:           "traces source - supported",
            queryParams:    url.Values{"source": {"traces"}},
            expectedSource: "traces",
            expectedError:  true,
            expectedError:  false,
        },
        {
            name: "invalid source",
@@ -318,7 +319,7 @@ func TestGetExportQueryColumns(t *testing.T) {
    }
}

func TestGetExportQueryOrderBy(t *testing.T) {
func TestGetExportQueryOrderByLogs(t *testing.T) {
    tests := []struct {
        name        string
        queryParams url.Values
@@ -505,7 +506,7 @@ func TestGetExportQueryOrderBy(t *testing.T) {

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            order, err := getExportQueryOrderBy(tt.queryParams)
            order, err := getExportQueryOrderByLogs(tt.queryParams)
            if tt.expectedError {
                assert.Error(t, err)
            } else {
@@ -561,3 +562,468 @@ func TestConstructCSVRecordFromQueryResponse(t *testing.T) {
    assert.Equal(t, "INFO", record[2])
    assert.Equal(t, "test-id", record[3])
}

func TestGetExportQueryOrderByTraces(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
queryParams url.Values
|
||||
expectedOrder []qbtypes.OrderBy
|
||||
expectedError bool
|
||||
}{
|
||||
{
|
||||
name: "no order specified - should use default (timestamp:desc, span_id:desc)",
|
||||
queryParams: url.Values{},
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "span_id",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
{
|
||||
name: "order by timestamp asc - should also add span_id asc",
|
||||
queryParams: url.Values{
|
||||
"order_by": {"timestamp:asc"},
|
||||
},
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionAsc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionAsc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "span_id",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
{
|
||||
name: "order by timestamp desc - should also add span_id desc",
|
||||
queryParams: url.Values{
|
||||
"order_by": {"timestamp:desc"},
|
||||
},
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "span_id",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
{
|
||||
name: "order by non-timestamp field - should not add span_id",
|
||||
queryParams: url.Values{
|
||||
"order_by": {"duration:desc"},
|
||||
},
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "duration",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
{
|
||||
name: "order by attribute with type",
|
||||
queryParams: url.Values{
|
||||
"order_by": {"attribute.http.status_code:number:asc"},
|
||||
},
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionAsc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
{
|
||||
name: "order by resource with type",
|
||||
queryParams: url.Values{
|
||||
"order_by": {"resource.service.name:string:desc"},
|
||||
},
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
{
|
||||
name: "order by attribute without type (dot notation)",
|
||||
queryParams: url.Values{
|
||||
"order_by": {"attribute.http.method:asc"},
|
||||
},
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionAsc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
{
|
||||
name: "invalid format - missing direction",
|
||||
queryParams: url.Values{
|
||||
"order_by": {"timestamp"},
|
||||
},
|
||||
expectedOrder: nil,
|
||||
expectedError: true,
|
||||
},
|
||||
{
|
||||
name: "invalid direction",
|
||||
queryParams: url.Values{
|
||||
"order_by": {"timestamp:invalid"},
|
||||
},
|
||||
expectedOrder: nil,
|
||||
expectedError: true,
|
||||
},
|
||||
{
|
||||
name: "empty order_by value - should use default",
|
||||
queryParams: url.Values{
|
||||
"order_by": {""},
|
||||
},
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "span_id",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
{
|
||||
name: "whitespace only order_by value - should use default",
|
||||
queryParams: url.Values{
|
||||
"order_by": {" "},
|
||||
},
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "span_id",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
order, err := getExportQueryOrderByTraces(tt.queryParams)
|
||||
if tt.expectedError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(tt.expectedOrder), len(order))
|
||||
for i, expectedOrd := range tt.expectedOrder {
|
||||
assert.Equal(t, expectedOrd, order[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetCompositeQueriesFromQueryParams(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
queryParams url.Values
|
||||
expectedQueries []qbtypes.QueryEnvelope
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "no composite_query parameter - should return empty slice",
|
||||
queryParams: url.Values{},
|
||||
expectedQueries: nil,
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "single valid builder query",
|
||||
queryParams: url.Values{
|
||||
"composite_query": {`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000}}`},
|
||||
},
|
||||
expectedQueries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Name: "A",
|
||||
Limit: 1000,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "single valid trace_operator query",
|
||||
queryParams: url.Values{
|
||||
"composite_query": {`{"type":"builder_trace_operator","spec":{"name":"B","expression":"join","limit":500}}`},
|
||||
},
|
||||
expectedQueries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeTraceOperator,
|
||||
Spec: qbtypes.QueryBuilderTraceOperator{
|
||||
Name: "B",
|
||||
Expression: "join",
|
||||
Limit: 500,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "multiple composite queries",
|
||||
queryParams: url.Values{
|
||||
"composite_query": {
|
||||
`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000}}`,
|
||||
`{"type":"builder_trace_operator","spec":{"name":"B","expression":"join","limit":500}}`,
|
||||
},
|
||||
},
|
||||
expectedQueries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Name: "A",
|
||||
Limit: 1000,
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeTraceOperator,
|
||||
Spec: qbtypes.QueryBuilderTraceOperator{
|
||||
Name: "B",
|
||||
Expression: "join",
|
||||
Limit: 500,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "invalid JSON - should return error",
|
||||
queryParams: url.Values{
|
||||
"composite_query": {`{invalid json`},
|
||||
},
|
||||
expectedQueries: nil,
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "empty composite_query value - should return error",
|
||||
queryParams: url.Values{
|
||||
"composite_query": {""},
|
||||
},
|
||||
expectedQueries: nil,
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "valid and invalid queries mixed - should return error on first invalid",
|
||||
queryParams: url.Values{
|
||||
"composite_query": {
|
||||
`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000}}`,
|
||||
`{invalid}`,
|
||||
},
|
||||
},
|
||||
expectedQueries: nil,
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "query without type field - should return error",
|
||||
queryParams: url.Values{
|
||||
"composite_query": {`{"spec":{"signal":"traces","name":"A","limit":1000}}`},
|
||||
},
|
||||
expectedQueries: nil,
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "query with complex spec including filter",
|
||||
queryParams: url.Values{
|
||||
"composite_query": {`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000,"filter":{"expression":"status=error"}}}`},
|
||||
},
|
||||
expectedQueries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Name: "A",
|
||||
Limit: 1000,
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "status=error",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "query with order by clause",
|
||||
queryParams: url.Values{
|
||||
"composite_query": {`{"type":"builder_query","spec":{"signal":"traces","name":"A","limit":1000,"order":[{"key":{"name":"timestamp"},"direction":"desc"},{"key":{"name":"span_id"},"direction":"desc"}]}}`},
|
||||
},
|
||||
expectedQueries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Name: "A",
|
||||
Limit: 1000,
|
||||
Order: []qbtypes.OrderBy{
|
||||
{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
},
|
||||
{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "span_id",
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
queries, err := getCompositeQueriesFromQueryParams(tt.queryParams)
|
||||
|
||||
if tt.expectError {
|
||||
assert.Error(t, err, "expected an error")
|
||||
assert.Nil(t, queries, "queries should be nil when error occurs")
|
||||
} else {
|
||||
assert.NoError(t, err, "expected no error")
|
||||
|
||||
if tt.expectedQueries == nil {
|
||||
assert.Nil(t, queries, "expected nil queries")
|
||||
} else {
|
||||
assert.NotNil(t, queries, "expected non-nil queries")
|
||||
assert.Equal(t, len(tt.expectedQueries), len(queries), "number of queries mismatch")
|
||||
|
||||
if len(queries) == len(tt.expectedQueries) {
|
||||
for idx := range len(tt.expectedQueries) {
|
||||
assert.Equal(t, tt.expectedQueries[idx].Type, queries[idx].Type, "query type mismatch at index "+strconv.Itoa(idx))
|
||||
|
||||
// Validate spec type matches if expected spec is not nil
|
||||
if tt.expectedQueries[idx].Spec != nil {
|
||||
assert.NotNil(t, queries[idx].Spec, "spec should not be nil at index "+strconv.Itoa(idx))
|
||||
|
||||
// Check the type of spec matches expected
|
||||
expectedSpecType := fmt.Sprintf("%T", tt.expectedQueries[idx].Spec)
|
||||
actualSpecType := fmt.Sprintf("%T", queries[idx].Spec)
|
||||
assert.Equal(t, expectedSpecType, actualSpecType, "spec type mismatch at index "+strconv.Itoa(idx))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,8 +23,21 @@ func NewModule(querier querier.Querier) rawdataexport.Module {

func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, doneChan chan any) (chan *qbtypes.RawRow, chan error) {

    spec := rangeRequest.CompositeQuery.Queries[0].Spec.(qbtypes.QueryBuilderQuery[qbtypes.LogAggregation])
    rowCountLimit := spec.Limit
    isTraceOperatorQueryPresent := false
    for _, query := range rangeRequest.CompositeQuery.Queries {
        if _, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
            isTraceOperatorQueryPresent = true
            break
        }
    }

    if isTraceOperatorQueryPresent {
        return exportRawDataForTraceOperatorQuery(m.querier, ctx, orgID, rangeRequest, doneChan)
    }
    return exportRawDataForQueryBuilderQueries(m.querier, ctx, orgID, rangeRequest, doneChan)
}

func exportRawDataForQueryBuilderQueries(querier querier.Querier, ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, doneChan chan any) (chan *qbtypes.RawRow, chan error) {

    rowChan := make(chan *qbtypes.RawRow, 1)
    errChan := make(chan error, 1)
@@ -38,15 +51,124 @@ func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequ
        defer close(errChan)
        defer close(rowChan)

        compositeQueries := rangeRequest.CompositeQuery.Queries

        appendQueryName := len(compositeQueries) > 1

        for _, query := range compositeQueries {

            dupRangeRequest := rangeRequest.Copy()
            dupRangeRequest.CompositeQuery.Queries = []qbtypes.QueryEnvelope{query}
            switch query.Spec.(type) {
            case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
                exportRawDataForQueryBuilderQuery[qbtypes.LogAggregation](querier, contextWithTimeout, orgID, &dupRangeRequest, rowChan, errChan, doneChan, appendQueryName)
            case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
                exportRawDataForQueryBuilderQuery[qbtypes.MetricAggregation](querier, contextWithTimeout, orgID, &dupRangeRequest, rowChan, errChan, doneChan, appendQueryName)
            case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
                exportRawDataForQueryBuilderQuery[qbtypes.TraceAggregation](querier, contextWithTimeout, orgID, &dupRangeRequest, rowChan, errChan, doneChan, appendQueryName)
            default:
                errChan <- errors.NewInternalf(errors.CodeInternal, "unsupported query spec type: %T", query.Spec)
                return
            }
        }
    }()

    return rowChan, errChan

}

func exportRawDataForQueryBuilderQuery[T any](querier querier.Querier, ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, rowChan chan *qbtypes.RawRow, errChan chan error, doneChan chan any, appendQueryName bool) {
    spec, ok := rangeRequest.CompositeQuery.Queries[0].Spec.(qbtypes.QueryBuilderQuery[T])
    if !ok {
        errChan <- errors.NewInternalf(errors.CodeInternal, "invalid spec type for query builder query")
        return
    }

    rowCountLimit := spec.Limit
    rowCount := 0

    for rowCount < rowCountLimit {
        spec.Limit = min(ChunkSize, rowCountLimit-rowCount)
        spec.Offset = rowCount
        rangeRequest.CompositeQuery.Queries[0].Spec = spec

        response, err := querier.QueryRange(ctx, orgID, rangeRequest)
        if err != nil {
            errChan <- err
            return
        }

        newRowsCount := 0
        for _, result := range response.Data.Results {
            resultData, ok := result.(*qbtypes.RawData)
            if !ok {
                errChan <- errors.NewInternalf(errors.CodeInternal, "expected RawData, got %T", result)
                return
            }

            newRowsCount += len(resultData.Rows)
            for _, row := range resultData.Rows {
                if appendQueryName {
                    row.Data["__query_name"] = spec.Name
                }
                select {
                case rowChan <- row:
                case <-doneChan:
                    return
                case <-ctx.Done():
                    errChan <- ctx.Err()
                    return
                }
            }
        }

        // Break if we did not receive any new rows
        if newRowsCount == 0 {
            return
        }

        rowCount += newRowsCount
    }
}

func exportRawDataForTraceOperatorQuery(querier querier.Querier, ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, doneChan chan any) (chan *qbtypes.RawRow, chan error) {

    traceOperatorIndex := 0

    for i, query := range rangeRequest.CompositeQuery.Queries {
        if _, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
            traceOperatorIndex = i
            break
        }
    }

    rowChan := make(chan *qbtypes.RawRow, 1)
    errChan := make(chan error, 1)

    go func() {
        // Set clickhouse max threads
        ctx := ctxtypes.SetClickhouseMaxThreads(ctx, ClickhouseExportRawDataMaxThreads)
        // Set clickhouse timeout
        contextWithTimeout, cancel := context.WithTimeout(ctx, ClickhouseExportRawDataTimeout)
        defer cancel()
        defer close(errChan)
        defer close(rowChan)

        spec, ok := rangeRequest.CompositeQuery.Queries[traceOperatorIndex].Spec.(qbtypes.QueryBuilderTraceOperator)
        if !ok {
            errChan <- errors.NewInternalf(errors.CodeInternal, "invalid spec type for query builder trace operator")
            return
        }

        rowCountLimit := spec.Limit
        rowCount := 0

        for rowCount < rowCountLimit {
            spec.Limit = min(ChunkSize, rowCountLimit-rowCount)
            spec.Offset = rowCount
            rangeRequest.CompositeQuery.Queries[traceOperatorIndex].Spec = spec

            rangeRequest.CompositeQuery.Queries[0].Spec = spec

            response, err := m.querier.QueryRange(contextWithTimeout, orgID, rangeRequest)
            response, err := querier.QueryRange(contextWithTimeout, orgID, rangeRequest)
            if err != nil {
                errChan <- err
                return
@@ -71,7 +193,6 @@ func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequ
                    return
                }
            }

            }

            // Break if we did not receive any new rows
@@ -80,7 +201,6 @@ func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequ
            }

            rowCount += newRowsCount

        }
    }()

@@ -593,7 +593,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
    })).Methods(http.MethodGet)

    // Export
    router.HandleFunc("/api/v1/export_raw_data", am.ViewAccess(aH.Signoz.Handlers.RawDataExport.ExportRawData)).Methods(http.MethodGet)
    // router.HandleFunc("/api/v1/export_raw_data", am.ViewAccess(aH.Signoz.Handlers.RawDataExport.ExportRawData)).Methods(http.MethodGet)

    router.HandleFunc("/api/v1/span_percentile", am.ViewAccess(aH.Signoz.Handlers.SpanPercentile.GetSpanPercentileDetails)).Methods(http.MethodPost)

@@ -18,6 +18,7 @@ import (
    "github.com/SigNoz/signoz/pkg/modules/organization"
    "github.com/SigNoz/signoz/pkg/modules/preference"
    "github.com/SigNoz/signoz/pkg/modules/promote"
    "github.com/SigNoz/signoz/pkg/modules/rawdataexport"
    "github.com/SigNoz/signoz/pkg/modules/session"
    "github.com/SigNoz/signoz/pkg/modules/user"
    "github.com/SigNoz/signoz/pkg/types/ctxtypes"
@@ -47,6 +48,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
        struct{ dashboard.Module }{},
        struct{ dashboard.Handler }{},
        struct{ metricsexplorer.Handler }{},
        struct{ rawdataexport.Handler }{},
    ).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
    if err != nil {
        return nil, err
@@ -247,6 +247,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
            modules.Dashboard,
            handlers.Dashboard,
            handlers.MetricsExplorer,
            handlers.RawDataExport,
        ),
    )
}

@@ -1,6 +1,9 @@
package telemetrytraces

import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
import (
    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

var (
    IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
@@ -373,4 +376,19 @@
            FieldDataType: telemetrytypes.FieldDataTypeString,
        },
    }

    DefaultTracesSortingOrder = []qbtypes.OrderBy{
        {
            Key: qbtypes.OrderByKey{
                TelemetryFieldKey: DefaultFields["timestamp"],
            },
            Direction: qbtypes.OrderDirectionDesc,
        },
        {
            Key: qbtypes.OrderByKey{
                TelemetryFieldKey: DefaultFields["span_id"],
            },
            Direction: qbtypes.OrderDirectionDesc,
        },
    }
)

@@ -2,6 +2,7 @@ package querybuildertypesv5

import (
    "encoding/json"
    "reflect"
    "strings"

    "github.com/SigNoz/govaluate"
@@ -450,3 +451,74 @@ func (r *QueryRangeRequest) GetQueriesSupportingZeroDefault() map[string]bool {

    return canDefaultZero
}

func (r *QueryRangeRequest) Copy() QueryRangeRequest {
    c := QueryRangeRequest{
        SchemaVersion:  r.SchemaVersion,
        Start:          r.Start,
        End:            r.End,
        RequestType:    r.RequestType,
        NoCache:        r.NoCache,
        FormatOptions:  r.FormatOptions,
        Variables:      make(map[string]VariableItem),
        CompositeQuery: CompositeQuery{},
    }

    for k, v := range r.Variables {
        c.Variables[k] = v
    }

    c.CompositeQuery.Queries = make([]QueryEnvelope, len(r.CompositeQuery.Queries))
    for i, q := range r.CompositeQuery.Queries {
        if q.Spec != nil {
            switch spec := q.Spec.(type) {
            case QueryBuilderQuery[TraceAggregation]:
                c.CompositeQuery.Queries[i] = QueryEnvelope{
                    Type: q.Type,
                    Spec: spec.Copy(),
                }
            case QueryBuilderQuery[LogAggregation]:
                c.CompositeQuery.Queries[i] = QueryEnvelope{
                    Type: q.Type,
                    Spec: spec.Copy(),
                }
            case QueryBuilderQuery[MetricAggregation]:
                c.CompositeQuery.Queries[i] = QueryEnvelope{
                    Type: q.Type,
                    Spec: spec.Copy(),
                }
            case QueryBuilderFormula:
                c.CompositeQuery.Queries[i] = QueryEnvelope{
                    Type: q.Type,
                    Spec: spec.Copy(),
                }
            case QueryBuilderJoin:
                c.CompositeQuery.Queries[i] = QueryEnvelope{
                    Type: q.Type,
                    Spec: spec.Copy(),
                }
            case QueryBuilderTraceOperator:
                c.CompositeQuery.Queries[i] = QueryEnvelope{
                    Type: q.Type,
                    Spec: spec.Copy(),
                }
            default:
                // Check if spec implements a Copy method
                specValue := reflect.ValueOf(q.Spec)
                copyMethod := specValue.MethodByName("Copy")
                if copyMethod.IsValid() && copyMethod.Type().NumIn() == 0 && copyMethod.Type().NumOut() == 1 {
                    // Call Copy method and use the result
                    result := copyMethod.Call(nil)
                    c.CompositeQuery.Queries[i] = QueryEnvelope{
                        Type: q.Type,
                        Spec: result[0].Interface(),
                    }
                } else {
                    // Fallback to shallow copy
                    c.CompositeQuery.Queries[i] = q
                }
            }
        }
    }
    return c
}

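The export module mutates Limit and Offset on each query spec while paginating, which is why it works on rangeRequest.Copy() rather than on the caller's request. A minimal sketch of that isolation property, assuming the trace-aggregation spec fields used elsewhere in this change:

```go
package main

import (
    "fmt"

    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)

func main() {
    original := qbtypes.QueryRangeRequest{
        RequestType: qbtypes.RequestTypeRaw,
        CompositeQuery: qbtypes.CompositeQuery{
            Queries: []qbtypes.QueryEnvelope{
                {
                    Type: qbtypes.QueryTypeBuilder,
                    Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{Name: "A", Limit: 1000},
                },
            },
        },
    }

    // Copy duplicates the query envelopes, so mutating the copy's spec
    // (as the chunked export loop does) leaves the original untouched.
    dup := original.Copy()
    spec := dup.CompositeQuery.Queries[0].Spec.(qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation])
    spec.Limit, spec.Offset = 100, 0
    dup.CompositeQuery.Queries[0].Spec = spec

    remaining := original.CompositeQuery.Queries[0].Spec.(qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation])
    fmt.Println(remaining.Limit) // still 1000
}
```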
pkg/types/rawdataexport.go (new file, 39 lines)
@@ -0,0 +1,39 @@
package types

import (
    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)

// ExportRawDataQueryParams represents the query parameters for the export raw data endpoint
type ExportRawDataQueryParams struct {
    // Source specifies the type of data to export: "logs", "metrics", or "traces"
    Source string `json:"source" schema:"source"`

    // Format specifies the output format: "csv" or "jsonl"
    Format string `json:"format" schema:"format"`

    // Start is the start time for the query (Unix timestamp in nanoseconds)
    Start uint64 `json:"start" schema:"start"`

    // End is the end time for the query (Unix timestamp in nanoseconds)
    End uint64 `json:"end" schema:"end"`

    // Limit specifies the maximum number of rows to export
    Limit int `json:"limit" schema:"limit"`

    // Filter is a filter expression to apply to the query
    Filter string `json:"filter" schema:"filter"`

    // Columns specifies the columns to include in the export
    // Format: ["context.field:type", "context.field", "field"]
    Columns []string `json:"columns" schema:"columns"`

    // OrderBy specifies the sorting order
    // Format: "column:direction" or "context.field:type:direction"
    // Direction can be "asc" or "desc"
    OrderBy string `json:"order_by" schema:"order_by"`

    // CompositeQuery is an advanced query specification as JSON-encoded QueryEnvelope array
    // When provided, this overrides filter, columns, order_by, and limit parameters
    CompositeQuery []qbtypes.QueryEnvelope `json:"composite_query" schema:"composite_query"`
}
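As an illustration of how a request maps onto the struct above (not a statement about which decoder populates it), a call such as /api/v1/export_raw_data?source=logs&format=jsonl&limit=1000&filter=severity_text%20%3D%20%27ERROR%27&order_by=timestamp:desc would correspond to:

```go
package main

import (
    "fmt"

    "github.com/SigNoz/signoz/pkg/types"
)

func main() {
    // Illustrative values only; they mirror the documented query parameters.
    params := types.ExportRawDataQueryParams{
        Source:  "logs",
        Format:  "jsonl",
        Start:   1693612800000000000,
        End:     1693699199000000000,
        Limit:   1000,
        Filter:  "severity_text = 'ERROR'",
        OrderBy: "timestamp:desc",
        Columns: []string{"body", "severity_text"},
    }
    fmt.Printf("%+v\n", params)
}
```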
tests/integration/src/rawexportdata/01_logs.py (new file, 620 lines)
@@ -0,0 +1,620 @@
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable, List
|
||||
from urllib.parse import urlencode
|
||||
import csv
|
||||
import io
|
||||
import json
|
||||
|
||||
import requests
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.logs import Logs
|
||||
|
||||
|
||||
def test_export_logs_csv(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_logs: Callable[[List[Logs]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Setup:
|
||||
Insert 3 logs with different severity levels and attributes.
|
||||
|
||||
Tests:
|
||||
1. Export logs as CSV format
|
||||
2. Verify CSV structure and content
|
||||
3. Validate headers are present
|
||||
4. Check log data is correctly formatted
|
||||
"""
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
|
||||
insert_logs(
|
||||
[
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
body="Application started successfully",
|
||||
severity_text="INFO",
|
||||
resources={
|
||||
"service.name": "api-service",
|
||||
"deployment.environment": "production",
|
||||
"host.name": "server-01",
|
||||
},
|
||||
attributes={
|
||||
"http.method": "GET",
|
||||
"http.status_code": 200,
|
||||
"user.id": "user123",
|
||||
},
|
||||
),
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=8),
|
||||
body="Connection to database failed",
|
||||
severity_text="ERROR",
|
||||
resources={
|
||||
"service.name": "api-service",
|
||||
"deployment.environment": "production",
|
||||
"host.name": "server-01",
|
||||
},
|
||||
attributes={
|
||||
"error.type": "ConnectionError",
|
||||
"db.name": "production_db",
|
||||
},
|
||||
),
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=5),
|
||||
body="Request processed",
|
||||
severity_text="DEBUG",
|
||||
resources={
|
||||
"service.name": "worker-service",
|
||||
"deployment.environment": "production",
|
||||
"host.name": "server-02",
|
||||
},
|
||||
attributes={
|
||||
"request.id": "req-456",
|
||||
"duration_ms": 150.5,
|
||||
},
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
# Calculate timestamps in nanoseconds
|
||||
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
|
||||
end_ns = int(now.timestamp() * 1e9)
|
||||
|
||||
params = {
|
||||
"start": start_ns,
|
||||
"end": end_ns,
|
||||
"format": "csv",
|
||||
"source": "logs",
|
||||
}
|
||||
|
||||
# Export logs as CSV (default format, no source needed)
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
|
||||
timeout=10,
|
||||
headers={
|
||||
"authorization": f"Bearer {token}",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.headers["Content-Type"] == "text/csv"
|
||||
assert "attachment" in response.headers.get("Content-Disposition", "")
|
||||
|
||||
# Parse CSV content
|
||||
csv_content = response.text
|
||||
csv_reader = csv.DictReader(io.StringIO(csv_content))
|
||||
|
||||
rows = list(csv_reader)
|
||||
assert len(rows) == 3, f"Expected 3 rows, got {len(rows)}"
|
||||
|
||||
# Verify log bodies are present in the exported data
|
||||
bodies = [row.get("body") for row in rows]
|
||||
assert "Application started successfully" in bodies
|
||||
assert "Connection to database failed" in bodies
|
||||
assert "Request processed" in bodies
|
||||
|
||||
# Verify severity levels
|
||||
severities = [row.get("severity_text") for row in rows]
|
||||
assert "INFO" in severities
|
||||
assert "ERROR" in severities
|
||||
assert "DEBUG" in severities
|
||||
|
||||
|
||||
def test_export_logs_jsonl(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_logs: Callable[[List[Logs]], None],
|
||||
) -> None:
|
||||
"""
|
||||
Setup:
|
||||
Insert 2 logs with different attributes.
|
||||
|
||||
Tests:
|
||||
1. Export logs as JSONL format
|
||||
2. Verify JSONL structure and content
|
||||
3. Check each line is valid JSON
|
||||
4. Validate log data is correctly formatted
|
||||
"""
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
|
||||
insert_logs(
|
||||
[
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
body="User logged in",
|
||||
severity_text="INFO",
|
||||
resources={
|
||||
"service.name": "auth-service",
|
||||
"deployment.environment": "staging",
|
||||
},
|
||||
attributes={
|
||||
"user.email": "test@example.com",
|
||||
"session.id": "sess-789",
|
||||
},
|
||||
),
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=5),
|
||||
body="Payment processed successfully",
|
||||
severity_text="INFO",
|
||||
resources={
|
||||
"service.name": "payment-service",
|
||||
"deployment.environment": "staging",
|
||||
},
|
||||
attributes={
|
||||
"transaction.id": "txn-123",
|
||||
"amount": 99.99,
|
||||
},
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
# Calculate timestamps in nanoseconds
|
||||
start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
|
||||
end_ns = int(now.timestamp() * 1e9)
|
||||
|
||||
params = {
|
||||
"start": start_ns,
|
||||
"end": end_ns,
|
||||
"format": "jsonl",
|
||||
"source": "logs",
|
||||
}
|
||||
|
||||
# Export logs as JSONL
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
|
||||
timeout=10,
|
||||
headers={
|
||||
"authorization": f"Bearer {token}",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.headers["Content-Type"] == "application/x-ndjson"
|
||||
assert "attachment" in response.headers.get("Content-Disposition", "")
|
||||
|
||||
# Parse JSONL content
|
||||
jsonl_lines = response.text.strip().split("\n")
|
||||
assert len(jsonl_lines) == 2, f"Expected 2 lines, got {len(jsonl_lines)}"
|
||||
|
||||
# Verify each line is valid JSON
|
||||
json_objects = []
|
||||
for line in jsonl_lines:
|
||||
obj = json.loads(line)
|
||||
json_objects.append(obj)
|
||||
assert "id" in obj
|
||||
assert "timestamp" in obj
|
||||
assert "body" in obj
|
||||
assert "severity_text" in obj
|
||||
|
||||
# Verify log bodies
|
||||
bodies = [obj.get("body") for obj in json_objects]
|
||||
assert "User logged in" in bodies
|
||||
assert "Payment processed successfully" in bodies
|
||||
|
||||
|
||||
def test_export_logs_with_filter(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert logs with different severity levels.

    Tests:
        1. Export logs with filter applied
        2. Verify only filtered logs are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="Info message",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=8),
                body="Error message",
                severity_text="ERROR",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=5),
                body="Another error message",
                severity_text="ERROR",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "jsonl",
        "source": "logs",
        "filter": "severity_text = 'ERROR'",
    }

    # Export logs with filter
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 2, f"Expected 2 lines (filtered), got {len(jsonl_lines)}"

    # Verify only ERROR logs are returned
    for line in jsonl_lines:
        obj = json.loads(line)
        assert obj["severity_text"] == "ERROR"
        assert "error message" in obj["body"].lower()


def test_export_logs_with_limit(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert 5 logs.

    Tests:
        1. Export logs with limit applied
        2. Verify only limited number of logs are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    logs = []
    for i in range(5):
        logs.append(
            Logs(
                timestamp=now - timedelta(seconds=i),
                body=f"Log message {i}",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={
                    "index": i,
                },
            )
        )

    insert_logs(logs)

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "csv",
        "source": "logs",
        "limit": 3,
    }

    # Export logs with limit
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "text/csv"

    # Parse CSV content
    csv_content = response.text
    csv_reader = csv.DictReader(io.StringIO(csv_content))

    rows = list(csv_reader)
    assert len(rows) == 3, f"Expected 3 rows (limited), got {len(rows)}"


def test_export_logs_with_columns(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert logs with various attributes.

    Tests:
        1. Export logs with specific columns
        2. Verify only specified columns are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="Test log message",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                    "deployment.environment": "production",
                },
                attributes={
                    "http.method": "GET",
                    "http.status_code": 200,
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # Request only specific columns
    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "csv",
        "source": "logs",
        "columns": ["timestamp", "severity_text", "body"],
    }

    # Export logs with specific columns
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params, doseq=True)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "text/csv"

    # Parse CSV content
    csv_content = response.text
    csv_reader = csv.DictReader(io.StringIO(csv_content))

    rows = list(csv_reader)
    assert len(rows) == 1

    # Verify the specified columns are present
    row = rows[0]
    assert "timestamp" in row
    assert "severity_text" in row
    assert "body" in row
    assert row["severity_text"] == "INFO"
    assert row["body"] == "Test log message"


def test_export_logs_with_order_by(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert logs at different timestamps.

    Tests:
        1. Export logs with ascending timestamp order
        2. Verify logs are returned in correct order
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="First log",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=5),
                body="Second log",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=1),
                body="Third log",
                severity_text="INFO",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "jsonl",
        "source": "logs",
        "order_by": "timestamp:asc",
    }

    # Export logs with ascending order
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 3

    # Verify order - first log should be "First log" (oldest)
    json_objects = [json.loads(line) for line in jsonl_lines]
    assert json_objects[0]["body"] == "First log"
    assert json_objects[1]["body"] == "Second log"
    assert json_objects[2]["body"] == "Third log"


def test_export_logs_with_complex_filter(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[List[Logs]], None],
) -> None:
    """
    Setup:
        Insert logs with various service names and severity levels.

    Tests:
        1. Export logs with complex filter (multiple conditions)
        2. Verify only logs matching all conditions are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_logs(
        [
            Logs(
                timestamp=now - timedelta(seconds=10),
                body="API error occurred",
                severity_text="ERROR",
                resources={
                    "service.name": "api-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=8),
                body="Worker info message",
                severity_text="INFO",
                resources={
                    "service.name": "worker-service",
                },
                attributes={},
            ),
            Logs(
                timestamp=now - timedelta(seconds=5),
                body="API info message",
                severity_text="INFO",
                resources={
                    "service.name": "api-service",
                },
                attributes={},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # Filter for api-service AND ERROR severity
    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "jsonl",
        "source": "logs",
        "filter": "service.name = 'api-service' AND severity_text = 'ERROR'",
    }

    # Export logs with complex filter
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 1, f"Expected 1 line (complex filter), got {len(jsonl_lines)}"

    # Verify the filtered log
    filtered_obj = json.loads(jsonl_lines[0])
    assert filtered_obj["body"] == "API error occurred"
    assert filtered_obj["severity_text"] == "ERROR"
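
The logs tests above exercise the main query parameters of /api/v1/export_raw_data: start and end (nanosecond timestamps), format (csv or jsonl), source, plus optional filter, limit, columns, and order_by. The snippet below is a minimal sketch of the same call outside the test fixtures; the host, token, and time window are placeholder assumptions, and the parameter names are taken directly from the tests.

# Sketch of the export call used in the tests above (not part of the diff).
# SIGNOZ_HOST and TOKEN are placeholders; replace them with a real instance and bearer token.
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

import requests

SIGNOZ_HOST = "http://localhost:8080"  # assumption: local SigNoz instance
TOKEN = "<bearer-token>"               # assumption: obtained via the login API

now = datetime.now(tz=timezone.utc)
params = {
    "start": int((now - timedelta(minutes=5)).timestamp() * 1e9),
    "end": int(now.timestamp() * 1e9),
    "format": "jsonl",
    "source": "logs",
    "filter": "severity_text = 'ERROR'",
    "limit": 100,
    "order_by": "timestamp:asc",
}

response = requests.get(
    f"{SIGNOZ_HOST}/api/v1/export_raw_data?{urlencode(params)}",
    headers={"authorization": f"Bearer {TOKEN}"},
    timeout=10,
)
for line in response.text.strip().split("\n"):
    print(line)  # each line is one JSON-encoded log record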
927 tests/integration/src/rawexportdata/02_traces.py Normal file
@@ -0,0 +1,927 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List
from urllib.parse import urlencode
import csv
import io
import json

import requests

from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode


def test_export_traces_csv(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert 3 traces with different attributes.

    Tests:
        1. Export traces as CSV format
        2. Verify CSV structure and content
        3. Validate headers are present
        4. Check trace data is correctly formatted
    """
    http_service_trace_id = TraceIdGenerator.trace_id()
    http_service_span_id = TraceIdGenerator.span_id()
    http_service_db_span_id = TraceIdGenerator.span_id()
    topic_service_trace_id = TraceIdGenerator.trace_id()
    topic_service_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=4),
                duration=timedelta(seconds=3),
                trace_id=http_service_trace_id,
                span_id=http_service_span_id,
                parent_span_id="",
                name="POST /integration",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "deployment.environment": "production",
                    "service.name": "http-service",
                    "os.type": "linux",
                    "host.name": "linux-000",
                },
                attributes={
                    "net.transport": "IP.TCP",
                    "http.scheme": "http",
                    "http.user_agent": "Integration Test",
                    "http.request.method": "POST",
                    "http.response.status_code": "200",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=3.5),
                duration=timedelta(seconds=0.5),
                trace_id=http_service_trace_id,
                span_id=http_service_db_span_id,
                parent_span_id=http_service_span_id,
                name="SELECT",
                kind=TracesKind.SPAN_KIND_CLIENT,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "deployment.environment": "production",
                    "service.name": "http-service",
                    "os.type": "linux",
                    "host.name": "linux-000",
                },
                attributes={
                    "db.name": "integration",
                    "db.operation": "SELECT",
                    "db.statement": "SELECT * FROM integration",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=1),
                duration=timedelta(seconds=2),
                trace_id=topic_service_trace_id,
                span_id=topic_service_span_id,
                parent_span_id="",
                name="topic publish",
                kind=TracesKind.SPAN_KIND_PRODUCER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "deployment.environment": "production",
                    "service.name": "topic-service",
                    "os.type": "linux",
                    "host.name": "linux-001",
                },
                attributes={
                    "message.type": "SENT",
                    "messaging.operation": "publish",
                    "messaging.message.id": "001",
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # Build composite query for traces export
    composite_query = {
        "type": "builder_query",
        "spec": {
            "signal": "traces",
            "name": "A",
            "limit": 1000
        }
    }

    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "csv",
        "source": "traces",
        "composite_query": json.dumps(composite_query)
    }

    # Export traces as CSV
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "text/csv"
    assert "attachment" in response.headers.get("Content-Disposition", "")

    # Parse CSV content
    csv_content = response.text
    csv_reader = csv.DictReader(io.StringIO(csv_content))

    rows = list(csv_reader)
    assert len(rows) == 3, f"Expected 3 rows, got {len(rows)}"

    # Verify trace IDs are present in the exported data
    trace_ids = [row.get("trace_id") for row in rows]
    assert http_service_trace_id in trace_ids
    assert topic_service_trace_id in trace_ids

    # Verify span names are present
    span_names = [row.get("name") for row in rows]
    assert "POST /integration" in span_names
    assert "SELECT" in span_names
    assert "topic publish" in span_names


def test_export_traces_jsonl(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert 2 traces with different attributes.

    Tests:
        1. Export traces as JSONL format
        2. Verify JSONL structure and content
        3. Check each line is valid JSON
        4. Validate trace data is correctly formatted
    """
    http_service_trace_id = TraceIdGenerator.trace_id()
    http_service_span_id = TraceIdGenerator.span_id()
    topic_service_trace_id = TraceIdGenerator.trace_id()
    topic_service_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=4),
                duration=timedelta(seconds=3),
                trace_id=http_service_trace_id,
                span_id=http_service_span_id,
                parent_span_id="",
                name="POST /api/test",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "api-service",
                    "deployment.environment": "staging",
                },
                attributes={
                    "http.request.method": "POST",
                    "http.response.status_code": "201",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=2),
                duration=timedelta(seconds=1),
                trace_id=topic_service_trace_id,
                span_id=topic_service_span_id,
                parent_span_id="",
                name="queue.process",
                kind=TracesKind.SPAN_KIND_CONSUMER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "queue-service",
                    "deployment.environment": "staging",
                },
                attributes={
                    "messaging.operation": "process",
                    "messaging.system": "rabbitmq",
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # Build composite query for traces export
    composite_query = {
        "type": "builder_query",
        "spec": {
            "signal": "traces",
            "name": "A",
            "limit": 1000
        }
    }

    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "jsonl",
        "source": "traces",
        "composite_query": json.dumps(composite_query)
    }

    # Export traces as JSONL
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"
    assert "attachment" in response.headers.get("Content-Disposition", "")

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 2, f"Expected 2 lines, got {len(jsonl_lines)}"

    # Verify each line is valid JSON
    json_objects = []
    for line in jsonl_lines:
        obj = json.loads(line)
        json_objects.append(obj)
        assert "trace_id" in obj
        assert "span_id" in obj
        assert "name" in obj

    # Verify trace IDs are present
    trace_ids = [obj.get("trace_id") for obj in json_objects]
    assert http_service_trace_id in trace_ids
    assert topic_service_trace_id in trace_ids

    # Verify span names are present
    span_names = [obj.get("name") for obj in json_objects]
    assert "POST /api/test" in span_names
    assert "queue.process" in span_names


def test_export_traces_with_filter(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert traces with different service names.

    Tests:
        1. Export traces with filter applied
        2. Verify only filtered traces are returned
    """
    service_a_trace_id = TraceIdGenerator.trace_id()
    service_a_span_id = TraceIdGenerator.span_id()
    service_b_trace_id = TraceIdGenerator.trace_id()
    service_b_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=4),
                duration=timedelta(seconds=1),
                trace_id=service_a_trace_id,
                span_id=service_a_span_id,
                parent_span_id="",
                name="operation-a",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "service-a",
                },
                attributes={},
            ),
            Traces(
                timestamp=now - timedelta(seconds=2),
                duration=timedelta(seconds=1),
                trace_id=service_b_trace_id,
                span_id=service_b_span_id,
                parent_span_id="",
                name="operation-b",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "service-b",
                },
                attributes={},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # Build composite query with filter for service-a only
    composite_query = {
        "type": "builder_query",
        "spec": {
            "signal": "traces",
            "name": "A",
            "limit": 1000,
            "filter": {
                "expression": "service.name = 'service-a'"
            }
        }
    }

    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "jsonl",
        "source": "traces",
        "composite_query": json.dumps(composite_query)
    }

    # Export traces with filter
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 1, f"Expected 1 line (filtered), got {len(jsonl_lines)}"

    # Verify the filtered trace
    filtered_obj = json.loads(jsonl_lines[0])
    assert filtered_obj["trace_id"] == service_a_trace_id
    assert filtered_obj["name"] == "operation-a"


def test_export_traces_with_limit(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert 5 traces.

    Tests:
        1. Export traces with limit applied
        2. Verify only limited number of traces are returned
    """
    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    traces = []
    for i in range(5):
        traces.append(
            Traces(
                timestamp=now - timedelta(seconds=i),
                duration=timedelta(seconds=1),
                trace_id=TraceIdGenerator.trace_id(),
                span_id=TraceIdGenerator.span_id(),
                parent_span_id="",
                name=f"operation-{i}",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "test-service",
                },
                attributes={},
            )
        )

    insert_traces(traces)

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # Build composite query with limit of 3
    composite_query = {
        "type": "builder_query",
        "spec": {
            "signal": "traces",
            "name": "A",
            "limit": 3
        }
    }

    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "csv",
        "source": "traces",
        "composite_query": json.dumps(composite_query)
    }

    # Export traces with limit
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "text/csv"

    # Parse CSV content
    csv_content = response.text
    csv_reader = csv.DictReader(io.StringIO(csv_content))

    rows = list(csv_reader)
    assert len(rows) == 3, f"Expected 3 rows (limited), got {len(rows)}"


def test_export_traces_with_multiple_composite_queries(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert traces with different service names and attributes.

    Tests:
        1. Export traces using multiple composite queries
        2. Verify all queries are executed and results are combined
    """
    service_a_trace_id = TraceIdGenerator.trace_id()
    service_a_span_id = TraceIdGenerator.span_id()
    service_b_trace_id = TraceIdGenerator.trace_id()
    service_b_span_id = TraceIdGenerator.span_id()
    service_c_trace_id = TraceIdGenerator.trace_id()
    service_c_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=10),
                duration=timedelta(seconds=1),
                trace_id=service_a_trace_id,
                span_id=service_a_span_id,
                parent_span_id="",
                name="operation-a",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "service-a",
                },
                attributes={
                    "http.status_code": "200",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=8),
                duration=timedelta(seconds=1),
                trace_id=service_b_trace_id,
                span_id=service_b_span_id,
                parent_span_id="",
                name="operation-b",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_ERROR,
                status_message="",
                resources={
                    "service.name": "service-b",
                },
                attributes={
                    "http.status_code": "500",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=5),
                duration=timedelta(seconds=1),
                trace_id=service_c_trace_id,
                span_id=service_c_span_id,
                parent_span_id="",
                name="operation-c",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "service-c",
                },
                attributes={
                    "http.status_code": "200",
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # Build first composite query - filter for service-a
    composite_query_1 = {
        "type": "builder_query",
        "spec": {
            "signal": "traces",
            "name": "A",
            "limit": 1000,
            "filter": {
                "expression": "service.name = 'service-a'"
            }
        }
    }

    # Build second composite query - filter for service-b
    composite_query_2 = {
        "type": "builder_query",
        "spec": {
            "signal": "traces",
            "name": "B",
            "limit": 1000,
            "filter": {
                "expression": "service.name = 'service-b'"
            }
        }
    }

    # Multiple queries are supported: each query is executed and all results are combined
    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "jsonl",
        "source": "traces",
    }

    # Add both composite queries as separate parameters
    from urllib.parse import quote
    url = signoz.self.host_configs["8080"].get("/api/v1/export_raw_data")
    url += f"?{urlencode(params)}"
    url += f"&composite_query={quote(json.dumps(composite_query_1))}"
    url += f"&composite_query={quote(json.dumps(composite_query_2))}"

    # Export traces with multiple composite queries
    response = requests.get(
        url,
        timeout=30,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    # With multiple queries, we should get results from both queries
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) >= 1, f"Expected at least 1 line, got {len(jsonl_lines)}"

    # Verify the result
    json_objects = [json.loads(line) for line in jsonl_lines]

    # Check that we got results
    assert len(json_objects) > 0

    # Verify we got traces from both service-a and service-b (or at least one of them)
    # Multiple queries return combined results from all queries
    trace_ids_in_results = [obj.get("trace_id") for obj in json_objects]

    # We should have at least service-a or service-b (depending on query execution)
    assert service_a_trace_id in trace_ids_in_results or service_b_trace_id in trace_ids_in_results

    # Count how many from each service we got
    service_a_count = trace_ids_in_results.count(service_a_trace_id)
    service_b_count = trace_ids_in_results.count(service_b_trace_id)

    # At least one query should have returned results
    assert service_a_count > 0 or service_b_count > 0

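
The test above appends each composite_query parameter to the URL by hand with quote(). The earlier logs-with-columns test achieves the same effect with urlencode(params, doseq=True): when a parameter value is a list, doseq=True emits one query parameter per element. The sketch below shows that alternative encoding for the two query dicts; it assumes the server treats repeated composite_query parameters the same regardless of how the query string is assembled.

# Sketch: encode repeated composite_query parameters with urlencode(doseq=True)
# instead of manual quote() concatenation. The query dicts mirror the ones in the test above.
import json
from urllib.parse import urlencode

query_a = {"type": "builder_query", "spec": {"signal": "traces", "name": "A", "limit": 1000,
                                             "filter": {"expression": "service.name = 'service-a'"}}}
query_b = {"type": "builder_query", "spec": {"signal": "traces", "name": "B", "limit": 1000,
                                             "filter": {"expression": "service.name = 'service-b'"}}}

params = {
    "start": 0,  # placeholder: start timestamp in nanoseconds
    "end": 0,    # placeholder: end timestamp in nanoseconds
    "format": "jsonl",
    "source": "traces",
    "composite_query": [json.dumps(query_a), json.dumps(query_b)],
}

# doseq=True expands the list into two separate composite_query parameters.
query_string = urlencode(params, doseq=True)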
def test_export_traces_with_composite_query_trace_operator(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert multiple traces with parent-child relationships.

    Tests:
        1. Export traces using trace operator in composite query
        2. Verify trace operator query works correctly
    """
    parent_trace_id = TraceIdGenerator.trace_id()
    parent_span_id = TraceIdGenerator.span_id()
    child_span_id_1 = TraceIdGenerator.span_id()
    child_span_id_2 = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=10),
                duration=timedelta(seconds=5),
                trace_id=parent_trace_id,
                span_id=parent_span_id,
                parent_span_id="",
                name="parent-operation",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "parent-service",
                },
                attributes={
                    "operation.type": "parent",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=9),
                duration=timedelta(seconds=2),
                trace_id=parent_trace_id,
                span_id=child_span_id_1,
                parent_span_id=parent_span_id,
                name="child-operation-1",
                kind=TracesKind.SPAN_KIND_INTERNAL,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "parent-service",
                },
                attributes={
                    "operation.type": "child",
                },
            ),
            Traces(
                timestamp=now - timedelta(seconds=7),
                duration=timedelta(seconds=1),
                trace_id=parent_trace_id,
                span_id=child_span_id_2,
                parent_span_id=parent_span_id,
                name="child-operation-2",
                kind=TracesKind.SPAN_KIND_INTERNAL,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "parent-service",
                },
                attributes={
                    "operation.type": "child",
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # Build first composite query - filter for parent spans
    composite_query_1 = {
        "type": "builder_query",
        "spec": {
            "signal": "traces",
            "name": "A",
            "limit": 1000,
            "filter": {
                "expression": "operation.type = 'parent'"
            }
        }
    }

    # Build second composite query - filter for child spans
    composite_query_2 = {
        "type": "builder_query",
        "spec": {
            "signal": "traces",
            "name": "B",
            "limit": 1000,
            "filter": {
                "expression": "operation.type = 'child'"
            }
        }
    }

    # Build composite query using trace operator
    composite_query = {
        "type": "builder_trace_operator",
        "spec": {
            "name": "C",
            "expression": "A => B",
            "limit": 1000
        }
    }

    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "jsonl",
        "source": "traces",
    }

    # Add all composite queries as separate parameters
    from urllib.parse import quote
    url = signoz.self.host_configs["8080"].get("/api/v1/export_raw_data")
    url += f"?{urlencode(params)}"
    url += f"&composite_query={quote(json.dumps(composite_query_1))}"
    url += f"&composite_query={quote(json.dumps(composite_query_2))}"
    url += f"&composite_query={quote(json.dumps(composite_query))}"

    # Export traces using trace operator
    response = requests.get(
        url,
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")

    assert len(jsonl_lines) == 2, f"Expected 2 lines, got {len(jsonl_lines)}"

    # Verify all spans belong to the same trace
    json_objects = [json.loads(line) for line in jsonl_lines]
    trace_ids = [obj.get("trace_id") for obj in json_objects]
    assert all(tid == parent_trace_id for tid in trace_ids)

    # Verify span names
    span_names = [obj.get("name") for obj in json_objects]
    assert "parent-operation" in span_names


def test_export_traces_with_select_fields(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[List[Traces]], None],
) -> None:
    """
    Setup:
        Insert traces with various attributes.

    Tests:
        1. Export traces with specific select fields
        2. Verify only specified fields are returned in the output
    """
    trace_id = TraceIdGenerator.trace_id()
    span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)

    insert_traces(
        [
            Traces(
                timestamp=now - timedelta(seconds=10),
                duration=timedelta(seconds=2),
                trace_id=trace_id,
                span_id=span_id,
                parent_span_id="",
                name="test-operation",
                kind=TracesKind.SPAN_KIND_SERVER,
                status_code=TracesStatusCode.STATUS_CODE_OK,
                status_message="",
                resources={
                    "service.name": "test-service",
                    "deployment.environment": "production",
                    "host.name": "server-01",
                },
                attributes={
                    "http.method": "POST",
                    "http.status_code": "201",
                    "user.id": "user123",
                },
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Calculate timestamps in nanoseconds
    start_ns = int((now - timedelta(minutes=5)).timestamp() * 1e9)
    end_ns = int(now.timestamp() * 1e9)

    # Build composite query with specific select fields
    composite_query = {
        "type": "builder_query",
        "spec": {
            "signal": "traces",
            "name": "A",
            "limit": 1000,
            "selectFields": [
                {
                    "name": "trace_id",
                    "fieldDataType": "string",
                    "fieldContext": "span",
                    "signal": "traces"
                },
                {
                    "name": "span_id",
                    "fieldDataType": "string",
                    "fieldContext": "span",
                    "signal": "traces"
                },
                {
                    "name": "name",
                    "fieldDataType": "string",
                    "fieldContext": "span",
                    "signal": "traces"
                },
                {
                    "name": "service.name",
                    "fieldDataType": "string",
                    "fieldContext": "resource",
                    "signal": "traces"
                }
            ]
        }
    }

    params = {
        "start": start_ns,
        "end": end_ns,
        "format": "jsonl",
        "source": "traces",
        "composite_query": json.dumps(composite_query)
    }

    # Export traces with select fields
    response = requests.get(
        signoz.self.host_configs["8080"].get(f"/api/v1/export_raw_data?{urlencode(params)}"),
        timeout=10,
        headers={
            "authorization": f"Bearer {token}",
        },
    )

    assert response.status_code == HTTPStatus.OK
    assert response.headers["Content-Type"] == "application/x-ndjson"

    # Parse JSONL content
    jsonl_lines = response.text.strip().split("\n")
    assert len(jsonl_lines) == 1

    # Verify the selected fields are present
    result = json.loads(jsonl_lines[0])
    assert "trace_id" in result
    assert "span_id" in result
    assert "name" in result

    # Verify values
    assert result["trace_id"] == trace_id
    assert result["span_id"] == span_id
    assert result["name"] == "test-operation"
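
The traces tests always pass a composite_query and check that the response carries Content-Disposition: attachment, which suggests the export is intended to be saved as a file. The sketch below pulls those pieces together and writes a CSV export to disk, reusing the parsing pattern from the tests; the host, token, time window, and output path are placeholder assumptions.

# Sketch: save a traces CSV export to disk and read it back (not part of the diff).
import csv
import io
import json
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

import requests

SIGNOZ_HOST = "http://localhost:8080"  # assumption: local SigNoz instance
TOKEN = "<bearer-token>"               # assumption: obtained via the login API

now = datetime.now(tz=timezone.utc)
composite_query = {"type": "builder_query", "spec": {"signal": "traces", "name": "A", "limit": 1000}}
params = {
    "start": int((now - timedelta(hours=1)).timestamp() * 1e9),
    "end": int(now.timestamp() * 1e9),
    "format": "csv",
    "source": "traces",
    "composite_query": json.dumps(composite_query),
}

response = requests.get(
    f"{SIGNOZ_HOST}/api/v1/export_raw_data?{urlencode(params)}",
    headers={"authorization": f"Bearer {TOKEN}"},
    timeout=30,
)

# Persist the attachment, then parse it the same way the tests do.
with open("traces_export.csv", "w", encoding="utf-8") as f:
    f.write(response.text)

rows = list(csv.DictReader(io.StringIO(response.text)))
print(f"exported {len(rows)} spans")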