mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-20 03:30:26 +00:00
* feat: has JSON QB * fix: tests expected queries and values * fix: ignored .vscode in gitignore * fix: tests GroupBy * revert: gitignore change * fix: build json plans in metadata * fix: empty filteredArrays condition * fix: tests * fix: tests * fix: json qb test fix * fix: review based on tushar * fix: changes based on review from Srikanth * fix: remove unnecessary bool checking * fix: removed comment * fix: merge json body columns together * chore: var renamed * fix: merge conflict * test: fix * fix: tests * fix: go test flakiness * chore: merge json fields * fix: handle datatype collision * revert: few unrelated changes * revert: more unrelated change * test: blocked on pr #10153 * feat: mapping body_v2.message:string map to body * fix: go.mod required changes * fix: remove unused function * fix: test fixed * fix: go mod changes * fix: tests * fix: go lint * revert: remvoing unused function * revert: change ReadMultiple is needed * fix: body.message not being mapped correctly * fix: append warnings from fieldkeys * fix: change warning to a const to fix tests * chore: addressing comments from Nitya * chore: remove unnecessary change * fix: shift warning attachment to getKeySelectors * fix: lint error * feat: update message as typehint in JSON Column (#10545) * fix: cursor comments * chore: minor changes based on review * fix: message field key search in JSON Logs (#10577) * feat: work in progress * fix: test run success * fix: in progress * fix: excluding message from metadata fetch * test: cleared * fix: key name in metadata * fix: uncomment tests * chore: change to method for staticfields * fix: remove confusing comments; remove usage of logical keyword * chore: shift method above business logic * chore: changes based on review * fix: comments in metadata_store.go * fix: fallback expr switch case * revert: remove unused JSON Field datatype * fix: remove the exception checking * chore: keep message contained to field mapper * chore: text search tests * fix: package test fixed * fix: redundant code block removal * fix: retain staticfield implementation and spell fix * fix: nil param lint --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
702 lines
22 KiB
Go
702 lines
22 KiB
Go
package telemetrylogs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
|
|
"github.com/SigNoz/signoz/pkg/errors"
|
|
"github.com/SigNoz/signoz/pkg/factory"
|
|
"github.com/SigNoz/signoz/pkg/querybuilder"
|
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
|
"github.com/huandu/go-sqlbuilder"
|
|
)
|
|
|
|
type logQueryStatementBuilder struct {
|
|
logger *slog.Logger
|
|
metadataStore telemetrytypes.MetadataStore
|
|
fm qbtypes.FieldMapper
|
|
cb qbtypes.ConditionBuilder
|
|
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
|
|
aggExprRewriter qbtypes.AggExprRewriter
|
|
|
|
fullTextColumn *telemetrytypes.TelemetryFieldKey
|
|
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
|
|
}
|
|
|
|
var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*logQueryStatementBuilder)(nil)
|
|
|
|
func NewLogQueryStatementBuilder(
|
|
settings factory.ProviderSettings,
|
|
metadataStore telemetrytypes.MetadataStore,
|
|
fieldMapper qbtypes.FieldMapper,
|
|
conditionBuilder qbtypes.ConditionBuilder,
|
|
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
|
|
aggExprRewriter qbtypes.AggExprRewriter,
|
|
fullTextColumn *telemetrytypes.TelemetryFieldKey,
|
|
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
|
|
) *logQueryStatementBuilder {
|
|
logsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrylogs")
|
|
|
|
return &logQueryStatementBuilder{
|
|
logger: logsSettings.Logger(),
|
|
metadataStore: metadataStore,
|
|
fm: fieldMapper,
|
|
cb: conditionBuilder,
|
|
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
|
|
aggExprRewriter: aggExprRewriter,
|
|
fullTextColumn: fullTextColumn,
|
|
jsonKeyToKey: jsonKeyToKey,
|
|
}
|
|
}
|
|
|
|
// Build builds a SQL query for logs based on the given parameters
|
|
func (b *logQueryStatementBuilder) Build(
|
|
ctx context.Context,
|
|
start uint64,
|
|
end uint64,
|
|
requestType qbtypes.RequestType,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
start = querybuilder.ToNanoSecs(start)
|
|
end = querybuilder.ToNanoSecs(end)
|
|
|
|
keySelectors, warnings := getKeySelectors(query)
|
|
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
query = b.adjustKeys(ctx, keys, query, requestType)
|
|
|
|
// Create SQL builder
|
|
q := sqlbuilder.NewSelectBuilder()
|
|
|
|
var stmt *qbtypes.Statement
|
|
switch requestType {
|
|
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeRawStream:
|
|
stmt, err = b.buildListQuery(ctx, q, query, start, end, keys, variables)
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
stmt, err = b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
|
|
case qbtypes.RequestTypeScalar:
|
|
stmt, err = b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
|
|
default:
|
|
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
stmt.Warnings = append(stmt.Warnings, warnings...)
|
|
return stmt, nil
|
|
}
|
|
|
|
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([]*telemetrytypes.FieldKeySelector, []string) {
|
|
var keySelectors []*telemetrytypes.FieldKeySelector
|
|
var warnings []string
|
|
|
|
for idx := range query.Aggregations {
|
|
aggExpr := query.Aggregations[idx]
|
|
selectors := querybuilder.QueryStringToKeysSelectors(aggExpr.Expression)
|
|
keySelectors = append(keySelectors, selectors...)
|
|
}
|
|
|
|
if query.Filter != nil && query.Filter.Expression != "" {
|
|
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
|
|
keySelectors = append(keySelectors, whereClauseSelectors...)
|
|
}
|
|
|
|
for idx := range query.GroupBy {
|
|
groupBy := query.GroupBy[idx]
|
|
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
|
Name: groupBy.Name,
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: groupBy.FieldContext,
|
|
FieldDataType: groupBy.FieldDataType,
|
|
})
|
|
}
|
|
|
|
for idx := range query.SelectFields {
|
|
selectField := query.SelectFields[idx]
|
|
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
|
Name: selectField.Name,
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: selectField.FieldContext,
|
|
FieldDataType: selectField.FieldDataType,
|
|
})
|
|
}
|
|
|
|
for idx := range query.Order {
|
|
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
|
Name: query.Order[idx].Key.Name,
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: query.Order[idx].Key.FieldContext,
|
|
FieldDataType: query.Order[idx].Key.FieldDataType,
|
|
})
|
|
}
|
|
|
|
for idx := range keySelectors {
|
|
keySelectors[idx].Signal = telemetrytypes.SignalLogs
|
|
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
|
|
}
|
|
|
|
// When the new JSON body experience is enabled, warn the user if they use the bare
|
|
// "body" key in the filter — queries on plain "body" default to body.message:string.
|
|
// TODO(Piyush): Setup better for coming FTS support.
|
|
if querybuilder.BodyJSONQueryEnabled {
|
|
for _, sel := range keySelectors {
|
|
if sel.Name == LogsV2BodyColumn {
|
|
warnings = append(warnings, bodySearchDefaultWarning)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return keySelectors, warnings
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
|
|
|
|
// Always ensure timestamp and id are present in keys map
|
|
keys["id"] = append([]*telemetrytypes.TelemetryFieldKey{{
|
|
Name: "id",
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: telemetrytypes.FieldContextLog,
|
|
FieldDataType: telemetrytypes.FieldDataTypeString,
|
|
}}, keys["id"]...)
|
|
|
|
keys["timestamp"] = append([]*telemetrytypes.TelemetryFieldKey{{
|
|
Name: "timestamp",
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: telemetrytypes.FieldContextLog,
|
|
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
|
}}, keys["timestamp"]...)
|
|
|
|
/*
|
|
Adjust keys for alias expressions in aggregations
|
|
*/
|
|
actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
|
|
|
|
/*
|
|
Check if user is using multiple contexts or data types for same field name
|
|
Idea is to use a super set of keys that can satisfy all the usages
|
|
|
|
For example, lets consider model_id exists in both attributes and resources
|
|
And user is trying to use `attribute.model_id` and `model_id`.
|
|
|
|
In this case, we'll remove the context from `attribute.model_id`
|
|
and make it just `model_id` and remove the duplicate entry.
|
|
|
|
Same goes with data types.
|
|
Consider user is using http.status_code:number and http.status_code
|
|
In this case, we'll remove the data type from http.status_code:number
|
|
and make it just http.status_code and remove the duplicate entry.
|
|
*/
|
|
|
|
actions = append(actions, querybuilder.AdjustDuplicateKeys(&query)...)
|
|
|
|
/*
|
|
Now adjust each key to have correct context and data type
|
|
Here we try to make intelligent guesses which work for all users (not just majority)
|
|
Reason for doing this is to not create an unexpected behavior for users
|
|
*/
|
|
for idx := range query.SelectFields {
|
|
actions = append(actions, b.adjustKey(&query.SelectFields[idx], keys)...)
|
|
}
|
|
for idx := range query.GroupBy {
|
|
actions = append(actions, b.adjustKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
|
|
}
|
|
for idx := range query.Order {
|
|
actions = append(actions, b.adjustKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
|
|
}
|
|
|
|
for _, action := range actions {
|
|
// TODO: change to debug level once we are confident about the behavior
|
|
b.logger.InfoContext(ctx, "key adjustment action", "action", action)
|
|
}
|
|
|
|
return query
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
|
|
// First check if it matches with any intrinsic fields
|
|
var intrinsicOrCalculatedField telemetrytypes.TelemetryFieldKey
|
|
if _, ok := IntrinsicFields[key.Name]; ok {
|
|
intrinsicOrCalculatedField = IntrinsicFields[key.Name]
|
|
return querybuilder.AdjustKey(key, keys, &intrinsicOrCalculatedField)
|
|
}
|
|
|
|
return querybuilder.AdjustKey(key, keys, nil)
|
|
}
|
|
|
|
// buildListQuery builds a query for list panel type
|
|
func (b *logQueryStatementBuilder) buildListQuery(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
var (
|
|
cteFragments []string
|
|
cteArgs [][]any
|
|
)
|
|
|
|
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
|
return nil, err
|
|
} else if frag != "" {
|
|
cteFragments = append(cteFragments, frag)
|
|
cteArgs = append(cteArgs, args)
|
|
}
|
|
|
|
// Select timestamp and id by default
|
|
sb.Select(LogsV2TimestampColumn)
|
|
sb.SelectMore(LogsV2IDColumn)
|
|
if len(query.SelectFields) == 0 {
|
|
// Select all default columns
|
|
sb.SelectMore(LogsV2TraceIDColumn)
|
|
sb.SelectMore(LogsV2SpanIDColumn)
|
|
sb.SelectMore(LogsV2TraceFlagsColumn)
|
|
sb.SelectMore(LogsV2SeverityTextColumn)
|
|
sb.SelectMore(LogsV2SeverityNumberColumn)
|
|
sb.SelectMore(LogsV2ScopeNameColumn)
|
|
sb.SelectMore(LogsV2ScopeVersionColumn)
|
|
sb.SelectMore(bodyAliasExpression())
|
|
sb.SelectMore(LogsV2AttributesStringColumn)
|
|
sb.SelectMore(LogsV2AttributesNumberColumn)
|
|
sb.SelectMore(LogsV2AttributesBoolColumn)
|
|
sb.SelectMore(LogsV2ResourcesStringColumn)
|
|
sb.SelectMore(LogsV2ScopeStringColumn)
|
|
|
|
} else {
|
|
// Select specified columns
|
|
for index := range query.SelectFields {
|
|
if query.SelectFields[index].Name == LogsV2TimestampColumn || query.SelectFields[index].Name == LogsV2IDColumn {
|
|
continue
|
|
}
|
|
|
|
// get column expression for the field - use array index directly to avoid pointer to loop variable
|
|
colExpr, err := b.fm.ColumnExpressionFor(ctx, &query.SelectFields[index], keys)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sb.SelectMore(colExpr)
|
|
}
|
|
}
|
|
|
|
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
|
|
|
|
// Add filter conditions
|
|
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Add order by
|
|
for _, orderBy := range query.Order {
|
|
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue()))
|
|
}
|
|
|
|
// Add limit and offset
|
|
if query.Limit > 0 {
|
|
sb.Limit(query.Limit)
|
|
} else {
|
|
sb.Limit(100)
|
|
}
|
|
|
|
if query.Offset > 0 {
|
|
sb.Offset(query.Offset)
|
|
}
|
|
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
|
|
|
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
|
|
stmt := &qbtypes.Statement{
|
|
Query: finalSQL,
|
|
Args: finalArgs,
|
|
}
|
|
if preparedWhereClause != nil {
|
|
stmt.Warnings = preparedWhereClause.Warnings
|
|
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
|
}
|
|
|
|
return stmt, nil
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
var (
|
|
cteFragments []string
|
|
cteArgs [][]any
|
|
)
|
|
|
|
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
|
return nil, err
|
|
} else if frag != "" {
|
|
cteFragments = append(cteFragments, frag)
|
|
cteArgs = append(cteArgs, args)
|
|
}
|
|
|
|
sb.SelectMore(fmt.Sprintf(
|
|
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts",
|
|
int64(query.StepInterval.Seconds()),
|
|
))
|
|
|
|
var allGroupByArgs []any
|
|
|
|
// Keep original column expressions so we can build the tuple
|
|
fieldNames := make([]string, 0, len(query.GroupBy))
|
|
for _, gb := range query.GroupBy {
|
|
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
|
|
allGroupByArgs = append(allGroupByArgs, args...)
|
|
sb.SelectMore(colExpr)
|
|
fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name))
|
|
}
|
|
|
|
// Aggregations
|
|
allAggChArgs := make([]any, 0)
|
|
for i, agg := range query.Aggregations {
|
|
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
|
|
ctx, agg.Expression,
|
|
uint64(query.StepInterval.Seconds()),
|
|
keys,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
allAggChArgs = append(allAggChArgs, chArgs...)
|
|
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i))
|
|
}
|
|
|
|
// Add FROM clause
|
|
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
|
|
|
|
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var finalSQL string
|
|
var finalArgs []any
|
|
|
|
if query.Limit > 0 && len(query.GroupBy) > 0 {
|
|
// build the scalar “top/bottom-N” query in its own builder.
|
|
cteSB := sqlbuilder.NewSelectBuilder()
|
|
cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true, variables)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query))
|
|
cteArgs = append(cteArgs, cteStmt.Args)
|
|
|
|
// Constrain the main query to the rows that appear in the CTE.
|
|
tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
|
|
sb.Where(fmt.Sprintf("%s GLOBAL IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))
|
|
|
|
// Group by all dimensions
|
|
sb.GroupBy("ts")
|
|
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
if query.Having != nil && query.Having.Expression != "" {
|
|
// Rewrite having expression to use SQL column names
|
|
rewriter := querybuilder.NewHavingExpressionRewriter()
|
|
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
|
sb.Having(rewrittenExpr)
|
|
}
|
|
|
|
if len(query.Order) != 0 {
|
|
for _, orderBy := range query.Order {
|
|
_, ok := aggOrderBy(orderBy, query)
|
|
if !ok {
|
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
|
}
|
|
}
|
|
sb.OrderBy("ts desc")
|
|
}
|
|
|
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
|
|
|
// Stitch it all together: WITH … SELECT …
|
|
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
|
|
} else {
|
|
sb.GroupBy("ts")
|
|
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
if query.Having != nil && query.Having.Expression != "" {
|
|
rewriter := querybuilder.NewHavingExpressionRewriter()
|
|
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
|
sb.Having(rewrittenExpr)
|
|
}
|
|
|
|
if len(query.Order) != 0 {
|
|
for _, orderBy := range query.Order {
|
|
_, ok := aggOrderBy(orderBy, query)
|
|
if !ok {
|
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
|
}
|
|
}
|
|
sb.OrderBy("ts desc")
|
|
}
|
|
|
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
|
|
|
// Stitch it all together: WITH … SELECT …
|
|
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
}
|
|
|
|
stmt := &qbtypes.Statement{
|
|
Query: finalSQL,
|
|
Args: finalArgs,
|
|
}
|
|
if preparedWhereClause != nil {
|
|
stmt.Warnings = preparedWhereClause.Warnings
|
|
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
|
}
|
|
|
|
return stmt, nil
|
|
}
|
|
|
|
// buildScalarQuery builds a query for scalar panel type
|
|
func (b *logQueryStatementBuilder) buildScalarQuery(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
skipResourceCTE bool,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
var (
|
|
cteFragments []string
|
|
cteArgs [][]any
|
|
)
|
|
|
|
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
|
return nil, err
|
|
} else if frag != "" && !skipResourceCTE {
|
|
cteFragments = append(cteFragments, frag)
|
|
cteArgs = append(cteArgs, args)
|
|
}
|
|
|
|
allAggChArgs := []any{}
|
|
|
|
var allGroupByArgs []any
|
|
|
|
for _, gb := range query.GroupBy {
|
|
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
|
|
allGroupByArgs = append(allGroupByArgs, args...)
|
|
sb.SelectMore(colExpr)
|
|
}
|
|
|
|
// for scalar queries, the rate would be end-start
|
|
rateInterval := (end - start) / querybuilder.NsToSeconds
|
|
|
|
// Add aggregation
|
|
if len(query.Aggregations) > 0 {
|
|
for idx := range query.Aggregations {
|
|
aggExpr := query.Aggregations[idx]
|
|
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
|
|
ctx, aggExpr.Expression,
|
|
rateInterval,
|
|
keys,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
allAggChArgs = append(allAggChArgs, chArgs...)
|
|
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx))
|
|
}
|
|
}
|
|
|
|
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
|
|
|
|
// Add filter conditions
|
|
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Group by dimensions
|
|
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
|
|
// Add having clause if needed
|
|
if query.Having != nil && query.Having.Expression != "" {
|
|
rewriter := querybuilder.NewHavingExpressionRewriter()
|
|
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
|
sb.Having(rewrittenExpr)
|
|
}
|
|
|
|
// Add order by
|
|
for _, orderBy := range query.Order {
|
|
idx, ok := aggOrderBy(orderBy, query)
|
|
if ok {
|
|
sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue()))
|
|
} else {
|
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
|
}
|
|
}
|
|
|
|
// if there is no order by, then use the __result_0 as the order by
|
|
if len(query.Order) == 0 {
|
|
sb.OrderBy("__result_0 DESC")
|
|
}
|
|
|
|
// Add limit and offset
|
|
if query.Limit > 0 {
|
|
sb.Limit(query.Limit)
|
|
}
|
|
|
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
|
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
|
|
|
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
|
|
stmt := &qbtypes.Statement{
|
|
Query: finalSQL,
|
|
Args: finalArgs,
|
|
}
|
|
if preparedWhereClause != nil {
|
|
stmt.Warnings = preparedWhereClause.Warnings
|
|
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
|
}
|
|
|
|
return stmt, nil
|
|
}
|
|
|
|
// buildFilterCondition builds SQL condition from filter expression
|
|
func (b *logQueryStatementBuilder) addFilterCondition(
|
|
_ context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
start, end uint64,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*querybuilder.PreparedWhereClause, error) {
|
|
|
|
var preparedWhereClause *querybuilder.PreparedWhereClause
|
|
var err error
|
|
|
|
if query.Filter != nil && query.Filter.Expression != "" {
|
|
// add filter expression
|
|
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
|
Logger: b.logger,
|
|
FieldMapper: b.fm,
|
|
ConditionBuilder: b.cb,
|
|
FieldKeys: keys,
|
|
SkipResourceFilter: true,
|
|
FullTextColumn: b.fullTextColumn,
|
|
JsonKeyToKey: b.jsonKeyToKey,
|
|
Variables: variables,
|
|
}, start, end)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if preparedWhereClause != nil {
|
|
sb.AddWhereClause(preparedWhereClause.WhereClause)
|
|
}
|
|
|
|
// add time filter
|
|
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
|
|
var endBucket uint64
|
|
if end != 0 {
|
|
endBucket = end / querybuilder.NsToSeconds
|
|
}
|
|
|
|
if start != 0 {
|
|
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.GE("ts_bucket_start", startBucket))
|
|
}
|
|
if end != 0 {
|
|
sb.Where(sb.L("timestamp", fmt.Sprintf("%d", end)), sb.LE("ts_bucket_start", endBucket))
|
|
}
|
|
|
|
return preparedWhereClause, nil
|
|
}
|
|
|
|
func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) (int, bool) {
|
|
for i, agg := range q.Aggregations {
|
|
if k.Key.Name == agg.Alias ||
|
|
k.Key.Name == agg.Expression ||
|
|
k.Key.Name == fmt.Sprintf("%d", i) {
|
|
return i, true
|
|
}
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) maybeAttachResourceFilter(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (cteSQL string, cteArgs []any, err error) {
|
|
|
|
stmt, err := b.buildResourceFilterCTE(ctx, query, start, end, variables)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
sb.Where("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)")
|
|
|
|
return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) buildResourceFilterCTE(
|
|
ctx context.Context,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
return b.resourceFilterStmtBuilder.Build(
|
|
ctx,
|
|
start,
|
|
end,
|
|
qbtypes.RequestTypeRaw,
|
|
query,
|
|
variables,
|
|
)
|
|
}
|