signoz/pkg/telemetrylogs/statement_builder.go
Tushar Vats 189781748a
fix: dont filter out static fields in metadata store (#10135)
This pull request introduces several improvements and bug fixes in the telemetry logs and traces statement builders, metadata key filtering, and integration tests. The most significant changes ensure that required fields are always present in queries, deprecated fields are handled correctly, and test coverage is expanded to handle edge cases such as corrupt data. Additionally, some code cleanup and formatting improvements are made.
2026-01-28 19:42:04 +05:30


package telemetrylogs
import (
"context"
"fmt"
"log/slog"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
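// logQueryStatementBuilder builds ClickHouse SQL statements for log queries.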
type logQueryStatementBuilder struct {
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
aggExprRewriter qbtypes.AggExprRewriter
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
}
var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*logQueryStatementBuilder)(nil)
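// NewLogQueryStatementBuilder creates a statement builder for log queries.
//
// Illustrative usage sketch (the concrete dependencies are wired by the provider
// setup elsewhere; the variable names here are placeholders):
//
//	builder := NewLogQueryStatementBuilder(settings, metadataStore, fieldMapper,
//		conditionBuilder, resourceFilterStmtBuilder, aggExprRewriter,
//		fullTextColumn, jsonKeyToKey)
//	stmt, err := builder.Build(ctx, startNs, endNs, qbtypes.RequestTypeRaw, query, nil)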
func NewLogQueryStatementBuilder(
settings factory.ProviderSettings,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
aggExprRewriter qbtypes.AggExprRewriter,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *logQueryStatementBuilder {
logsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrylogs")
return &logQueryStatementBuilder{
logger: logsSettings.Logger(),
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
aggExprRewriter: aggExprRewriter,
fullTextColumn: fullTextColumn,
jsonKeyToKey: jsonKeyToKey,
}
}
// Build builds a SQL query for logs based on the given parameters
func (b *logQueryStatementBuilder) Build(
ctx context.Context,
start uint64,
end uint64,
requestType qbtypes.RequestType,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
start = querybuilder.ToNanoSecs(start)
end = querybuilder.ToNanoSecs(end)
keySelectors := getKeySelectors(query)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
query = b.adjustKeys(ctx, keys, query, requestType)
// Create SQL builder
q := sqlbuilder.NewSelectBuilder()
switch requestType {
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeRawStream:
return b.buildListQuery(ctx, q, query, start, end, keys, variables)
case qbtypes.RequestTypeTimeSeries:
return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
case qbtypes.RequestTypeScalar:
return b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
}
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
}
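// getKeySelectors collects field key selectors from the aggregation expressions,
// filter expression, group by, select fields, and order by of the query, and scopes
// them to the logs signal with exact matching.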
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
var keySelectors []*telemetrytypes.FieldKeySelector
for idx := range query.Aggregations {
aggExpr := query.Aggregations[idx]
selectors := querybuilder.QueryStringToKeysSelectors(aggExpr.Expression)
keySelectors = append(keySelectors, selectors...)
}
if query.Filter != nil && query.Filter.Expression != "" {
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
keySelectors = append(keySelectors, whereClauseSelectors...)
}
for idx := range query.GroupBy {
groupBy := query.GroupBy[idx]
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: groupBy.Name,
Signal: telemetrytypes.SignalLogs,
FieldContext: groupBy.FieldContext,
FieldDataType: groupBy.FieldDataType,
})
}
for idx := range query.SelectFields {
selectField := query.SelectFields[idx]
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: selectField.Name,
Signal: telemetrytypes.SignalLogs,
FieldContext: selectField.FieldContext,
FieldDataType: selectField.FieldDataType,
})
}
for idx := range query.Order {
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: query.Order[idx].Key.Name,
Signal: telemetrytypes.SignalLogs,
FieldContext: query.Order[idx].Key.FieldContext,
FieldDataType: query.Order[idx].Key.FieldDataType,
})
}
for idx := range keySelectors {
keySelectors[idx].Signal = telemetrytypes.SignalLogs
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
}
return keySelectors
}
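// adjustKeys ensures the intrinsic `timestamp` and `id` keys are always present in the
// keys map and normalizes the field keys used in the query: alias expressions,
// duplicate contexts/data types, and the context and data type of each key.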
func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
// Always ensure timestamp and id are present in keys map
keys["id"] = append([]*telemetrytypes.TelemetryFieldKey{{
Name: "id",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
}}, keys["id"]...)
keys["timestamp"] = append([]*telemetrytypes.TelemetryFieldKey{{
Name: "timestamp",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
}}, keys["timestamp"]...)
/*
Adjust keys for alias expressions in aggregations
*/
actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
/*
Check whether the user is using multiple contexts or data types for the same field name.
The idea is to use a superset of keys that can satisfy all of the usages.
For example, suppose model_id exists in both attributes and resources,
and the user references both `attribute.model_id` and `model_id`.
In this case, we remove the context from `attribute.model_id`,
making it just `model_id`, and drop the duplicate entry.
The same goes for data types.
If the user references both http.status_code:number and http.status_code,
we remove the data type from http.status_code:number,
making it just http.status_code, and drop the duplicate entry.
*/
actions = append(actions, querybuilder.AdjustDuplicateKeys(&query)...)
/*
Now adjust each key to have the correct context and data type.
Here we try to make intelligent guesses that work for all users (not just the majority),
so that the adjustment does not create unexpected behavior for anyone.
*/
for idx := range query.SelectFields {
actions = append(actions, b.adjustKey(&query.SelectFields[idx], keys)...)
}
for idx := range query.GroupBy {
actions = append(actions, b.adjustKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
}
for idx := range query.Order {
actions = append(actions, b.adjustKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
}
for _, action := range actions {
// TODO: change to debug level once we are confident about the behavior
b.logger.InfoContext(ctx, "key adjustment action", "action", action)
}
return query
}
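// adjustKey resolves the context and data type of a single field key, preferring the
// intrinsic field definition when the name matches one.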
func (b *logQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
// First check if it matches with any intrinsic fields
var intrinsicOrCalculatedField telemetrytypes.TelemetryFieldKey
if _, ok := IntrinsicFields[key.Name]; ok {
intrinsicOrCalculatedField = IntrinsicFields[key.Name]
return querybuilder.AdjustKey(key, keys, &intrinsicOrCalculatedField)
}
return querybuilder.AdjustKey(key, keys, nil)
}
// buildListQuery builds a query for list panel type
func (b *logQueryStatementBuilder) buildListQuery(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
start, end uint64,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
var (
cteFragments []string
cteArgs [][]any
)
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
// Select timestamp and id by default
sb.Select(LogsV2TimestampColumn)
sb.SelectMore(LogsV2IDColumn)
if len(query.SelectFields) == 0 {
// Select all default columns
sb.SelectMore(LogsV2TraceIDColumn)
sb.SelectMore(LogsV2SpanIDColumn)
sb.SelectMore(LogsV2TraceFlagsColumn)
sb.SelectMore(LogsV2SeverityTextColumn)
sb.SelectMore(LogsV2SeverityNumberColumn)
sb.SelectMore(LogsV2ScopeNameColumn)
sb.SelectMore(LogsV2ScopeVersionColumn)
sb.SelectMore(LogsV2BodyColumn)
if querybuilder.BodyJSONQueryEnabled {
sb.SelectMore(LogsV2BodyJSONColumn)
sb.SelectMore(LogsV2BodyPromotedColumn)
}
sb.SelectMore(LogsV2AttributesStringColumn)
sb.SelectMore(LogsV2AttributesNumberColumn)
sb.SelectMore(LogsV2AttributesBoolColumn)
sb.SelectMore(LogsV2ResourcesStringColumn)
sb.SelectMore(LogsV2ScopeStringColumn)
} else {
// Select specified columns
for index := range query.SelectFields {
if query.SelectFields[index].Name == LogsV2TimestampColumn || query.SelectFields[index].Name == LogsV2IDColumn {
continue
}
// get column expression for the field - use array index directly to avoid pointer to loop variable
colExpr, err := b.fm.ColumnExpressionFor(ctx, &query.SelectFields[index], keys)
if err != nil {
return nil, err
}
sb.SelectMore(colExpr)
}
}
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
// Add filter conditions
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
if err != nil {
return nil, err
}
// Add order by
for _, orderBy := range query.Order {
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
if err != nil {
return nil, err
}
sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue()))
}
// Add limit and offset
if query.Limit > 0 {
sb.Limit(query.Limit)
} else {
sb.Limit(100)
}
if query.Offset > 0 {
sb.Offset(query.Offset)
}
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
stmt := &qbtypes.Statement{
Query: finalSQL,
Args: finalArgs,
}
if preparedWhereClause != nil {
stmt.Warnings = preparedWhereClause.Warnings
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
}
return stmt, nil
}
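// buildTimeSeriesQuery builds a query for time series panel type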
func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
start, end uint64,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
var (
cteFragments []string
cteArgs [][]any
)
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts",
int64(query.StepInterval.Seconds()),
))
var allGroupByArgs []any
// Keep the quoted group-by column names so we can build the tuple for the limit CTE
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
if err != nil {
return nil, err
}
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
allGroupByArgs = append(allGroupByArgs, args...)
sb.SelectMore(colExpr)
fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name))
}
// Aggregations
allAggChArgs := make([]any, 0)
for i, agg := range query.Aggregations {
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
ctx, agg.Expression,
uint64(query.StepInterval.Seconds()),
keys,
)
if err != nil {
return nil, err
}
allAggChArgs = append(allAggChArgs, chArgs...)
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i))
}
// Add FROM clause
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
if err != nil {
return nil, err
}
var finalSQL string
var finalArgs []any
if query.Limit > 0 && len(query.GroupBy) > 0 {
// build the scalar “top/bottom-N” query in its own builder.
cteSB := sqlbuilder.NewSelectBuilder()
cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true, variables)
if err != nil {
return nil, err
}
cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query))
cteArgs = append(cteArgs, cteStmt.Args)
// Constrain the main query to the rows that appear in the CTE.
tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
sb.Where(fmt.Sprintf("%s GLOBAL IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))
// Group by all dimensions
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
if query.Having != nil && query.Having.Expression != "" {
// Rewrite having expression to use SQL column names
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
sb.Having(rewrittenExpr)
}
if len(query.Order) != 0 {
for _, orderBy := range query.Order {
_, ok := aggOrderBy(orderBy, query)
if !ok {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
sb.OrderBy("ts desc")
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
// Stitch it all together: WITH … SELECT …
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
} else {
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
sb.Having(rewrittenExpr)
}
if len(query.Order) != 0 {
for _, orderBy := range query.Order {
_, ok := aggOrderBy(orderBy, query)
if !ok {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
sb.OrderBy("ts desc")
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
// Stitch it all together: WITH … SELECT …
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
}
stmt := &qbtypes.Statement{
Query: finalSQL,
Args: finalArgs,
}
if preparedWhereClause != nil {
stmt.Warnings = preparedWhereClause.Warnings
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
}
return stmt, nil
}
// buildScalarQuery builds a query for scalar panel type
func (b *logQueryStatementBuilder) buildScalarQuery(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
start, end uint64,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
skipResourceCTE bool,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
var (
cteFragments []string
cteArgs [][]any
)
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
return nil, err
} else if frag != "" && !skipResourceCTE {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
allAggChArgs := []any{}
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
if err != nil {
return nil, err
}
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
allGroupByArgs = append(allGroupByArgs, args...)
sb.SelectMore(colExpr)
}
// for scalar queries, the rate interval is the full query window (end - start), in seconds
rateInterval := (end - start) / querybuilder.NsToSeconds
// Add aggregation
if len(query.Aggregations) > 0 {
for idx := range query.Aggregations {
aggExpr := query.Aggregations[idx]
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
ctx, aggExpr.Expression,
rateInterval,
keys,
)
if err != nil {
return nil, err
}
allAggChArgs = append(allAggChArgs, chArgs...)
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx))
}
}
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
// Add filter conditions
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
if err != nil {
return nil, err
}
// Group by dimensions
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
// Add having clause if needed
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
sb.Having(rewrittenExpr)
}
// Add order by
for _, orderBy := range query.Order {
idx, ok := aggOrderBy(orderBy, query)
if ok {
sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue()))
} else {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
// if there is no order by, default to ordering by __result_0
if len(query.Order) == 0 {
sb.OrderBy("__result_0 DESC")
}
// Add limit and offset
if query.Limit > 0 {
sb.Limit(query.Limit)
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
stmt := &qbtypes.Statement{
Query: finalSQL,
Args: finalArgs,
}
if preparedWhereClause != nil {
stmt.Warnings = preparedWhereClause.Warnings
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
}
return stmt, nil
}
// addFilterCondition builds the SQL conditions from the filter expression and the time range and adds them to the builder
func (b *logQueryStatementBuilder) addFilterCondition(
_ context.Context,
sb *sqlbuilder.SelectBuilder,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (*querybuilder.PreparedWhereClause, error) {
var preparedWhereClause *querybuilder.PreparedWhereClause
var err error
if query.Filter != nil && query.Filter.Expression != "" {
// add filter expression
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
SkipResourceFilter: true,
FullTextColumn: b.fullTextColumn,
JsonKeyToKey: b.jsonKeyToKey,
Variables: variables,
}, start, end)
if err != nil {
return nil, err
}
}
if preparedWhereClause != nil {
sb.AddWhereClause(preparedWhereClause.WhereClause)
}
// add time filter
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
var endBucket uint64
if end != 0 {
endBucket = end / querybuilder.NsToSeconds
}
if start != 0 {
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.GE("ts_bucket_start", startBucket))
}
if end != 0 {
sb.Where(sb.L("timestamp", fmt.Sprintf("%d", end)), sb.LE("ts_bucket_start", endBucket))
}
return preparedWhereClause, nil
}
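// aggOrderBy returns the index of the aggregation that the order by refers to, matching
// by alias, expression, or positional index (e.g. an order by key of "0" refers to the
// first aggregation), and reports whether a match was found.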
func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) (int, bool) {
for i, agg := range q.Aggregations {
if k.Key.Name == agg.Alias ||
k.Key.Name == agg.Expression ||
k.Key.Name == fmt.Sprintf("%d", i) {
return i, true
}
}
return 0, false
}
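// maybeAttachResourceFilter builds the resource filter CTE and constrains the main
// query to the fingerprints it selects, returning the CTE fragment and its arguments.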
func (b *logQueryStatementBuilder) maybeAttachResourceFilter(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
start, end uint64,
variables map[string]qbtypes.VariableItem,
) (cteSQL string, cteArgs []any, err error) {
stmt, err := b.buildResourceFilterCTE(ctx, query, start, end, variables)
if err != nil {
return "", nil, err
}
sb.Where("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)")
return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
}
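// buildResourceFilterCTE builds the resource filter subquery using the resource
// filter statement builder.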
func (b *logQueryStatementBuilder) buildResourceFilterCTE(
ctx context.Context,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
start, end uint64,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
return b.resourceFilterStmtBuilder.Build(
ctx,
start,
end,
qbtypes.RequestTypeRaw,
query,
variables,
)
}