mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-03 08:33:26 +00:00
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
This pull request introduces several improvements and bug fixes in the telemetry logs and traces statement builders, metadata key filtering, and integration tests. The most significant changes ensure that required fields are always present in queries, deprecated fields are handled correctly, and test coverage is expanded to handle edge cases such as corrupt data. Additionally, some code cleanup and formatting improvements are made.
687 lines
21 KiB
Go
687 lines
21 KiB
Go
package telemetrylogs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
|
|
"github.com/SigNoz/signoz/pkg/errors"
|
|
"github.com/SigNoz/signoz/pkg/factory"
|
|
"github.com/SigNoz/signoz/pkg/querybuilder"
|
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
|
"github.com/huandu/go-sqlbuilder"
|
|
)
|
|
|
|
type logQueryStatementBuilder struct {
|
|
logger *slog.Logger
|
|
metadataStore telemetrytypes.MetadataStore
|
|
fm qbtypes.FieldMapper
|
|
cb qbtypes.ConditionBuilder
|
|
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
|
|
aggExprRewriter qbtypes.AggExprRewriter
|
|
|
|
fullTextColumn *telemetrytypes.TelemetryFieldKey
|
|
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
|
|
}
|
|
|
|
var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*logQueryStatementBuilder)(nil)
|
|
|
|
func NewLogQueryStatementBuilder(
|
|
settings factory.ProviderSettings,
|
|
metadataStore telemetrytypes.MetadataStore,
|
|
fieldMapper qbtypes.FieldMapper,
|
|
conditionBuilder qbtypes.ConditionBuilder,
|
|
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
|
|
aggExprRewriter qbtypes.AggExprRewriter,
|
|
fullTextColumn *telemetrytypes.TelemetryFieldKey,
|
|
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
|
|
) *logQueryStatementBuilder {
|
|
logsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrylogs")
|
|
|
|
return &logQueryStatementBuilder{
|
|
logger: logsSettings.Logger(),
|
|
metadataStore: metadataStore,
|
|
fm: fieldMapper,
|
|
cb: conditionBuilder,
|
|
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
|
|
aggExprRewriter: aggExprRewriter,
|
|
fullTextColumn: fullTextColumn,
|
|
jsonKeyToKey: jsonKeyToKey,
|
|
}
|
|
}
|
|
|
|
// Build builds a SQL query for logs based on the given parameters
|
|
func (b *logQueryStatementBuilder) Build(
|
|
ctx context.Context,
|
|
start uint64,
|
|
end uint64,
|
|
requestType qbtypes.RequestType,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
start = querybuilder.ToNanoSecs(start)
|
|
end = querybuilder.ToNanoSecs(end)
|
|
|
|
keySelectors := getKeySelectors(query)
|
|
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
query = b.adjustKeys(ctx, keys, query, requestType)
|
|
|
|
// Create SQL builder
|
|
q := sqlbuilder.NewSelectBuilder()
|
|
|
|
switch requestType {
|
|
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeRawStream:
|
|
return b.buildListQuery(ctx, q, query, start, end, keys, variables)
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
|
|
case qbtypes.RequestTypeScalar:
|
|
return b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
|
|
}
|
|
|
|
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
|
|
}
|
|
|
|
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
|
|
var keySelectors []*telemetrytypes.FieldKeySelector
|
|
|
|
for idx := range query.Aggregations {
|
|
aggExpr := query.Aggregations[idx]
|
|
selectors := querybuilder.QueryStringToKeysSelectors(aggExpr.Expression)
|
|
keySelectors = append(keySelectors, selectors...)
|
|
}
|
|
|
|
if query.Filter != nil && query.Filter.Expression != "" {
|
|
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
|
|
keySelectors = append(keySelectors, whereClauseSelectors...)
|
|
}
|
|
|
|
for idx := range query.GroupBy {
|
|
groupBy := query.GroupBy[idx]
|
|
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
|
Name: groupBy.Name,
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: groupBy.FieldContext,
|
|
FieldDataType: groupBy.FieldDataType,
|
|
})
|
|
}
|
|
|
|
for idx := range query.SelectFields {
|
|
selectField := query.SelectFields[idx]
|
|
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
|
Name: selectField.Name,
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: selectField.FieldContext,
|
|
FieldDataType: selectField.FieldDataType,
|
|
})
|
|
}
|
|
|
|
for idx := range query.Order {
|
|
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
|
Name: query.Order[idx].Key.Name,
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: query.Order[idx].Key.FieldContext,
|
|
FieldDataType: query.Order[idx].Key.FieldDataType,
|
|
})
|
|
}
|
|
|
|
for idx := range keySelectors {
|
|
keySelectors[idx].Signal = telemetrytypes.SignalLogs
|
|
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
|
|
}
|
|
|
|
return keySelectors
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
|
|
|
|
// Always ensure timestamp and id are present in keys map
|
|
keys["id"] = append([]*telemetrytypes.TelemetryFieldKey{{
|
|
Name: "id",
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: telemetrytypes.FieldContextLog,
|
|
FieldDataType: telemetrytypes.FieldDataTypeString,
|
|
}}, keys["id"]...)
|
|
|
|
keys["timestamp"] = append([]*telemetrytypes.TelemetryFieldKey{{
|
|
Name: "timestamp",
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: telemetrytypes.FieldContextLog,
|
|
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
|
}}, keys["timestamp"]...)
|
|
|
|
/*
|
|
Adjust keys for alias expressions in aggregations
|
|
*/
|
|
actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
|
|
|
|
/*
|
|
Check if user is using multiple contexts or data types for same field name
|
|
Idea is to use a super set of keys that can satisfy all the usages
|
|
|
|
For example, lets consider model_id exists in both attributes and resources
|
|
And user is trying to use `attribute.model_id` and `model_id`.
|
|
|
|
In this case, we'll remove the context from `attribute.model_id`
|
|
and make it just `model_id` and remove the duplicate entry.
|
|
|
|
Same goes with data types.
|
|
Consider user is using http.status_code:number and http.status_code
|
|
In this case, we'll remove the data type from http.status_code:number
|
|
and make it just http.status_code and remove the duplicate entry.
|
|
*/
|
|
|
|
actions = append(actions, querybuilder.AdjustDuplicateKeys(&query)...)
|
|
|
|
/*
|
|
Now adjust each key to have correct context and data type
|
|
Here we try to make intelligent guesses which work for all users (not just majority)
|
|
Reason for doing this is to not create an unexpected behavior for users
|
|
*/
|
|
for idx := range query.SelectFields {
|
|
actions = append(actions, b.adjustKey(&query.SelectFields[idx], keys)...)
|
|
}
|
|
for idx := range query.GroupBy {
|
|
actions = append(actions, b.adjustKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
|
|
}
|
|
for idx := range query.Order {
|
|
actions = append(actions, b.adjustKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
|
|
}
|
|
|
|
for _, action := range actions {
|
|
// TODO: change to debug level once we are confident about the behavior
|
|
b.logger.InfoContext(ctx, "key adjustment action", "action", action)
|
|
}
|
|
|
|
return query
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
|
|
|
|
// First check if it matches with any intrinsic fields
|
|
var intrinsicOrCalculatedField telemetrytypes.TelemetryFieldKey
|
|
if _, ok := IntrinsicFields[key.Name]; ok {
|
|
intrinsicOrCalculatedField = IntrinsicFields[key.Name]
|
|
return querybuilder.AdjustKey(key, keys, &intrinsicOrCalculatedField)
|
|
}
|
|
|
|
return querybuilder.AdjustKey(key, keys, nil)
|
|
|
|
}
|
|
|
|
// buildListQuery builds a query for list panel type
|
|
func (b *logQueryStatementBuilder) buildListQuery(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
var (
|
|
cteFragments []string
|
|
cteArgs [][]any
|
|
)
|
|
|
|
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
|
return nil, err
|
|
} else if frag != "" {
|
|
cteFragments = append(cteFragments, frag)
|
|
cteArgs = append(cteArgs, args)
|
|
}
|
|
|
|
// Select timestamp and id by default
|
|
sb.Select(LogsV2TimestampColumn)
|
|
sb.SelectMore(LogsV2IDColumn)
|
|
if len(query.SelectFields) == 0 {
|
|
// Select all default columns
|
|
sb.SelectMore(LogsV2TraceIDColumn)
|
|
sb.SelectMore(LogsV2SpanIDColumn)
|
|
sb.SelectMore(LogsV2TraceFlagsColumn)
|
|
sb.SelectMore(LogsV2SeverityTextColumn)
|
|
sb.SelectMore(LogsV2SeverityNumberColumn)
|
|
sb.SelectMore(LogsV2ScopeNameColumn)
|
|
sb.SelectMore(LogsV2ScopeVersionColumn)
|
|
sb.SelectMore(LogsV2BodyColumn)
|
|
if querybuilder.BodyJSONQueryEnabled {
|
|
sb.SelectMore(LogsV2BodyJSONColumn)
|
|
sb.SelectMore(LogsV2BodyPromotedColumn)
|
|
}
|
|
sb.SelectMore(LogsV2AttributesStringColumn)
|
|
sb.SelectMore(LogsV2AttributesNumberColumn)
|
|
sb.SelectMore(LogsV2AttributesBoolColumn)
|
|
sb.SelectMore(LogsV2ResourcesStringColumn)
|
|
sb.SelectMore(LogsV2ScopeStringColumn)
|
|
|
|
} else {
|
|
// Select specified columns
|
|
for index := range query.SelectFields {
|
|
if query.SelectFields[index].Name == LogsV2TimestampColumn || query.SelectFields[index].Name == LogsV2IDColumn {
|
|
continue
|
|
}
|
|
|
|
// get column expression for the field - use array index directly to avoid pointer to loop variable
|
|
colExpr, err := b.fm.ColumnExpressionFor(ctx, &query.SelectFields[index], keys)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sb.SelectMore(colExpr)
|
|
}
|
|
}
|
|
|
|
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
|
|
|
|
// Add filter conditions
|
|
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Add order by
|
|
for _, orderBy := range query.Order {
|
|
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue()))
|
|
}
|
|
|
|
// Add limit and offset
|
|
if query.Limit > 0 {
|
|
sb.Limit(query.Limit)
|
|
} else {
|
|
sb.Limit(100)
|
|
}
|
|
|
|
if query.Offset > 0 {
|
|
sb.Offset(query.Offset)
|
|
}
|
|
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
|
|
|
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
|
|
stmt := &qbtypes.Statement{
|
|
Query: finalSQL,
|
|
Args: finalArgs,
|
|
}
|
|
if preparedWhereClause != nil {
|
|
stmt.Warnings = preparedWhereClause.Warnings
|
|
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
|
}
|
|
|
|
return stmt, nil
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
var (
|
|
cteFragments []string
|
|
cteArgs [][]any
|
|
)
|
|
|
|
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
|
return nil, err
|
|
} else if frag != "" {
|
|
cteFragments = append(cteFragments, frag)
|
|
cteArgs = append(cteArgs, args)
|
|
}
|
|
|
|
sb.SelectMore(fmt.Sprintf(
|
|
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts",
|
|
int64(query.StepInterval.Seconds()),
|
|
))
|
|
|
|
var allGroupByArgs []any
|
|
|
|
// Keep original column expressions so we can build the tuple
|
|
fieldNames := make([]string, 0, len(query.GroupBy))
|
|
for _, gb := range query.GroupBy {
|
|
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
|
|
allGroupByArgs = append(allGroupByArgs, args...)
|
|
sb.SelectMore(colExpr)
|
|
fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name))
|
|
}
|
|
|
|
// Aggregations
|
|
allAggChArgs := make([]any, 0)
|
|
for i, agg := range query.Aggregations {
|
|
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
|
|
ctx, agg.Expression,
|
|
uint64(query.StepInterval.Seconds()),
|
|
keys,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
allAggChArgs = append(allAggChArgs, chArgs...)
|
|
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i))
|
|
}
|
|
|
|
// Add FROM clause
|
|
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
|
|
|
|
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var finalSQL string
|
|
var finalArgs []any
|
|
|
|
if query.Limit > 0 && len(query.GroupBy) > 0 {
|
|
// build the scalar “top/bottom-N” query in its own builder.
|
|
cteSB := sqlbuilder.NewSelectBuilder()
|
|
cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true, variables)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query))
|
|
cteArgs = append(cteArgs, cteStmt.Args)
|
|
|
|
// Constrain the main query to the rows that appear in the CTE.
|
|
tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
|
|
sb.Where(fmt.Sprintf("%s GLOBAL IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))
|
|
|
|
// Group by all dimensions
|
|
sb.GroupBy("ts")
|
|
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
if query.Having != nil && query.Having.Expression != "" {
|
|
// Rewrite having expression to use SQL column names
|
|
rewriter := querybuilder.NewHavingExpressionRewriter()
|
|
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
|
sb.Having(rewrittenExpr)
|
|
}
|
|
|
|
if len(query.Order) != 0 {
|
|
for _, orderBy := range query.Order {
|
|
_, ok := aggOrderBy(orderBy, query)
|
|
if !ok {
|
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
|
}
|
|
}
|
|
sb.OrderBy("ts desc")
|
|
}
|
|
|
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
|
|
|
// Stitch it all together: WITH … SELECT …
|
|
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
|
|
} else {
|
|
sb.GroupBy("ts")
|
|
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
if query.Having != nil && query.Having.Expression != "" {
|
|
rewriter := querybuilder.NewHavingExpressionRewriter()
|
|
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
|
sb.Having(rewrittenExpr)
|
|
}
|
|
|
|
if len(query.Order) != 0 {
|
|
for _, orderBy := range query.Order {
|
|
_, ok := aggOrderBy(orderBy, query)
|
|
if !ok {
|
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
|
}
|
|
}
|
|
sb.OrderBy("ts desc")
|
|
}
|
|
|
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
|
|
|
// Stitch it all together: WITH … SELECT …
|
|
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
}
|
|
|
|
stmt := &qbtypes.Statement{
|
|
Query: finalSQL,
|
|
Args: finalArgs,
|
|
}
|
|
if preparedWhereClause != nil {
|
|
stmt.Warnings = preparedWhereClause.Warnings
|
|
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
|
}
|
|
|
|
return stmt, nil
|
|
}
|
|
|
|
// buildScalarQuery builds a query for scalar panel type
|
|
func (b *logQueryStatementBuilder) buildScalarQuery(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
skipResourceCTE bool,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
var (
|
|
cteFragments []string
|
|
cteArgs [][]any
|
|
)
|
|
|
|
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
|
return nil, err
|
|
} else if frag != "" && !skipResourceCTE {
|
|
cteFragments = append(cteFragments, frag)
|
|
cteArgs = append(cteArgs, args)
|
|
}
|
|
|
|
allAggChArgs := []any{}
|
|
|
|
var allGroupByArgs []any
|
|
|
|
for _, gb := range query.GroupBy {
|
|
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
|
|
allGroupByArgs = append(allGroupByArgs, args...)
|
|
sb.SelectMore(colExpr)
|
|
}
|
|
|
|
// for scalar queries, the rate would be end-start
|
|
rateInterval := (end - start) / querybuilder.NsToSeconds
|
|
|
|
// Add aggregation
|
|
if len(query.Aggregations) > 0 {
|
|
for idx := range query.Aggregations {
|
|
aggExpr := query.Aggregations[idx]
|
|
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
|
|
ctx, aggExpr.Expression,
|
|
rateInterval,
|
|
keys,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
allAggChArgs = append(allAggChArgs, chArgs...)
|
|
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx))
|
|
}
|
|
}
|
|
|
|
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
|
|
|
|
// Add filter conditions
|
|
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Group by dimensions
|
|
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
|
|
// Add having clause if needed
|
|
if query.Having != nil && query.Having.Expression != "" {
|
|
rewriter := querybuilder.NewHavingExpressionRewriter()
|
|
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
|
sb.Having(rewrittenExpr)
|
|
}
|
|
|
|
// Add order by
|
|
for _, orderBy := range query.Order {
|
|
idx, ok := aggOrderBy(orderBy, query)
|
|
if ok {
|
|
sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue()))
|
|
} else {
|
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
|
}
|
|
}
|
|
|
|
// if there is no order by, then use the __result_0 as the order by
|
|
if len(query.Order) == 0 {
|
|
sb.OrderBy("__result_0 DESC")
|
|
}
|
|
|
|
// Add limit and offset
|
|
if query.Limit > 0 {
|
|
sb.Limit(query.Limit)
|
|
}
|
|
|
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
|
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
|
|
|
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
|
|
stmt := &qbtypes.Statement{
|
|
Query: finalSQL,
|
|
Args: finalArgs,
|
|
}
|
|
if preparedWhereClause != nil {
|
|
stmt.Warnings = preparedWhereClause.Warnings
|
|
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
|
}
|
|
|
|
return stmt, nil
|
|
}
|
|
|
|
// buildFilterCondition builds SQL condition from filter expression
|
|
func (b *logQueryStatementBuilder) addFilterCondition(
|
|
_ context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
start, end uint64,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*querybuilder.PreparedWhereClause, error) {
|
|
|
|
var preparedWhereClause *querybuilder.PreparedWhereClause
|
|
var err error
|
|
|
|
if query.Filter != nil && query.Filter.Expression != "" {
|
|
// add filter expression
|
|
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
|
Logger: b.logger,
|
|
FieldMapper: b.fm,
|
|
ConditionBuilder: b.cb,
|
|
FieldKeys: keys,
|
|
SkipResourceFilter: true,
|
|
FullTextColumn: b.fullTextColumn,
|
|
JsonKeyToKey: b.jsonKeyToKey,
|
|
Variables: variables,
|
|
}, start, end)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if preparedWhereClause != nil {
|
|
sb.AddWhereClause(preparedWhereClause.WhereClause)
|
|
}
|
|
|
|
// add time filter
|
|
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
|
|
var endBucket uint64
|
|
if end != 0 {
|
|
endBucket = end / querybuilder.NsToSeconds
|
|
}
|
|
|
|
if start != 0 {
|
|
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.GE("ts_bucket_start", startBucket))
|
|
}
|
|
if end != 0 {
|
|
sb.Where(sb.L("timestamp", fmt.Sprintf("%d", end)), sb.LE("ts_bucket_start", endBucket))
|
|
}
|
|
|
|
return preparedWhereClause, nil
|
|
}
|
|
|
|
func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) (int, bool) {
|
|
for i, agg := range q.Aggregations {
|
|
if k.Key.Name == agg.Alias ||
|
|
k.Key.Name == agg.Expression ||
|
|
k.Key.Name == fmt.Sprintf("%d", i) {
|
|
return i, true
|
|
}
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) maybeAttachResourceFilter(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (cteSQL string, cteArgs []any, err error) {
|
|
|
|
stmt, err := b.buildResourceFilterCTE(ctx, query, start, end, variables)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
sb.Where("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)")
|
|
|
|
return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) buildResourceFilterCTE(
|
|
ctx context.Context,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
return b.resourceFilterStmtBuilder.Build(
|
|
ctx,
|
|
start,
|
|
end,
|
|
qbtypes.RequestTypeRaw,
|
|
query,
|
|
variables,
|
|
)
|
|
}
|