Mirror of https://github.com/SigNoz/signoz.git (synced 2026-04-26 22:00:23 +01:00)
* feat(audit): add telemetry audit query infrastructure. Add pkg/telemetryaudit/ with tables, field mapper, condition builder, and statement builder for querying audit logs from the signoz_audit database. Add SourceAudit to the source enum and integrate audit key resolution into the metadata store.
* chore: address review comments. Comment out SourceAudit from Enum() until the frontend is ready. Use the actual audit table constants in metadata test helpers.
* fix(audit): align field mapper with actual audit DDL schema. Remove resources_string (not in the audit table DDL). Add event_name as an intrinsic column. Resource context resolves only through the resource JSON column.
* feat(audit): add audit field value autocomplete support. Wire distributed_tag_attributes_v2 for signoz_audit into the metadata store. Add getAuditFieldValues() and route SignalLogs + SourceAudit to it in GetFieldValues().
* test(audit): add statement builder tests. Cover all three request types (list, time series, scalar) with audit-specific query patterns: materialized column filters, AND/OR conditions, limit CTEs, and group-by expressions.
* refactor(audit): inline field key map into test file. Remove test_data.go and inline the audit field key map directly into statement_builder_test.go with a compact helper function.
* style(audit): move column map to const.go, use sqlbuilder.As in metadata. Move logsV2Columns from field_mapper.go to const.go to colocate all column definitions. Switch getAuditKeys() to use sb.As() instead of raw string formatting. Fix FieldContext alignment.
* fix(audit): align table names with the schema migration. The migration uses logs/distributed_logs (not logs_v2/distributed_logs_v2). Rename LogsV2TableName to LogsTableName and LogsV2LocalTableName to LogsLocalTableName to match the actual signoz_audit DDL.
* feat(audit): add integration test fixture for audit logs. The AuditLog fixture inserts into all 5 signoz_audit tables matching the schema migration DDL: distributed_logs (no resources_string, has event_name), distributed_logs_resource, distributed_tag_attributes_v2, distributed_logs_attribute_keys, distributed_logs_resource_keys.
* fix(audit): rename tag_attributes_v2 to tag_attributes. The migration uses tag_attributes/distributed_tag_attributes (no _v2 suffix). Rename the constants and update all references, including the integration test fixture.
* feat(audit): wire audit statement builder into querier. Add auditStmtBuilder to the querier struct and route LogAggregation queries with source=audit to it in all three dispatch locations (main query, live tail, shiftedQuery). Create and wire the full audit query stack in the signozquerier provider.
* test(audit): add integration tests for audit log querying. Cover the documented query patterns: list all events, filter by principal ID, filter by outcome, filter by resource name+ID, filter by principal type, scalar count for alerting, and an isolation test ensuring audit data doesn't leak into regular logs.
* fix(audit): revert sb.As in getAuditKeys, fix fixture column_names. Revert getAuditKeys to use raw SQL strings instead of sb.As(), which incorrectly treated string literals as column references. Add explicit column_names to all ClickHouse insert calls in the audit fixture.
* fix(audit): remove debug assertion from integration test.
* feat(audit): internalize resource filter in audit statement builder. Build the resource filter internally, pointing at signoz_audit.distributed_logs_resource. Add the LogsResourceTableName constant. Remove resourceFilterStmtBuilder from the constructor params. Update test expectations to use the audit resource table.
* fix(audit): rename resource.name to resource.kind, move to resource attributes. Align with the schema change from SigNoz/signoz#10826: signoz.audit.resource.name is renamed to signoz.audit.resource.kind; resource.kind and resource.id move from event attributes to OTel Resource attributes (the resource JSON column); materialized columns drop from 7 to 5 (resource.kind and resource.id are no longer materialized).
* refactor(audit): use pytest.mark.parametrize for filter integration tests. Consolidate the filter test functions into a single parametrized test. 6/8 tests passing; the resource kind+ID filter and scalar count need further investigation (resource filter JSON key extraction with dotted keys, scalar response format).
* fix(audit): add source to resource filter for correct metadata routing. Add a source param to telemetryresourcefilter.New so the resource filter's key selectors include Source when calling GetKeysMulti. Without this, audit resource keys route to signoz_logs metadata tables instead of signoz_audit. Fix the scalar test to use the table response format (columns+data, not rows).
* refactor(audit): reuse querier fixtures in integration tests. Add a source param to BuilderQuery and build_scalar_query in the querier fixture. Replace the custom _build_audit_query and _build_audit_ts_query helpers with BuilderQuery and build_scalar_query from the shared fixtures.
* refactor(audit): remove wrapper helpers, inline make_query_request calls. Remove the _query_audit_raw and _query_audit_scalar helpers. Use make_query_request, BuilderQuery, and build_scalar_query directly. Compute the time window at test execution time via _time_window() to avoid stale module-level timestamps.
* refactor(audit): inline _time_window into test functions.
* style(audit): use snake_case for pytest parametrize IDs.
* refactor(audit): inline DEFAULT_ORDER using build_order_by. Use build_order_by from the querier fixtures instead of the OrderBy/TelemetryFieldKey dataclasses. Allow BuilderQuery.order to accept plain dicts alongside OrderBy objects.
* refactor(audit): inline all data setup, use distinct scenarios per test. Remove the _insert_standard_audit_events helper. Each test now owns its data: list_all uses alert-rule/saved-view/user resource types, scalar_count uses multiple failures from different principals (count=2), and the leak test uses a single organization event. The parametrized filter tests keep the original 5-event dataset.
* fix(audit): remove silent empty-string guards in metadata store. Remove guards that silently returned nil/empty when audit DB params were empty. All call sites now pass real constants, so misconfiguration should fail loudly rather than produce silent empty results.
* style(audit): remove module docstring from integration test.
* style: formatting fix in tables file.
* style: formatting fix in tables file.
* fix: add auditStmtBuilder nil param to querier_test.go
* fix: fix fmt
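Before the file itself, a note on the routing rule the commit series above keeps returning to: a LogAggregation builder query whose Source is telemetrytypes.SourceAudit is dispatched to the audit statement builder instead of the regular logs one, at all three dispatch locations (main query, live tail, and shifted/ranged queries). A minimal sketch of that rule, reusing the types from the file below; the helper name pickLogStmtBuilder is hypothetical and does not exist in the source:

// pickLogStmtBuilder is a hypothetical helper that captures the dispatch rule
// implemented inline at the three sites in this file (QueryRange,
// QueryRawStream, and createRangedQuery).
func (q *querier) pickLogStmtBuilder(spec qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) qbtypes.StatementBuilder[qbtypes.LogAggregation] {
	if spec.Source == telemetrytypes.SourceAudit {
		// audit queries are served from the signoz_audit database
		return q.auditStmtBuilder
	}
	return q.logStmtBuilder
}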
1092 lines
38 KiB
Go
package querier

import (
	"context"
	"fmt"
	"log/slog"
	gomaps "maps"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/dustin/go-humanize"
	"golang.org/x/exp/maps"

	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/factory"
	"github.com/SigNoz/signoz/pkg/prometheus"
	"github.com/SigNoz/signoz/pkg/query-service/utils"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	"github.com/SigNoz/signoz/pkg/telemetrystore"
	"github.com/SigNoz/signoz/pkg/types/ctxtypes"
	"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"

	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/valuer"
)

var (
	intervalWarn = "Query %s is requesting aggregation interval %v seconds, which is smaller than the minimum allowed interval of %v seconds for selected time range. Using the minimum instead"
)

type querier struct {
	logger                   *slog.Logger
	telemetryStore           telemetrystore.TelemetryStore
	metadataStore            telemetrytypes.MetadataStore
	promEngine               prometheus.Prometheus
	traceStmtBuilder         qbtypes.StatementBuilder[qbtypes.TraceAggregation]
	logStmtBuilder           qbtypes.StatementBuilder[qbtypes.LogAggregation]
	auditStmtBuilder         qbtypes.StatementBuilder[qbtypes.LogAggregation]
	metricStmtBuilder        qbtypes.StatementBuilder[qbtypes.MetricAggregation]
	meterStmtBuilder         qbtypes.StatementBuilder[qbtypes.MetricAggregation]
	traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder
	bucketCache              BucketCache
	liveDataRefresh          time.Duration
}

var _ Querier = (*querier)(nil)

func New(
	settings factory.ProviderSettings,
	telemetryStore telemetrystore.TelemetryStore,
	metadataStore telemetrytypes.MetadataStore,
	promEngine prometheus.Prometheus,
	traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
	logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
	auditStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
	metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
	meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
	traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder,
	bucketCache BucketCache,
) *querier {
	querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
	return &querier{
		logger:                   querierSettings.Logger(),
		telemetryStore:           telemetryStore,
		metadataStore:            metadataStore,
		promEngine:               promEngine,
		traceStmtBuilder:         traceStmtBuilder,
		logStmtBuilder:           logStmtBuilder,
		auditStmtBuilder:         auditStmtBuilder,
		metricStmtBuilder:        metricStmtBuilder,
		meterStmtBuilder:         meterStmtBuilder,
		traceOperatorStmtBuilder: traceOperatorStmtBuilder,
		bucketCache:              bucketCache,
		liveDataRefresh:          5 * time.Second,
	}
}

// extractShiftFromBuilderQuery extracts the shift value from the timeShift function, if present.
func extractShiftFromBuilderQuery[T any](spec qbtypes.QueryBuilderQuery[T]) int64 {
	for _, fn := range spec.Functions {
		if fn.Name == qbtypes.FunctionNameTimeShift && len(fn.Args) > 0 {
			switch v := fn.Args[0].Value.(type) {
			case float64:
				return int64(v)
			case int64:
				return v
			case int:
				return int64(v)
			case string:
				if shiftFloat, err := strconv.ParseFloat(v, 64); err == nil {
					return int64(shiftFloat)
				}
			}
		}
	}
	return 0
}

// adjustTimeRangeForShift adjusts the time range based on the shift value from the timeShift function.
func adjustTimeRangeForShift[T any](spec qbtypes.QueryBuilderQuery[T], tr qbtypes.TimeRange, kind qbtypes.RequestType) qbtypes.TimeRange {
	// Only apply the time shift for time series and scalar queries;
	// raw/list queries don't support timeshift.
	if kind != qbtypes.RequestTypeTimeSeries && kind != qbtypes.RequestTypeScalar {
		return tr
	}

	// Use the ShiftBy field if it's already populated, otherwise extract it.
	shiftBy := spec.ShiftBy
	if shiftBy == 0 {
		shiftBy = extractShiftFromBuilderQuery(spec)
	}

	if shiftBy == 0 {
		return tr
	}

	// ShiftBy is in seconds; convert to milliseconds and shift backward in time.
	shiftMS := shiftBy * 1000
	return qbtypes.TimeRange{
		From: tr.From - uint64(shiftMS),
		To:   tr.To - uint64(shiftMS),
	}
}

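// Worked example for adjustTimeRangeForShift (illustrative, not part of the
// original source): for a time series request with timeShift(3600), a window
// of From=1700003600000, To=1700007200000 (milliseconds) becomes
// From=1700000000000, To=1700003600000, i.e. the same window shifted back by
// one hour; raw/list requests are returned unchanged.
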
func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {

	tmplVars := req.Variables
	if tmplVars == nil {
		tmplVars = make(map[string]qbtypes.VariableItem)
	}
	event := &qbtypes.QBEvent{
		Version:         "v5",
		NumberOfQueries: len(req.CompositeQuery.Queries),
		PanelType:       req.RequestType.StringValue(),
	}
	intervalWarnings := []string{}

	dependencyQueries := make(map[string]bool)
	traceOperatorQueries := make(map[string]qbtypes.QueryBuilderTraceOperator)

	for _, query := range req.CompositeQuery.Queries {
		if query.Type == qbtypes.QueryTypeTraceOperator {
			if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
				// Parse the expression to find dependencies
				if err := spec.ParseExpression(); err != nil {
					return nil, err
				}

				deps := spec.CollectReferencedQueries(spec.ParsedExpression)
				for _, dep := range deps {
					dependencyQueries[dep] = true
				}
				traceOperatorQueries[spec.Name] = spec
			}
		}
	}

	// First pass: collect all metric names that need temporality
	metricNames := make([]string, 0)
	for idx, query := range req.CompositeQuery.Queries {
		event.QueryType = query.Type.StringValue()
		switch query.Type {
		case qbtypes.QueryTypeBuilder:
			if spec, ok := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]); ok {
				for _, agg := range spec.Aggregations {
					if agg.MetricName != "" {
						metricNames = append(metricNames, agg.MetricName)
					}
				}
			}
			// If the step interval is not set, we set it ourselves with the recommended
			// value. If the step interval is set to a value that could result in more
			// points than allowed, we override it.
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				event.TracesUsed = true
				event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
				event.GroupByApplied = len(spec.GroupBy) > 0
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)),
					}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					newStep := qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)),
					}
					intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Seconds()))
					spec.StepInterval = newStep
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				event.LogsUsed = true
				event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
				event.GroupByApplied = len(spec.GroupBy) > 0
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)),
					}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					newStep := qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)),
					}
					intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Seconds()))
					spec.StepInterval = newStep
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				event.MetricsUsed = true
				event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
				event.GroupByApplied = len(spec.GroupBy) > 0

				if spec.Source == telemetrytypes.SourceMeter {
					if spec.StepInterval.Seconds() == 0 {
						spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))}
					}

					if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMeter(req.Start, req.End)) {
						newStep := qbtypes.Step{
							Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMeter(req.Start, req.End)),
						}
						spec.StepInterval = newStep
					}
				} else {
					if spec.StepInterval.Seconds() == 0 {
						spec.StepInterval = qbtypes.Step{
							Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
						}
					}
					if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
						newStep := qbtypes.Step{
							Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
						}
						intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Seconds()))
						spec.StepInterval = newStep
					}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			}
		case qbtypes.QueryTypePromQL:
			event.MetricsUsed = true
			switch spec := query.Spec.(type) {
			case qbtypes.PromQuery:
				if spec.Step.Seconds() == 0 {
					spec.Step = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
					}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			}
		case qbtypes.QueryTypeClickHouseSQL:
			switch spec := query.Spec.(type) {
			case qbtypes.ClickHouseQuery:
				if strings.TrimSpace(spec.Query) != "" {
					event.MetricsUsed = strings.Contains(spec.Query, "signoz_metrics")
					event.LogsUsed = strings.Contains(spec.Query, "signoz_logs")
					event.TracesUsed = strings.Contains(spec.Query, "signoz_traces")
				}
			}
		case qbtypes.QueryTypeTraceOperator:
			if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)),
					}
				}

				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					newStep := qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)),
					}
					intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Seconds()))
					spec.StepInterval = newStep
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			}
		}
	}

	queries := make(map[string]qbtypes.Query)
	steps := make(map[string]qbtypes.Step)
	missingMetrics := []string{}
	missingMetricQueries := []string{}

	for _, query := range req.CompositeQuery.Queries {
		var queryName string
		var isTraceOperator bool

		switch query.Type {
		case qbtypes.QueryTypeTraceOperator:
			if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
				queryName = spec.Name
				isTraceOperator = true
			}
		case qbtypes.QueryTypePromQL:
			if spec, ok := query.Spec.(qbtypes.PromQuery); ok {
				queryName = spec.Name
			}
		case qbtypes.QueryTypeClickHouseSQL:
			if spec, ok := query.Spec.(qbtypes.ClickHouseQuery); ok {
				queryName = spec.Name
			}
		case qbtypes.QueryTypeBuilder:
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				queryName = spec.Name
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				queryName = spec.Name
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				queryName = spec.Name
			}
		}

		if !isTraceOperator && dependencyQueries[queryName] {
			continue
		}

		switch query.Type {
		case qbtypes.QueryTypePromQL:
			promQuery, ok := query.Spec.(qbtypes.PromQuery)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", query.Spec)
			}
			promqlQuery := newPromqlQuery(q.logger, q.promEngine, promQuery, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType, tmplVars)
			queries[promQuery.Name] = promqlQuery
			steps[promQuery.Name] = promQuery.Step
		case qbtypes.QueryTypeClickHouseSQL:
			chQuery, ok := query.Spec.(qbtypes.ClickHouseQuery)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid clickhouse query spec %T", query.Spec)
			}
			chSQLQuery := newchSQLQuery(q.logger, q.telemetryStore, chQuery, nil, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType, tmplVars)
			queries[chQuery.Name] = chSQLQuery
		case qbtypes.QueryTypeTraceOperator:
			traceOpQuery, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid trace operator query spec %T", query.Spec)
			}
			toq := &traceOperatorQuery{
				telemetryStore: q.telemetryStore,
				stmtBuilder:    q.traceOperatorStmtBuilder,
				spec:           traceOpQuery,
				compositeQuery: &req.CompositeQuery,
				fromMS:         uint64(req.Start),
				toMS:           uint64(req.End),
				kind:           req.RequestType,
			}
			queries[traceOpQuery.Name] = toq
			steps[traceOpQuery.Name] = traceOpQuery.StepInterval
		case qbtypes.QueryTypeBuilder:
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				bq := newBuilderQuery(q.logger, q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				queries[spec.Name] = bq
				steps[spec.Name] = spec.StepInterval
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				stmtBuilder := q.logStmtBuilder
				if spec.Source == telemetrytypes.SourceAudit {
					stmtBuilder = q.auditStmtBuilder
				}
				bq := newBuilderQuery(q.logger, q.telemetryStore, stmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				queries[spec.Name] = bq
				steps[spec.Name] = spec.StepInterval
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				var metricTemporality map[string]metrictypes.Temporality
				var metricTypes map[string]metrictypes.Type
				if len(metricNames) > 0 {
					var err error
					metricTemporality, metricTypes, err = q.metadataStore.FetchTemporalityAndTypeMulti(ctx, req.Start, req.End, metricNames...)
					if err != nil {
						q.logger.WarnContext(ctx, "failed to fetch metric temporality", errors.Attr(err), slog.Any("metrics", metricNames))
						return nil, errors.NewInternalf(errors.CodeInternal, "failed to fetch metrics temporality")
					}
					q.logger.DebugContext(ctx, "fetched metric temporalities and types", slog.Any("metric_temporality", metricTemporality), slog.Any("metric_types", metricTypes))
				}
				presentAggregations := []qbtypes.MetricAggregation{}
				for i := range spec.Aggregations {
					if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Temporality == metrictypes.Unknown {
						if temp, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && temp != metrictypes.Unknown {
							spec.Aggregations[i].Temporality = temp
						}
					}
					if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
						if foundMetricType, ok := metricTypes[spec.Aggregations[i].MetricName]; ok && foundMetricType != metrictypes.UnspecifiedType {
							spec.Aggregations[i].Type = foundMetricType
						}
					}
					if spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
						missingMetrics = append(missingMetrics, spec.Aggregations[i].MetricName)
						continue
					}
					presentAggregations = append(presentAggregations, spec.Aggregations[i])
				}
				if len(presentAggregations) == 0 {
					missingMetricQueries = append(missingMetricQueries, spec.Name)
					continue
				}
				spec.Aggregations = presentAggregations
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				var bq *builderQuery[qbtypes.MetricAggregation]

				if spec.Source == telemetrytypes.SourceMeter {
					event.Source = telemetrytypes.SourceMeter.StringValue()
					bq = newBuilderQuery(q.logger, q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				} else {
					bq = newBuilderQuery(q.logger, q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				}

				queries[spec.Name] = bq
				steps[spec.Name] = spec.StepInterval
			default:
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
			}
		}
	}
	nonExistentMetrics := []string{}
	var dormantMetricsWarningMsg string
	if len(missingMetrics) > 0 {
		lastSeenInfo, _ := q.metadataStore.FetchLastSeenInfoMulti(ctx, missingMetrics...)
		for _, missingMetricName := range missingMetrics {
			if ts, ok := lastSeenInfo[missingMetricName]; ok && ts > 0 {
				continue
			}
			nonExistentMetrics = append(nonExistentMetrics, missingMetricName)
		}
		if len(nonExistentMetrics) == 1 {
			return nil, errors.NewNotFoundf(errors.CodeNotFound, "could not find the metric %s", nonExistentMetrics[0])
		} else if len(nonExistentMetrics) > 1 {
			return nil, errors.NewNotFoundf(errors.CodeNotFound, "the following metrics were not found: %s", strings.Join(nonExistentMetrics, ", "))
		}
		lastSeenStr := func(name string) string {
			if ts, ok := lastSeenInfo[name]; ok && ts > 0 {
				ago := humanize.RelTime(time.UnixMilli(ts), time.Now(), "ago", "from now")
				return fmt.Sprintf("%s (last seen %s)", name, ago)
			}
			return name // unreachable: lastSeenStr is never called for metrics in nonExistentMetrics
		}
		if len(missingMetrics) == 1 {
			dormantMetricsWarningMsg = fmt.Sprintf("no data found for the metric %s in the query time range", lastSeenStr(missingMetrics[0]))
		} else {
			parts := make([]string, len(missingMetrics))
			for i, m := range missingMetrics {
				parts[i] = lastSeenStr(m)
			}
			dormantMetricsWarningMsg = fmt.Sprintf("no data found for the following metrics in the query time range: %s", strings.Join(parts, ", "))
		}
	}
	preseededResults := make(map[string]any)
	for _, name := range missingMetricQueries { // by this point non-existent metrics have caused an early return, so the remaining missing metrics are merely dormant
		switch req.RequestType {
		case qbtypes.RequestTypeTimeSeries:
			preseededResults[name] = &qbtypes.TimeSeriesData{QueryName: name}
		case qbtypes.RequestTypeScalar:
			preseededResults[name] = &qbtypes.ScalarData{QueryName: name}
		case qbtypes.RequestTypeRaw:
			preseededResults[name] = &qbtypes.RawData{QueryName: name}
		}
	}
	qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event, preseededResults)
	if qbResp != nil {
		qbResp.QBEvent = event
		if len(intervalWarnings) != 0 && req.RequestType == qbtypes.RequestTypeTimeSeries {
			if qbResp.Warning == nil {
				qbResp.Warning = &qbtypes.QueryWarnData{
					Warnings: make([]qbtypes.QueryWarnDataAdditional, len(intervalWarnings)),
				}
				for idx := range intervalWarnings {
					qbResp.Warning.Warnings[idx] = qbtypes.QueryWarnDataAdditional{Message: intervalWarnings[idx]}
				}
			}
		}
		if dormantMetricsWarningMsg != "" {
			if qbResp.Warning == nil {
				qbResp.Warning = &qbtypes.QueryWarnData{}
			}
			qbResp.Warning.Warnings = append(qbResp.Warning.Warnings, qbtypes.QueryWarnDataAdditional{
				Message: dormantMetricsWarningMsg,
			})
		}
	}
	return qbResp, qbErr
}

func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream) {

	event := &qbtypes.QBEvent{
		Version:         "v5",
		NumberOfQueries: len(req.CompositeQuery.Queries),
		PanelType:       req.RequestType.StringValue(),
	}

	for _, query := range req.CompositeQuery.Queries {
		event.QueryType = query.Type.StringValue()
		if query.Type == qbtypes.QueryTypeBuilder {
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
			default:
				// return if it's not a log aggregation query
				client.Error <- errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
				return
			}
		} else {
			// return if it's not of type query builder
			client.Error <- errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported query type %s", query.Type)
			return
		}
	}

	queries := make(map[string]qbtypes.Query)
	query := req.CompositeQuery.Queries[0]
	spec := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.LogAggregation])
	// add the new id to the id filter
	if spec.Filter == nil || spec.Filter.Expression == "" {
		spec.Filter = &qbtypes.Filter{Expression: "id > $id"}
	} else {
		spec.Filter.Expression = fmt.Sprintf("%s and id > $id", spec.Filter.Expression)
	}

	tsStart := req.Start
	if tsStart == 0 {
		tsStart = uint64(time.Now().UnixNano())
	} else {
		tsStart = uint64(utils.GetEpochNanoSecs(int64(tsStart)))
	}
	updatedLogID := ""

	ticker := time.NewTicker(q.liveDataRefresh)
	defer ticker.Stop()

	// we wrap the ticker in a buffered channel so the first tick fires immediately
	tick := make(chan time.Time, 1)
	tick <- time.Now() // initial tick
	go func() {
		for t := range ticker.C {
			tick <- t
		}
	}()

	for {
		select {
		case <-ctx.Done():
			done := true
			client.Done <- &done
			return
		case <-tick:
			// the end timestamp is deliberately left unspecified here
			timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: tsStart}, req.RequestType)
			liveTailStmtBuilder := q.logStmtBuilder
			if spec.Source == telemetrytypes.SourceAudit {
				liveTailStmtBuilder = q.auditStmtBuilder
			}
			bq := newBuilderQuery(q.logger, q.telemetryStore, liveTailStmtBuilder, spec, timeRange, req.RequestType, map[string]qbtypes.VariableItem{
				"id": {
					Value: updatedLogID,
				},
			})
			queries[spec.Name] = bq

			qbResp, qbErr := q.run(ctx, orgID, queries, req, nil, event, nil)
			if qbErr != nil {
				client.Error <- qbErr
				return
			}

			if qbResp == nil || len(qbResp.Data.Results) == 0 || qbResp.Data.Results[0] == nil {
				continue
			}
			data := qbResp.Data.Results[0].(*qbtypes.RawData)
			for i := len(data.Rows) - 1; i >= 0; i-- {
				client.Logs <- data.Rows[i]
				if i == 0 {
					tsStart = uint64(data.Rows[i].Timestamp.UnixNano())
					updatedLogID = data.Rows[i].Data["id"].(string)
				}
			}

		}
	}
}

func (q *querier) run(
	ctx context.Context,
	orgID valuer.UUID,
	qs map[string]qbtypes.Query,
	req *qbtypes.QueryRangeRequest,
	steps map[string]qbtypes.Step,
	qbEvent *qbtypes.QBEvent,
	preseededResults map[string]any,
) (*qbtypes.QueryRangeResponse, error) {
	ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
		instrumentationtypes.PanelType: qbEvent.PanelType,
		instrumentationtypes.QueryType: qbEvent.QueryType,
	})

	results := make(map[string]any)
	warnings := make([]string, 0)
	warningsDocURL := ""
	stats := qbtypes.ExecStats{}

	hasData := func(result *qbtypes.Result) bool {
		if result == nil || result.Value == nil {
			return false
		}
		switch result.Type {
		case qbtypes.RequestTypeScalar:
			if val, ok := result.Value.(*qbtypes.ScalarData); ok && val != nil {
				return len(val.Data) != 0
			}
		case qbtypes.RequestTypeRaw:
			if val, ok := result.Value.(*qbtypes.RawData); ok && val != nil {
				return len(val.Rows) != 0
			}
		case qbtypes.RequestTypeTimeSeries:
			if val, ok := result.Value.(*qbtypes.TimeSeriesData); ok && val != nil {
				if len(val.Aggregations) != 0 {
					anyNonEmpty := false
					for _, aggBucket := range val.Aggregations {
						if len(aggBucket.Series) != 0 {
							anyNonEmpty = true
							break
						}
					}
					return anyNonEmpty
				}
				return false
			}
		}
		return false
	}

	for name, query := range qs {
		// Skip the cache if NoCache is set, or if the cache is not available
		if req.NoCache || q.bucketCache == nil || query.Fingerprint() == "" {
			if req.NoCache {
				q.logger.DebugContext(ctx, "NoCache flag set, bypassing cache", slog.String("query", name))
			} else {
				q.logger.InfoContext(ctx, "no bucket cache or fingerprint, executing query", slog.String("fingerprint", query.Fingerprint()))
			}
			result, err := query.Execute(ctx)
			qbEvent.HasData = qbEvent.HasData || hasData(result)
			if err != nil {
				return nil, err
			}
			results[name] = result.Value
			warnings = append(warnings, result.Warnings...)
			warningsDocURL = result.WarningsDocURL
			stats.RowsScanned += result.Stats.RowsScanned
			stats.BytesScanned += result.Stats.BytesScanned
			stats.DurationMS += result.Stats.DurationMS
		} else {
			result, err := q.executeWithCache(ctx, orgID, query, steps[name], req.NoCache)
			qbEvent.HasData = qbEvent.HasData || hasData(result)
			if err != nil {
				return nil, err
			}
			switch v := result.Value.(type) {
			case *qbtypes.TimeSeriesData:
				v.QueryName = name
			case *qbtypes.ScalarData:
				v.QueryName = name
			case *qbtypes.RawData:
				v.QueryName = name
			}

			results[name] = result.Value
			warnings = append(warnings, result.Warnings...)
			warningsDocURL = result.WarningsDocURL
			stats.RowsScanned += result.Stats.RowsScanned
			stats.BytesScanned += result.Stats.BytesScanned
			stats.DurationMS += result.Stats.DurationMS
		}
	}

	gomaps.Copy(results, preseededResults)
	processedResults, err := q.postProcessResults(ctx, results, req)
	if err != nil {
		return nil, err
	}

	// attach the step interval to the metadata so the client can make informed decisions, e.g. the width of a bar,
	// or jumping to related logs/traces from a point on a line/bar chart with the correct time range
	stepIntervals := make(map[string]uint64, len(steps))
	for name, step := range steps {
		stepIntervals[name] = uint64(step.Seconds())
	}
	for _, query := range req.CompositeQuery.Queries {
		if query.Type == qbtypes.QueryTypeFormula {
			if formula, ok := query.Spec.(qbtypes.QueryBuilderFormula); ok {
				formulaStepMs := q.calculateFormulaStep(formula.Expression, req)
				stepIntervals[formula.Name] = uint64(formulaStepMs / 1000) // convert ms to seconds
			}
		}
	}

	resp := &qbtypes.QueryRangeResponse{
		Type: req.RequestType,
		Data: qbtypes.QueryData{
			Results: maps.Values(processedResults),
		},
		Meta: qbtypes.ExecStats{
			RowsScanned:   stats.RowsScanned,
			BytesScanned:  stats.BytesScanned,
			DurationMS:    stats.DurationMS,
			StepIntervals: stepIntervals,
		},
	}

	if len(warnings) != 0 {
		warns := make([]qbtypes.QueryWarnDataAdditional, len(warnings))
		for i, warning := range warnings {
			warns[i] = qbtypes.QueryWarnDataAdditional{
				Message: warning,
			}
		}

		resp.Warning = &qbtypes.QueryWarnData{
			Message:  "Encountered warnings",
			Url:      warningsDocURL,
			Warnings: warns,
		}
	}
	return resp, nil
}

// executeWithCache executes a query using the bucket cache.
func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query qbtypes.Query, step qbtypes.Step, _ bool) (*qbtypes.Result, error) {
	// Get cached data and missing ranges
	cachedResult, missingRanges := q.bucketCache.GetMissRanges(ctx, orgID, query, step)

	// If no missing ranges, return cached result
	if len(missingRanges) == 0 && cachedResult != nil {
		return cachedResult, nil
	}

	// If entire range is missing, execute normally
	if cachedResult == nil && len(missingRanges) == 1 {
		startMs, endMs := query.Window()
		if missingRanges[0].From == startMs && missingRanges[0].To == endMs {
			result, err := query.Execute(ctx)
			if err != nil {
				return nil, err
			}
			// Store in cache for future use
			q.bucketCache.Put(ctx, orgID, query, step, result)
			return result, nil
		}
	}

	// Execute queries for missing ranges with bounded parallelism
	freshResults := make([]*qbtypes.Result, len(missingRanges))
	errs := make([]error, len(missingRanges))
	totalStats := qbtypes.ExecStats{}

	q.logger.DebugContext(ctx, "executing queries for missing ranges",
		slog.Int("missing_ranges_count", len(missingRanges)),
		slog.Any("ranges", missingRanges))

	sem := make(chan struct{}, 4)
	var wg sync.WaitGroup

	for i, timeRange := range missingRanges {
		wg.Add(1)
		go func(idx int, tr *qbtypes.TimeRange) {
			defer wg.Done()

			sem <- struct{}{}
			defer func() { <-sem }()

			// Create a new query with the missing time range
			rangedQuery := q.createRangedQuery(query, *tr)
			if rangedQuery == nil {
				errs[idx] = errors.NewInternalf(errors.CodeInternal, "failed to create ranged query for range %d-%d", tr.From, tr.To)
				return
			}

			// Execute the ranged query
			result, err := rangedQuery.Execute(ctx)
			if err != nil {
				errs[idx] = err
				return
			}

			freshResults[idx] = result
		}(i, timeRange)
	}

	// Wait for all queries to complete
	wg.Wait()

	// Check for errors
	for _, err := range errs {
		if err != nil {
			// If any query failed, fall back to full execution
			q.logger.ErrorContext(ctx, "parallel query execution failed", errors.Attr(err))
			result, err := query.Execute(ctx)
			if err != nil {
				return nil, err
			}
			q.bucketCache.Put(ctx, orgID, query, step, result)
			return result, nil
		}
	}

	// Calculate total stats and filter out nil results
	validResults := make([]*qbtypes.Result, 0, len(freshResults))
	for _, result := range freshResults {
		if result != nil {
			validResults = append(validResults, result)
			totalStats.RowsScanned += result.Stats.RowsScanned
			totalStats.BytesScanned += result.Stats.BytesScanned
			totalStats.DurationMS += result.Stats.DurationMS
		}
	}
	freshResults = validResults

	// Merge cached and fresh results
	mergedResult := q.mergeResults(cachedResult, freshResults)
	mergedResult.Stats.RowsScanned += totalStats.RowsScanned
	mergedResult.Stats.BytesScanned += totalStats.BytesScanned
	mergedResult.Stats.DurationMS += totalStats.DurationMS

	// Store merged result in cache
	q.bucketCache.Put(ctx, orgID, query, step, mergedResult)

	return mergedResult, nil
}

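// Illustrative note on executeWithCache (not part of the original source):
// with a cached bucket covering [10:00, 11:00) and a request for
// [09:30, 11:30), GetMissRanges would typically report [09:30, 10:00) and
// [11:00, 11:30) as missing; both ranged queries then run concurrently
// (bounded to 4 in flight by the semaphore) and are merged with the cached
// result before the merged bucket is written back to the cache.
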
// createRangedQuery creates a copy of the query with a different time range.
func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtypes.TimeRange) qbtypes.Query {
	// this is called in a goroutine, so we create a copy of the query to avoid race conditions
	switch qt := originalQuery.(type) {
	case *promqlQuery:
		queryCopy := qt.query.Copy()
		return newPromqlQuery(q.logger, q.promEngine, queryCopy, timeRange, qt.requestType, qt.vars)

	case *chSQLQuery:
		queryCopy := qt.query.Copy()
		argsCopy := make([]any, len(qt.args))
		copy(argsCopy, qt.args)
		return newchSQLQuery(q.logger, q.telemetryStore, queryCopy, argsCopy, timeRange, qt.kind, qt.vars)

	case *builderQuery[qbtypes.TraceAggregation]:
		specCopy := qt.spec.Copy()
		specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
		adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
		return newBuilderQuery(q.logger, q.telemetryStore, q.traceStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)

	case *builderQuery[qbtypes.LogAggregation]:
		specCopy := qt.spec.Copy()
		specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
		adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
		shiftStmtBuilder := q.logStmtBuilder
		if qt.spec.Source == telemetrytypes.SourceAudit {
			shiftStmtBuilder = q.auditStmtBuilder
		}
		return newBuilderQuery(q.logger, q.telemetryStore, shiftStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)

	case *builderQuery[qbtypes.MetricAggregation]:
		specCopy := qt.spec.Copy()
		specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
		adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
		if qt.spec.Source == telemetrytypes.SourceMeter {
			return newBuilderQuery(q.logger, q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
		}
		return newBuilderQuery(q.logger, q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
	case *traceOperatorQuery:
		specCopy := qt.spec.Copy()
		return &traceOperatorQuery{
			telemetryStore: q.telemetryStore,
			stmtBuilder:    q.traceOperatorStmtBuilder,
			spec:           specCopy,
			fromMS:         uint64(timeRange.From),
			toMS:           uint64(timeRange.To),
			compositeQuery: qt.compositeQuery,
			kind:           qt.kind,
		}
	default:
		return nil
	}
}

// mergeResults merges the cached result with fresh results.
func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result) *qbtypes.Result {
	if cached == nil {
		if len(fresh) == 1 {
			return fresh[0]
		}
		if len(fresh) == 0 {
			return nil
		}
		// If cached is nil but we have multiple fresh results, merge them all
		// properly to avoid duplicates
		merged := &qbtypes.Result{
			Type:           fresh[0].Type,
			Stats:          fresh[0].Stats,
			Warnings:       fresh[0].Warnings,
			WarningsDocURL: fresh[0].WarningsDocURL,
		}

		// Merge all fresh results, including the first one
		switch merged.Type {
		case qbtypes.RequestTypeTimeSeries:
			// Pass nil as the cached value to ensure proper merging of all fresh results
			merged.Value = q.mergeTimeSeriesResults(nil, fresh)
		}

		return merged
	}

	// Start with the cached result
	merged := &qbtypes.Result{
		Type:           cached.Type,
		Value:          cached.Value,
		Stats:          cached.Stats,
		Warnings:       cached.Warnings,
		WarningsDocURL: cached.WarningsDocURL,
	}

	// If no fresh results, return cached
	if len(fresh) == 0 {
		return merged
	}

	switch merged.Type {
	case qbtypes.RequestTypeTimeSeries:
		merged.Value = q.mergeTimeSeriesResults(cached.Value.(*qbtypes.TimeSeriesData), fresh)
	}

	if len(fresh) > 0 {
		totalWarnings := len(merged.Warnings)
		for _, result := range fresh {
			totalWarnings += len(result.Warnings)
		}

		allWarnings := make([]string, 0, totalWarnings)
		allWarnings = append(allWarnings, merged.Warnings...)
		for _, result := range fresh {
			allWarnings = append(allWarnings, result.Warnings...)
		}
		merged.Warnings = allWarnings
	}

	return merged
}

// mergeTimeSeriesResults merges time series data.
func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, freshResults []*qbtypes.Result) *qbtypes.TimeSeriesData {

	// Map to store merged series by aggregation index and series key
	seriesMap := make(map[int]map[string]*qbtypes.TimeSeries)
	// Map to store aggregation bucket metadata
	bucketMetadata := make(map[int]*qbtypes.AggregationBucket)

	// Process cached data if available
	if cachedValue != nil && cachedValue.Aggregations != nil {
		for _, aggBucket := range cachedValue.Aggregations {
			if seriesMap[aggBucket.Index] == nil {
				seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
			}
			if bucketMetadata[aggBucket.Index] == nil {
				bucketMetadata[aggBucket.Index] = aggBucket
			}
			for _, series := range aggBucket.Series {
				key := qbtypes.GetUniqueSeriesKey(series.Labels)
				if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
					// Merge values from duplicate series in cached data, avoiding duplicate timestamps
					timestampMap := make(map[int64]bool)
					for _, v := range existingSeries.Values {
						timestampMap[v.Timestamp] = true
					}

					// Only add values with new timestamps
					for _, v := range series.Values {
						if !timestampMap[v.Timestamp] {
							existingSeries.Values = append(existingSeries.Values, v)
						}
					}
				} else {
					// Create a copy to avoid modifying the cached data
					seriesCopy := &qbtypes.TimeSeries{
						Labels: series.Labels,
						Values: make([]*qbtypes.TimeSeriesValue, len(series.Values)),
					}
					copy(seriesCopy.Values, series.Values)
					seriesMap[aggBucket.Index][key] = seriesCopy
				}
			}
		}
	}

	// Add fresh series
	for _, result := range freshResults {
		freshTS, ok := result.Value.(*qbtypes.TimeSeriesData)
		if !ok || freshTS == nil || freshTS.Aggregations == nil {
			continue
		}

		for _, aggBucket := range freshTS.Aggregations {
			if seriesMap[aggBucket.Index] == nil {
				seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
			}
			// Prefer fresh metadata over cached metadata
			if aggBucket.Alias != "" || aggBucket.Meta.Unit != "" {
				bucketMetadata[aggBucket.Index] = aggBucket
			} else if bucketMetadata[aggBucket.Index] == nil {
				bucketMetadata[aggBucket.Index] = aggBucket
			}
		}

		for _, aggBucket := range freshTS.Aggregations {
			for _, series := range aggBucket.Series {
				key := qbtypes.GetUniqueSeriesKey(series.Labels)

				if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
					// Merge values, avoiding duplicate timestamps
					// Create a map to track existing timestamps
					timestampMap := make(map[int64]bool)
					for _, v := range existingSeries.Values {
						timestampMap[v.Timestamp] = true
					}

					// Only add values with new timestamps
					for _, v := range series.Values {
						if !timestampMap[v.Timestamp] {
							existingSeries.Values = append(existingSeries.Values, v)
						}
					}
				} else {
					// New series
					seriesMap[aggBucket.Index][key] = series
				}
			}
		}
	}

	result := &qbtypes.TimeSeriesData{
		Aggregations: []*qbtypes.AggregationBucket{},
	}

	// Set QueryName from cached or first fresh result
	if cachedValue != nil {
		result.QueryName = cachedValue.QueryName
	} else if len(freshResults) > 0 {
		if freshTS, ok := freshResults[0].Value.(*qbtypes.TimeSeriesData); ok && freshTS != nil {
			result.QueryName = freshTS.QueryName
		}
	}

	for index, series := range seriesMap {
		var aggSeries []*qbtypes.TimeSeries
		for _, s := range series {
			// Sort values by timestamp
			slices.SortFunc(s.Values, func(a, b *qbtypes.TimeSeriesValue) int {
				if a.Timestamp < b.Timestamp {
					return -1
				}
				if a.Timestamp > b.Timestamp {
					return 1
				}
				return 0
			})
			aggSeries = append(aggSeries, s)
		}

		// Preserve bucket metadata from either cached or fresh results
		bucket := &qbtypes.AggregationBucket{
			Index:  index,
			Series: aggSeries,
		}
		if metadata, ok := bucketMetadata[index]; ok {
			bucket.Alias = metadata.Alias
			bucket.Meta = metadata.Meta
		}

		result.Aggregations = append(result.Aggregations, bucket)
	}

	return result
}
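// Illustrative note on mergeTimeSeriesResults (not part of the original
// source): if the cached data holds series {service="api"} with points at
// t=60s and t=120s, and a fresh result holds the same series with points at
// t=120s and t=180s, the merged series contains t=60s, t=120s, and t=180s
// exactly once each, sorted by timestamp; the duplicate t=120s point from
// the fresh result is dropped via the timestamp map.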