Compare commits

...

3 Commits

Author SHA1 Message Date
Nikhil Soni
1740d6318c chore: simplify chunkSize calculation 2026-06-16 14:29:48 +05:30
Nikhil Soni
0c7f204ecd fix: remove row count limit
Since response size limit is also present
2026-06-13 23:09:52 +05:30
Nikhil Soni
5e3c51900e feat: use cursor for pagination in raw data export module 2026-06-13 22:58:21 +05:30
4 changed files with 28 additions and 44 deletions

View File

@@ -5,10 +5,6 @@ import (
)
const (
// Row Limits.
MaxExportRowCountLimit = 50_000 // 50k
DefaultExportRowCountLimit = 10_000 // 10k
// Data Limits.
MaxExportBytesLimit = 10 * 1024 * 1024 * 1024 // 10 GB

View File

@@ -105,15 +105,9 @@ func validateSpecForExport(req *qbtypes.QueryRangeRequest) error {
func validateAndApplyDefaultExportLimits(queries []qbtypes.QueryEnvelope) error {
for idx := range queries {
limit := queries[idx].GetLimit()
if limit == 0 {
limit = DefaultExportRowCountLimit
} else if limit < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be positive")
} else if limit > MaxExportRowCountLimit {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit cannot be more than %d", MaxExportRowCountLimit)
if queries[idx].GetLimit() < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be non-negative (0 = unlimited)")
}
queries[idx].SetLimit(limit)
}
return nil
}

View File

@@ -97,48 +97,29 @@ func TestValidateAndApplyDefaultExportLimits(t *testing.T) {
checkQueries func(t *testing.T, queries []qbtypes.QueryEnvelope)
}{
{
name: "single log query, zero limit gets default",
name: "zero limit kept as-is (unlimited)",
queries: makeRequest(logQuery(0)).CompositeQuery.Queries,
checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
assert.Equal(t, DefaultExportRowCountLimit, q[0].GetLimit())
assert.Equal(t, 0, q[0].GetLimit())
},
},
{
name: "single log query, valid limit kept",
name: "positive limit kept",
queries: makeRequest(logQuery(1000)).CompositeQuery.Queries,
checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
assert.Equal(t, 1000, q[0].GetLimit())
},
},
{
name: "single log query, max limit kept",
queries: makeRequest(logQuery(MaxExportRowCountLimit)).CompositeQuery.Queries,
checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
assert.Equal(t, MaxExportRowCountLimit, q[0].GetLimit())
},
},
{
name: "single log query, limit exceeds max",
queries: makeRequest(logQuery(MaxExportRowCountLimit + 1)).CompositeQuery.Queries,
expectedError: true,
},
{
name: "single log query, negative limit",
name: "negative limit rejected",
queries: makeRequest(logQuery(-1)).CompositeQuery.Queries,
expectedError: true,
},
{
name: "single trace query, zero limit gets default",
queries: makeRequest(traceQuery(0)).CompositeQuery.Queries,
name: "large trace limit kept",
queries: makeRequest(traceQuery(2_000_000)).CompositeQuery.Queries,
checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
assert.Equal(t, DefaultExportRowCountLimit, q[0].GetLimit())
},
},
{
name: "trace operator alone, zero limit gets default",
queries: makeRequest(traceOperatorQuery(0)).CompositeQuery.Queries,
checkQueries: func(t *testing.T, q []qbtypes.QueryEnvelope) {
assert.Equal(t, DefaultExportRowCountLimit, q[0].GetLimit())
assert.Equal(t, 2_000_000, q[0].GetLimit())
},
},
}

View File

@@ -69,13 +69,22 @@ func (m *Module) ExportRawData(ctx context.Context, orgID valuer.UUID, rangeRequ
func exportRawDataForSingleQuery(querier querier.Querier, ctx context.Context, orgID valuer.UUID, rangeRequest *qbtypes.QueryRangeRequest, rowChan chan *qbtypes.RawRow, errChan chan error, doneChan chan any, queryIndex int) {
queries := rangeRequest.CompositeQuery.Queries
rowCountLimit := queries[queryIndex].GetLimit()
rowCountLimit := queries[queryIndex].GetLimit() // 0 means no limit
rowCount := 0
for rowCount < rowCountLimit {
chunkSize := min(ChunkSize, rowCountLimit-rowCount)
queries[queryIndex].SetOffset(0)
cursor := ""
for {
chunkSize := ChunkSize
if rowCountLimit > 0 {
chunkSize = min(ChunkSize, rowCountLimit-rowCount)
}
if chunkSize <= 0 {
return
}
queries[queryIndex].SetLimit(chunkSize)
queries[queryIndex].SetOffset(rowCount)
queries[queryIndex].SetCursor(cursor)
response, err := querier.QueryRange(ctx, orgID, rangeRequest)
if err != nil {
@@ -84,6 +93,7 @@ func exportRawDataForSingleQuery(querier querier.Querier, ctx context.Context, o
}
newRowsCount := 0
nextCursor := ""
for _, result := range response.Data.Results {
resultData, ok := result.(*qbtypes.RawData)
if !ok {
@@ -91,6 +101,9 @@ func exportRawDataForSingleQuery(querier querier.Querier, ctx context.Context, o
return
}
if resultData.NextCursor != "" {
nextCursor = resultData.NextCursor
}
newRowsCount += len(resultData.Rows)
for _, row := range resultData.Rows {
select {
@@ -106,9 +119,9 @@ func exportRawDataForSingleQuery(querier querier.Querier, ctx context.Context, o
rowCount += newRowsCount
// Stop if we received fewer rows than requested — no more data available
if newRowsCount < chunkSize {
if nextCursor == "" || newRowsCount < chunkSize {
return
}
cursor = nextCursor
}
}