mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-04 03:10:21 +01:00
Compare commits
14 Commits
main
...
platform-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8bea74f077 | ||
|
|
9726371b79 | ||
|
|
c71b93f36d | ||
|
|
8d8406d7f0 | ||
|
|
e84c75d41f | ||
|
|
a469fbab14 | ||
|
|
3835a0f4b0 | ||
|
|
9f6cc9c4eb | ||
|
|
d1ae4b6173 | ||
|
|
1e3dc8de4f | ||
|
|
a24b75d071 | ||
|
|
5db81ba7f6 | ||
|
|
77493f7c76 | ||
|
|
b1f7e5a3df |
@@ -40,6 +40,7 @@ type querier struct {
|
||||
promEngine prometheus.Prometheus
|
||||
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
|
||||
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
|
||||
auditStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
|
||||
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
|
||||
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
|
||||
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder
|
||||
@@ -56,6 +57,7 @@ func New(
|
||||
promEngine prometheus.Prometheus,
|
||||
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
|
||||
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
|
||||
auditStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
|
||||
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
|
||||
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
|
||||
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder,
|
||||
@@ -69,6 +71,7 @@ func New(
|
||||
promEngine: promEngine,
|
||||
traceStmtBuilder: traceStmtBuilder,
|
||||
logStmtBuilder: logStmtBuilder,
|
||||
auditStmtBuilder: auditStmtBuilder,
|
||||
metricStmtBuilder: metricStmtBuilder,
|
||||
meterStmtBuilder: meterStmtBuilder,
|
||||
traceOperatorStmtBuilder: traceOperatorStmtBuilder,
|
||||
@@ -361,7 +364,11 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
|
||||
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
|
||||
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
|
||||
bq := newBuilderQuery(q.logger, q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
|
||||
stmtBuilder := q.logStmtBuilder
|
||||
if spec.Source == telemetrytypes.SourceAudit {
|
||||
stmtBuilder = q.auditStmtBuilder
|
||||
}
|
||||
bq := newBuilderQuery(q.logger, q.telemetryStore, stmtBuilder, spec, timeRange, req.RequestType, tmplVars)
|
||||
queries[spec.Name] = bq
|
||||
steps[spec.Name] = spec.StepInterval
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
|
||||
@@ -550,7 +557,11 @@ func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qb
|
||||
case <-tick:
|
||||
// timestamp end is not specified here
|
||||
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: tsStart}, req.RequestType)
|
||||
bq := newBuilderQuery(q.logger, q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, map[string]qbtypes.VariableItem{
|
||||
liveTailStmtBuilder := q.logStmtBuilder
|
||||
if spec.Source == telemetrytypes.SourceAudit {
|
||||
liveTailStmtBuilder = q.auditStmtBuilder
|
||||
}
|
||||
bq := newBuilderQuery(q.logger, q.telemetryStore, liveTailStmtBuilder, spec, timeRange, req.RequestType, map[string]qbtypes.VariableItem{
|
||||
"id": {
|
||||
Value: updatedLogID,
|
||||
},
|
||||
@@ -850,7 +861,11 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
|
||||
specCopy := qt.spec.Copy()
|
||||
specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
|
||||
adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, q.logStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
|
||||
shiftStmtBuilder := q.logStmtBuilder
|
||||
if qt.spec.Source == telemetrytypes.SourceAudit {
|
||||
shiftStmtBuilder = q.auditStmtBuilder
|
||||
}
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, shiftStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
|
||||
|
||||
case *builderQuery[qbtypes.MetricAggregation]:
|
||||
specCopy := qt.spec.Copy()
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetryaudit"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
@@ -64,6 +65,11 @@ func newProvider(
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
telemetrylogs.LogAttributeKeysTblName,
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
telemetryaudit.DBName,
|
||||
telemetryaudit.LogsTableName,
|
||||
telemetryaudit.TagAttributesTableName,
|
||||
telemetryaudit.LogAttributeKeysTblName,
|
||||
telemetryaudit.LogResourceKeysTblName,
|
||||
telemetrymetadata.DBName,
|
||||
telemetrymetadata.AttributesMetadataLocalTableName,
|
||||
telemetrymetadata.ColumnEvolutionMetadataTableName,
|
||||
@@ -133,6 +139,35 @@ func newProvider(
|
||||
telemetrylogs.GetBodyJSONKey,
|
||||
)
|
||||
|
||||
// Create audit statement builder
|
||||
auditFieldMapper := telemetryaudit.NewFieldMapper()
|
||||
auditConditionBuilder := telemetryaudit.NewConditionBuilder(auditFieldMapper)
|
||||
auditResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
|
||||
settings,
|
||||
resourceFilterFieldMapper,
|
||||
resourceFilterConditionBuilder,
|
||||
telemetryMetadataStore,
|
||||
telemetryaudit.DefaultFullTextColumn,
|
||||
nil,
|
||||
)
|
||||
auditAggExprRewriter := querybuilder.NewAggExprRewriter(
|
||||
settings,
|
||||
telemetryaudit.DefaultFullTextColumn,
|
||||
auditFieldMapper,
|
||||
auditConditionBuilder,
|
||||
nil,
|
||||
)
|
||||
auditStmtBuilder := telemetryaudit.NewAuditQueryStatementBuilder(
|
||||
settings,
|
||||
telemetryMetadataStore,
|
||||
auditFieldMapper,
|
||||
auditConditionBuilder,
|
||||
auditResourceFilterStmtBuilder,
|
||||
auditAggExprRewriter,
|
||||
telemetryaudit.DefaultFullTextColumn,
|
||||
nil,
|
||||
)
|
||||
|
||||
// Create metric statement builder
|
||||
metricFieldMapper := telemetrymetrics.NewFieldMapper()
|
||||
metricConditionBuilder := telemetrymetrics.NewConditionBuilder(metricFieldMapper)
|
||||
@@ -169,6 +204,7 @@ func newProvider(
|
||||
prometheus,
|
||||
traceStmtBuilder,
|
||||
logStmtBuilder,
|
||||
auditStmtBuilder,
|
||||
metricStmtBuilder,
|
||||
meterStmtBuilder,
|
||||
traceOperatorStmtBuilder,
|
||||
|
||||
@@ -47,6 +47,7 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
metricStmtBuilder,
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
@@ -104,6 +105,7 @@ func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap
|
||||
nil, // prometheus
|
||||
nil, // traceStmtBuilder
|
||||
logStmtBuilder, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
@@ -154,6 +156,7 @@ func prepareQuerierForTraces(telemetryStore telemetrystore.TelemetryStore, keysM
|
||||
nil, // prometheus
|
||||
traceStmtBuilder, // traceStmtBuilder
|
||||
nil, // logStmtBuilder
|
||||
nil, // auditStmtBuilder
|
||||
nil, // metricStmtBuilder
|
||||
nil, // meterStmtBuilder
|
||||
nil, // traceOperatorStmtBuilder
|
||||
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetryaudit"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
@@ -395,6 +396,11 @@ func New(
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
telemetrylogs.LogAttributeKeysTblName,
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
telemetryaudit.DBName,
|
||||
telemetryaudit.LogsTableName,
|
||||
telemetryaudit.TagAttributesTableName,
|
||||
telemetryaudit.LogAttributeKeysTblName,
|
||||
telemetryaudit.LogResourceKeysTblName,
|
||||
telemetrymetadata.DBName,
|
||||
telemetrymetadata.AttributesMetadataLocalTableName,
|
||||
telemetrymetadata.ColumnEvolutionMetadataTableName,
|
||||
|
||||
204
pkg/telemetryaudit/condition_builder.go
Normal file
204
pkg/telemetryaudit/condition_builder.go
Normal file
@@ -0,0 +1,204 @@
|
||||
package telemetryaudit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
type conditionBuilder struct {
|
||||
fm qbtypes.FieldMapper
|
||||
}
|
||||
|
||||
func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
|
||||
return &conditionBuilder{fm: fm}
|
||||
}
|
||||
|
||||
func (c *conditionBuilder) conditionFor(
|
||||
ctx context.Context,
|
||||
startNs, endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
operator qbtypes.FilterOperator,
|
||||
value any,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
) (string, error) {
|
||||
columns, err := c.fm.ColumnFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if operator.IsStringSearchOperator() {
|
||||
value = querybuilder.FormatValueForContains(value)
|
||||
}
|
||||
|
||||
fieldExpression, err := c.fm.FieldFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
fieldExpression, value = querybuilder.DataTypeCollisionHandledFieldName(key, value, fieldExpression, operator)
|
||||
|
||||
switch operator {
|
||||
case qbtypes.FilterOperatorEqual:
|
||||
return sb.E(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorNotEqual:
|
||||
return sb.NE(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorGreaterThan:
|
||||
return sb.G(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorGreaterThanOrEq:
|
||||
return sb.GE(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorLessThan:
|
||||
return sb.LT(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorLessThanOrEq:
|
||||
return sb.LE(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorLike:
|
||||
return sb.Like(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorNotLike:
|
||||
return sb.NotLike(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorILike:
|
||||
return sb.ILike(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorNotILike:
|
||||
return sb.NotILike(fieldExpression, value), nil
|
||||
case qbtypes.FilterOperatorContains:
|
||||
return sb.ILike(fieldExpression, fmt.Sprintf("%%%s%%", value)), nil
|
||||
case qbtypes.FilterOperatorNotContains:
|
||||
return sb.NotILike(fieldExpression, fmt.Sprintf("%%%s%%", value)), nil
|
||||
case qbtypes.FilterOperatorRegexp:
|
||||
return fmt.Sprintf(`match(%s, %s)`, sqlbuilder.Escape(fieldExpression), sb.Var(value)), nil
|
||||
case qbtypes.FilterOperatorNotRegexp:
|
||||
return fmt.Sprintf(`NOT match(%s, %s)`, sqlbuilder.Escape(fieldExpression), sb.Var(value)), nil
|
||||
case qbtypes.FilterOperatorBetween:
|
||||
values, ok := value.([]any)
|
||||
if !ok {
|
||||
return "", qbtypes.ErrBetweenValues
|
||||
}
|
||||
if len(values) != 2 {
|
||||
return "", qbtypes.ErrBetweenValues
|
||||
}
|
||||
return sb.Between(fieldExpression, values[0], values[1]), nil
|
||||
case qbtypes.FilterOperatorNotBetween:
|
||||
values, ok := value.([]any)
|
||||
if !ok {
|
||||
return "", qbtypes.ErrBetweenValues
|
||||
}
|
||||
if len(values) != 2 {
|
||||
return "", qbtypes.ErrBetweenValues
|
||||
}
|
||||
return sb.NotBetween(fieldExpression, values[0], values[1]), nil
|
||||
case qbtypes.FilterOperatorIn:
|
||||
values, ok := value.([]any)
|
||||
if !ok {
|
||||
return "", qbtypes.ErrInValues
|
||||
}
|
||||
conditions := []string{}
|
||||
for _, value := range values {
|
||||
conditions = append(conditions, sb.E(fieldExpression, value))
|
||||
}
|
||||
return sb.Or(conditions...), nil
|
||||
case qbtypes.FilterOperatorNotIn:
|
||||
values, ok := value.([]any)
|
||||
if !ok {
|
||||
return "", qbtypes.ErrInValues
|
||||
}
|
||||
conditions := []string{}
|
||||
for _, value := range values {
|
||||
conditions = append(conditions, sb.NE(fieldExpression, value))
|
||||
}
|
||||
return sb.And(conditions...), nil
|
||||
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
|
||||
var value any
|
||||
column := columns[0]
|
||||
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(fieldExpression), nil
|
||||
}
|
||||
return sb.IsNull(fieldExpression), nil
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(fieldExpression, value), nil
|
||||
}
|
||||
return sb.E(fieldExpression, value), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
}
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(fieldExpression, value), nil
|
||||
}
|
||||
return sb.E(fieldExpression, value), nil
|
||||
case schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
||||
value = 0
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(fieldExpression, value), nil
|
||||
}
|
||||
return sb.E(fieldExpression, value), nil
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
}
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.E(leftOperand, true), nil
|
||||
}
|
||||
return sb.NE(leftOperand, true), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
|
||||
}
|
||||
}
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported operator: %v", operator)
|
||||
}
|
||||
|
||||
func (c *conditionBuilder) ConditionFor(
|
||||
ctx context.Context,
|
||||
startNs uint64,
|
||||
endNs uint64,
|
||||
key *telemetrytypes.TelemetryFieldKey,
|
||||
operator qbtypes.FilterOperator,
|
||||
value any,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
) (string, error) {
|
||||
condition, err := c.conditionFor(ctx, startNs, endNs, key, operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
buildExistCondition := operator.AddDefaultExistsFilter()
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextScope:
|
||||
return condition, nil
|
||||
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextAttribute:
|
||||
// build exist condition for resource and attribute fields based on filter operator
|
||||
}
|
||||
|
||||
if buildExistCondition {
|
||||
existsCondition, err := c.conditionFor(ctx, startNs, endNs, key, qbtypes.FilterOperatorExists, nil, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return sb.And(condition, existsCondition), nil
|
||||
}
|
||||
|
||||
return condition, nil
|
||||
}
|
||||
128
pkg/telemetryaudit/const.go
Normal file
128
pkg/telemetryaudit/const.go
Normal file
@@ -0,0 +1,128 @@
|
||||
package telemetryaudit
|
||||
|
||||
import (
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
const (
|
||||
// Internal Columns.
|
||||
IDColumn = "id"
|
||||
TimestampBucketStartColumn = "ts_bucket_start"
|
||||
ResourceFingerPrintColumn = "resource_fingerprint"
|
||||
|
||||
// Intrinsic Columns.
|
||||
TimestampColumn = "timestamp"
|
||||
ObservedTimestampColumn = "observed_timestamp"
|
||||
BodyColumn = "body"
|
||||
EventNameColumn = "event_name"
|
||||
TraceIDColumn = "trace_id"
|
||||
SpanIDColumn = "span_id"
|
||||
TraceFlagsColumn = "trace_flags"
|
||||
SeverityTextColumn = "severity_text"
|
||||
SeverityNumberColumn = "severity_number"
|
||||
ScopeNameColumn = "scope_name"
|
||||
ScopeVersionColumn = "scope_version"
|
||||
|
||||
// Contextual Columns.
|
||||
AttributesStringColumn = "attributes_string"
|
||||
AttributesNumberColumn = "attributes_number"
|
||||
AttributesBoolColumn = "attributes_bool"
|
||||
ScopeStringColumn = "scope_string"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultFullTextColumn = &telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
}
|
||||
|
||||
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
|
||||
"body": {
|
||||
Name: "body",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
"trace_id": {
|
||||
Name: "trace_id",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
"span_id": {
|
||||
Name: "span_id",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
"trace_flags": {
|
||||
Name: "trace_flags",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
"severity_text": {
|
||||
Name: "severity_text",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
"severity_number": {
|
||||
Name: "severity_number",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
"event_name": {
|
||||
Name: "event_name",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
}
|
||||
|
||||
DefaultSortingOrder = []qbtypes.OrderBy{
|
||||
{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: TimestampColumn,
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
},
|
||||
{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: IDColumn,
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionDesc,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
var logsV2Columns = map[string]*schema.Column{
|
||||
"ts_bucket_start": {Name: "ts_bucket_start", Type: schema.ColumnTypeUInt64},
|
||||
"resource_fingerprint": {Name: "resource_fingerprint", Type: schema.ColumnTypeString},
|
||||
"timestamp": {Name: "timestamp", Type: schema.ColumnTypeUInt64},
|
||||
"observed_timestamp": {Name: "observed_timestamp", Type: schema.ColumnTypeUInt64},
|
||||
"id": {Name: "id", Type: schema.ColumnTypeString},
|
||||
"trace_id": {Name: "trace_id", Type: schema.ColumnTypeString},
|
||||
"span_id": {Name: "span_id", Type: schema.ColumnTypeString},
|
||||
"trace_flags": {Name: "trace_flags", Type: schema.ColumnTypeUInt32},
|
||||
"severity_text": {Name: "severity_text", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
|
||||
"severity_number": {Name: "severity_number", Type: schema.ColumnTypeUInt8},
|
||||
"body": {Name: "body", Type: schema.ColumnTypeString},
|
||||
"attributes_string": {Name: "attributes_string", Type: schema.MapColumnType{KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}, ValueType: schema.ColumnTypeString}},
|
||||
"attributes_number": {Name: "attributes_number", Type: schema.MapColumnType{KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}, ValueType: schema.ColumnTypeFloat64}},
|
||||
"attributes_bool": {Name: "attributes_bool", Type: schema.MapColumnType{KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}, ValueType: schema.ColumnTypeBool}},
|
||||
"resource": {Name: "resource", Type: schema.JSONColumnType{}},
|
||||
"event_name": {Name: "event_name", Type: schema.ColumnTypeString},
|
||||
"scope_name": {Name: "scope_name", Type: schema.ColumnTypeString},
|
||||
"scope_version": {Name: "scope_version", Type: schema.ColumnTypeString},
|
||||
"scope_string": {Name: "scope_string", Type: schema.MapColumnType{KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}, ValueType: schema.ColumnTypeString}},
|
||||
}
|
||||
155
pkg/telemetryaudit/field_mapper.go
Normal file
155
pkg/telemetryaudit/field_mapper.go
Normal file
@@ -0,0 +1,155 @@
|
||||
package telemetryaudit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
type fieldMapper struct{}
|
||||
|
||||
func NewFieldMapper() qbtypes.FieldMapper {
|
||||
return &fieldMapper{}
|
||||
}
|
||||
|
||||
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
return []*schema.Column{logsV2Columns["resource"]}, nil
|
||||
case telemetrytypes.FieldContextScope:
|
||||
switch key.Name {
|
||||
case "name", "scope.name", "scope_name":
|
||||
return []*schema.Column{logsV2Columns["scope_name"]}, nil
|
||||
case "version", "scope.version", "scope_version":
|
||||
return []*schema.Column{logsV2Columns["scope_version"]}, nil
|
||||
}
|
||||
return []*schema.Column{logsV2Columns["scope_string"]}, nil
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
switch key.FieldDataType {
|
||||
case telemetrytypes.FieldDataTypeString:
|
||||
return []*schema.Column{logsV2Columns["attributes_string"]}, nil
|
||||
case telemetrytypes.FieldDataTypeInt64, telemetrytypes.FieldDataTypeFloat64, telemetrytypes.FieldDataTypeNumber:
|
||||
return []*schema.Column{logsV2Columns["attributes_number"]}, nil
|
||||
case telemetrytypes.FieldDataTypeBool:
|
||||
return []*schema.Column{logsV2Columns["attributes_bool"]}, nil
|
||||
}
|
||||
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
|
||||
col, ok := logsV2Columns[key.Name]
|
||||
if !ok {
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
return []*schema.Column{col}, nil
|
||||
}
|
||||
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
columns, err := m.getColumn(ctx, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
exprs := []string{}
|
||||
existExpr := []string{}
|
||||
for _, column := range columns {
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if key.FieldContext == telemetrytypes.FieldContextResource {
|
||||
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", column.Name, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", column.Name, key.Name))
|
||||
} else {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns in audit, got %s", key.FieldContext.String)
|
||||
}
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
exprs = append(exprs, column.Name)
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported low cardinality element type %s", elementType)
|
||||
}
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
||||
exprs = append(exprs, column.Name)
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
if key.Materialized {
|
||||
exprs = append(exprs, telemetrytypes.FieldKeyToMaterializedColumnName(key))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s==true", telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)))
|
||||
} else {
|
||||
exprs = append(exprs, fmt.Sprintf("%s['%s']", column.Name, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name))
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported map value type %s", valueType)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(exprs) == 1 {
|
||||
return exprs[0], nil
|
||||
} else if len(exprs) > 1 {
|
||||
if len(existExpr) != len(exprs) {
|
||||
return "", errors.New(errors.TypeInternal, errors.CodeInternal, "length of exist exprs doesn't match to that of exprs")
|
||||
}
|
||||
finalExprs := []string{}
|
||||
for i, expr := range exprs {
|
||||
finalExprs = append(finalExprs, fmt.Sprintf("%s, %s", existExpr[i], expr))
|
||||
}
|
||||
return "multiIf(" + strings.Join(finalExprs, ", ") + ", NULL)", nil
|
||||
}
|
||||
|
||||
return columns[0].Name, nil
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
return m.getColumn(ctx, key)
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnExpressionFor(
|
||||
ctx context.Context,
|
||||
tsStart, tsEnd uint64,
|
||||
field *telemetrytypes.TelemetryFieldKey,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
fieldExpression, err := m.FieldFor(ctx, tsStart, tsEnd, field)
|
||||
if errors.Is(err, qbtypes.ErrColumnNotFound) {
|
||||
keysForField := keys[field.Name]
|
||||
if len(keysForField) == 0 {
|
||||
if _, ok := logsV2Columns[field.Name]; ok {
|
||||
field.FieldContext = telemetrytypes.FieldContextLog
|
||||
fieldExpression, _ = m.FieldFor(ctx, tsStart, tsEnd, field)
|
||||
} else {
|
||||
correction, found := telemetrytypes.SuggestCorrection(field.Name, maps.Keys(keys))
|
||||
if found {
|
||||
return "", errors.Wrap(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
|
||||
}
|
||||
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field `%s` not found", field.Name)
|
||||
}
|
||||
} else if len(keysForField) == 1 {
|
||||
fieldExpression, _ = m.FieldFor(ctx, tsStart, tsEnd, keysForField[0])
|
||||
} else {
|
||||
args := []string{}
|
||||
for _, key := range keysForField {
|
||||
fieldExpression, _ = m.FieldFor(ctx, tsStart, tsEnd, key)
|
||||
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", fieldExpression, fieldExpression))
|
||||
}
|
||||
fieldExpression = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s AS `%s`", sqlbuilder.Escape(fieldExpression), field.Name), nil
|
||||
}
|
||||
600
pkg/telemetryaudit/statement_builder.go
Normal file
600
pkg/telemetryaudit/statement_builder.go
Normal file
@@ -0,0 +1,600 @@
|
||||
package telemetryaudit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
type auditQueryStatementBuilder struct {
|
||||
logger *slog.Logger
|
||||
metadataStore telemetrytypes.MetadataStore
|
||||
fm qbtypes.FieldMapper
|
||||
cb qbtypes.ConditionBuilder
|
||||
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
|
||||
aggExprRewriter qbtypes.AggExprRewriter
|
||||
fullTextColumn *telemetrytypes.TelemetryFieldKey
|
||||
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
|
||||
}
|
||||
|
||||
var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*auditQueryStatementBuilder)(nil)
|
||||
|
||||
func NewAuditQueryStatementBuilder(
|
||||
settings factory.ProviderSettings,
|
||||
metadataStore telemetrytypes.MetadataStore,
|
||||
fieldMapper qbtypes.FieldMapper,
|
||||
conditionBuilder qbtypes.ConditionBuilder,
|
||||
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
|
||||
aggExprRewriter qbtypes.AggExprRewriter,
|
||||
fullTextColumn *telemetrytypes.TelemetryFieldKey,
|
||||
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
|
||||
) *auditQueryStatementBuilder {
|
||||
auditSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetryaudit")
|
||||
|
||||
return &auditQueryStatementBuilder{
|
||||
logger: auditSettings.Logger(),
|
||||
metadataStore: metadataStore,
|
||||
fm: fieldMapper,
|
||||
cb: conditionBuilder,
|
||||
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
|
||||
aggExprRewriter: aggExprRewriter,
|
||||
fullTextColumn: fullTextColumn,
|
||||
jsonKeyToKey: jsonKeyToKey,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *auditQueryStatementBuilder) Build(
|
||||
ctx context.Context,
|
||||
start uint64,
|
||||
end uint64,
|
||||
requestType qbtypes.RequestType,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
||||
variables map[string]qbtypes.VariableItem,
|
||||
) (*qbtypes.Statement, error) {
|
||||
start = querybuilder.ToNanoSecs(start)
|
||||
end = querybuilder.ToNanoSecs(end)
|
||||
|
||||
keySelectors := getKeySelectors(query)
|
||||
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
query = b.adjustKeys(ctx, keys, query, requestType)
|
||||
|
||||
q := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
var stmt *qbtypes.Statement
|
||||
switch requestType {
|
||||
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeRawStream:
|
||||
stmt, err = b.buildListQuery(ctx, q, query, start, end, keys, variables)
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
stmt, err = b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
|
||||
case qbtypes.RequestTypeScalar:
|
||||
stmt, err = b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
|
||||
default:
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return stmt, nil
|
||||
}
|
||||
|
||||
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
|
||||
var keySelectors []*telemetrytypes.FieldKeySelector
|
||||
|
||||
for idx := range query.Aggregations {
|
||||
aggExpr := query.Aggregations[idx]
|
||||
selectors := querybuilder.QueryStringToKeysSelectors(aggExpr.Expression)
|
||||
keySelectors = append(keySelectors, selectors...)
|
||||
}
|
||||
|
||||
if query.Filter != nil && query.Filter.Expression != "" {
|
||||
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
|
||||
keySelectors = append(keySelectors, whereClauseSelectors...)
|
||||
}
|
||||
|
||||
for idx := range query.GroupBy {
|
||||
groupBy := query.GroupBy[idx]
|
||||
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
||||
Name: groupBy.Name,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: groupBy.FieldContext,
|
||||
FieldDataType: groupBy.FieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
for idx := range query.SelectFields {
|
||||
selectField := query.SelectFields[idx]
|
||||
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
||||
Name: selectField.Name,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: selectField.FieldContext,
|
||||
FieldDataType: selectField.FieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
for idx := range query.Order {
|
||||
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
||||
Name: query.Order[idx].Key.Name,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: query.Order[idx].Key.FieldContext,
|
||||
FieldDataType: query.Order[idx].Key.FieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
for idx := range keySelectors {
|
||||
keySelectors[idx].Signal = telemetrytypes.SignalLogs
|
||||
keySelectors[idx].Source = telemetrytypes.SourceAudit
|
||||
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
|
||||
}
|
||||
|
||||
return keySelectors
|
||||
}
|
||||
|
||||
func (b *auditQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
|
||||
keys["id"] = append([]*telemetrytypes.TelemetryFieldKey{{
|
||||
Name: "id",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
}}, keys["id"]...)
|
||||
|
||||
keys["timestamp"] = append([]*telemetrytypes.TelemetryFieldKey{{
|
||||
Name: "timestamp",
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
}}, keys["timestamp"]...)
|
||||
|
||||
actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
|
||||
actions = append(actions, querybuilder.AdjustDuplicateKeys(&query)...)
|
||||
|
||||
for idx := range query.SelectFields {
|
||||
actions = append(actions, b.adjustKey(&query.SelectFields[idx], keys)...)
|
||||
}
|
||||
for idx := range query.GroupBy {
|
||||
actions = append(actions, b.adjustKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
|
||||
}
|
||||
for idx := range query.Order {
|
||||
actions = append(actions, b.adjustKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
|
||||
}
|
||||
|
||||
for _, action := range actions {
|
||||
b.logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
|
||||
}
|
||||
|
||||
return query
|
||||
}
|
||||
|
||||
func (b *auditQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
|
||||
if _, ok := IntrinsicFields[key.Name]; ok {
|
||||
intrinsicField := IntrinsicFields[key.Name]
|
||||
return querybuilder.AdjustKey(key, keys, &intrinsicField)
|
||||
}
|
||||
return querybuilder.AdjustKey(key, keys, nil)
|
||||
}
|
||||
|
||||
func (b *auditQueryStatementBuilder) buildListQuery(
|
||||
ctx context.Context,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
||||
start, end uint64,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
variables map[string]qbtypes.VariableItem,
|
||||
) (*qbtypes.Statement, error) {
|
||||
var (
|
||||
cteFragments []string
|
||||
cteArgs [][]any
|
||||
)
|
||||
|
||||
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
||||
return nil, err
|
||||
} else if frag != "" {
|
||||
cteFragments = append(cteFragments, frag)
|
||||
cteArgs = append(cteArgs, args)
|
||||
}
|
||||
|
||||
sb.Select(TimestampColumn)
|
||||
sb.SelectMore(IDColumn)
|
||||
if len(query.SelectFields) == 0 {
|
||||
sb.SelectMore(TraceIDColumn)
|
||||
sb.SelectMore(SpanIDColumn)
|
||||
sb.SelectMore(TraceFlagsColumn)
|
||||
sb.SelectMore(SeverityTextColumn)
|
||||
sb.SelectMore(SeverityNumberColumn)
|
||||
sb.SelectMore(ScopeNameColumn)
|
||||
sb.SelectMore(ScopeVersionColumn)
|
||||
sb.SelectMore(BodyColumn)
|
||||
sb.SelectMore(EventNameColumn)
|
||||
sb.SelectMore(AttributesStringColumn)
|
||||
sb.SelectMore(AttributesNumberColumn)
|
||||
sb.SelectMore(AttributesBoolColumn)
|
||||
sb.SelectMore(ScopeStringColumn)
|
||||
} else {
|
||||
for index := range query.SelectFields {
|
||||
if query.SelectFields[index].Name == TimestampColumn || query.SelectFields[index].Name == IDColumn {
|
||||
continue
|
||||
}
|
||||
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &query.SelectFields[index], keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sb.SelectMore(colExpr)
|
||||
}
|
||||
}
|
||||
|
||||
sb.From(fmt.Sprintf("%s.%s", DBName, LogsTableName))
|
||||
|
||||
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, orderBy := range query.Order {
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &orderBy.Key.TelemetryFieldKey, keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue()))
|
||||
}
|
||||
|
||||
if query.Limit > 0 {
|
||||
sb.Limit(query.Limit)
|
||||
} else {
|
||||
sb.Limit(100)
|
||||
}
|
||||
|
||||
if query.Offset > 0 {
|
||||
sb.Offset(query.Offset)
|
||||
}
|
||||
|
||||
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
|
||||
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
|
||||
|
||||
stmt := &qbtypes.Statement{
|
||||
Query: finalSQL,
|
||||
Args: finalArgs,
|
||||
}
|
||||
if preparedWhereClause != nil {
|
||||
stmt.Warnings = preparedWhereClause.Warnings
|
||||
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
||||
}
|
||||
|
||||
return stmt, nil
|
||||
}
|
||||
|
||||
func (b *auditQueryStatementBuilder) buildTimeSeriesQuery(
|
||||
ctx context.Context,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
||||
start, end uint64,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
variables map[string]qbtypes.VariableItem,
|
||||
) (*qbtypes.Statement, error) {
|
||||
var (
|
||||
cteFragments []string
|
||||
cteArgs [][]any
|
||||
)
|
||||
|
||||
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
||||
return nil, err
|
||||
} else if frag != "" {
|
||||
cteFragments = append(cteFragments, frag)
|
||||
cteArgs = append(cteArgs, args)
|
||||
}
|
||||
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts",
|
||||
int64(query.StepInterval.Seconds()),
|
||||
))
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
fieldNames := make([]string, 0, len(query.GroupBy))
|
||||
for _, gb := range query.GroupBy {
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.Name)
|
||||
allGroupByArgs = append(allGroupByArgs, args...)
|
||||
sb.SelectMore(colExpr)
|
||||
fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.Name))
|
||||
}
|
||||
|
||||
allAggChArgs := make([]any, 0)
|
||||
for i, agg := range query.Aggregations {
|
||||
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, start, end, agg.Expression, uint64(query.StepInterval.Seconds()), keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
allAggChArgs = append(allAggChArgs, chArgs...)
|
||||
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i))
|
||||
}
|
||||
|
||||
sb.From(fmt.Sprintf("%s.%s", DBName, LogsTableName))
|
||||
|
||||
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var finalSQL string
|
||||
var finalArgs []any
|
||||
|
||||
if query.Limit > 0 && len(query.GroupBy) > 0 {
|
||||
cteSB := sqlbuilder.NewSelectBuilder()
|
||||
cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true, variables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query))
|
||||
cteArgs = append(cteArgs, cteStmt.Args)
|
||||
|
||||
tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
|
||||
sb.Where(fmt.Sprintf("%s GLOBAL IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))
|
||||
|
||||
sb.GroupBy("ts")
|
||||
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr, err := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sb.Having(rewrittenExpr)
|
||||
}
|
||||
|
||||
if len(query.Order) != 0 {
|
||||
for _, orderBy := range query.Order {
|
||||
_, ok := aggOrderBy(orderBy, query)
|
||||
if !ok {
|
||||
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
||||
}
|
||||
}
|
||||
sb.OrderBy("ts desc")
|
||||
}
|
||||
|
||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
||||
|
||||
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
|
||||
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
|
||||
} else {
|
||||
sb.GroupBy("ts")
|
||||
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr, err := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sb.Having(rewrittenExpr)
|
||||
}
|
||||
|
||||
if len(query.Order) != 0 {
|
||||
for _, orderBy := range query.Order {
|
||||
_, ok := aggOrderBy(orderBy, query)
|
||||
if !ok {
|
||||
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
||||
}
|
||||
}
|
||||
sb.OrderBy("ts desc")
|
||||
}
|
||||
|
||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
||||
|
||||
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
|
||||
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
|
||||
}
|
||||
|
||||
stmt := &qbtypes.Statement{
|
||||
Query: finalSQL,
|
||||
Args: finalArgs,
|
||||
}
|
||||
if preparedWhereClause != nil {
|
||||
stmt.Warnings = preparedWhereClause.Warnings
|
||||
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
||||
}
|
||||
|
||||
return stmt, nil
|
||||
}
|
||||
|
||||
func (b *auditQueryStatementBuilder) buildScalarQuery(
|
||||
ctx context.Context,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
||||
start, end uint64,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
skipResourceCTE bool,
|
||||
variables map[string]qbtypes.VariableItem,
|
||||
) (*qbtypes.Statement, error) {
|
||||
var (
|
||||
cteFragments []string
|
||||
cteArgs [][]any
|
||||
)
|
||||
|
||||
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
||||
return nil, err
|
||||
} else if frag != "" && !skipResourceCTE {
|
||||
cteFragments = append(cteFragments, frag)
|
||||
cteArgs = append(cteArgs, args)
|
||||
}
|
||||
|
||||
allAggChArgs := []any{}
|
||||
|
||||
var allGroupByArgs []any
|
||||
|
||||
for _, gb := range query.GroupBy {
|
||||
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.Name)
|
||||
allGroupByArgs = append(allGroupByArgs, args...)
|
||||
sb.SelectMore(colExpr)
|
||||
}
|
||||
|
||||
rateInterval := (end - start) / querybuilder.NsToSeconds
|
||||
|
||||
if len(query.Aggregations) > 0 {
|
||||
for idx := range query.Aggregations {
|
||||
aggExpr := query.Aggregations[idx]
|
||||
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, start, end, aggExpr.Expression, rateInterval, keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
allAggChArgs = append(allAggChArgs, chArgs...)
|
||||
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx))
|
||||
}
|
||||
}
|
||||
|
||||
sb.From(fmt.Sprintf("%s.%s", DBName, LogsTableName))
|
||||
|
||||
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr, err := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sb.Having(rewrittenExpr)
|
||||
}
|
||||
|
||||
for _, orderBy := range query.Order {
|
||||
idx, ok := aggOrderBy(orderBy, query)
|
||||
if ok {
|
||||
sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue()))
|
||||
} else {
|
||||
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
||||
}
|
||||
}
|
||||
|
||||
if len(query.Order) == 0 {
|
||||
sb.OrderBy("__result_0 DESC")
|
||||
}
|
||||
|
||||
if query.Limit > 0 {
|
||||
sb.Limit(query.Limit)
|
||||
}
|
||||
|
||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||
|
||||
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
||||
|
||||
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
|
||||
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
|
||||
|
||||
stmt := &qbtypes.Statement{
|
||||
Query: finalSQL,
|
||||
Args: finalArgs,
|
||||
}
|
||||
if preparedWhereClause != nil {
|
||||
stmt.Warnings = preparedWhereClause.Warnings
|
||||
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
||||
}
|
||||
|
||||
return stmt, nil
|
||||
}
|
||||
|
||||
func (b *auditQueryStatementBuilder) addFilterCondition(
|
||||
ctx context.Context,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
start, end uint64,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
variables map[string]qbtypes.VariableItem,
|
||||
) (*querybuilder.PreparedWhereClause, error) {
|
||||
var preparedWhereClause *querybuilder.PreparedWhereClause
|
||||
var err error
|
||||
|
||||
if query.Filter != nil && query.Filter.Expression != "" {
|
||||
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
||||
Context: ctx,
|
||||
Logger: b.logger,
|
||||
FieldMapper: b.fm,
|
||||
ConditionBuilder: b.cb,
|
||||
FieldKeys: keys,
|
||||
SkipResourceFilter: true,
|
||||
FullTextColumn: b.fullTextColumn,
|
||||
JsonKeyToKey: b.jsonKeyToKey,
|
||||
Variables: variables,
|
||||
StartNs: start,
|
||||
EndNs: end,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if preparedWhereClause != nil {
|
||||
sb.AddWhereClause(preparedWhereClause.WhereClause)
|
||||
}
|
||||
|
||||
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
|
||||
var endBucket uint64
|
||||
if end != 0 {
|
||||
endBucket = end / querybuilder.NsToSeconds
|
||||
}
|
||||
|
||||
if start != 0 {
|
||||
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.GE("ts_bucket_start", startBucket))
|
||||
}
|
||||
if end != 0 {
|
||||
sb.Where(sb.L("timestamp", fmt.Sprintf("%d", end)), sb.LE("ts_bucket_start", endBucket))
|
||||
}
|
||||
|
||||
return preparedWhereClause, nil
|
||||
}
|
||||
|
||||
func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) (int, bool) {
|
||||
for i, agg := range q.Aggregations {
|
||||
if k.Key.Name == agg.Alias || k.Key.Name == agg.Expression || k.Key.Name == fmt.Sprintf("%d", i) {
|
||||
return i, true
|
||||
}
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func (b *auditQueryStatementBuilder) maybeAttachResourceFilter(
|
||||
ctx context.Context,
|
||||
sb *sqlbuilder.SelectBuilder,
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
||||
start, end uint64,
|
||||
variables map[string]qbtypes.VariableItem,
|
||||
) (cteSQL string, cteArgs []any, err error) {
|
||||
stmt, err := b.resourceFilterStmtBuilder.Build(ctx, start, end, qbtypes.RequestTypeRaw, query, variables)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
sb.Where("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)")
|
||||
|
||||
return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
|
||||
}
|
||||
241
pkg/telemetryaudit/statement_builder_test.go
Normal file
241
pkg/telemetryaudit/statement_builder_test.go
Normal file
@@ -0,0 +1,241 @@
|
||||
package telemetryaudit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func auditResourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation] {
|
||||
fm := resourcefilter.NewFieldMapper()
|
||||
cb := resourcefilter.NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = auditFieldKeyMap()
|
||||
|
||||
return resourcefilter.NewLogResourceFilterStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
fm,
|
||||
cb,
|
||||
mockMetadataStore,
|
||||
DefaultFullTextColumn,
|
||||
nil,
|
||||
)
|
||||
}
|
||||
|
||||
func auditFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
key := func(name string, ctx telemetrytypes.FieldContext, dt telemetrytypes.FieldDataType, materialized bool) *telemetrytypes.TelemetryFieldKey {
|
||||
return &telemetrytypes.TelemetryFieldKey{
|
||||
Name: name,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: ctx,
|
||||
FieldDataType: dt,
|
||||
Materialized: materialized,
|
||||
}
|
||||
}
|
||||
|
||||
attr := telemetrytypes.FieldContextAttribute
|
||||
res := telemetrytypes.FieldContextResource
|
||||
str := telemetrytypes.FieldDataTypeString
|
||||
i64 := telemetrytypes.FieldDataTypeInt64
|
||||
|
||||
return map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"service.name": {key("service.name", res, str, false)},
|
||||
"signoz.audit.action": {key("signoz.audit.action", attr, str, true)},
|
||||
"signoz.audit.outcome": {key("signoz.audit.outcome", attr, str, true)},
|
||||
"signoz.audit.principal.email": {key("signoz.audit.principal.email", attr, str, true)},
|
||||
"signoz.audit.principal.id": {key("signoz.audit.principal.id", attr, str, true)},
|
||||
"signoz.audit.principal.type": {key("signoz.audit.principal.type", attr, str, true)},
|
||||
"signoz.audit.resource.name": {key("signoz.audit.resource.name", attr, str, true)},
|
||||
"signoz.audit.resource.id": {key("signoz.audit.resource.id", attr, str, true)},
|
||||
"signoz.audit.action_category": {key("signoz.audit.action_category", attr, str, false)},
|
||||
"signoz.audit.error.type": {key("signoz.audit.error.type", attr, str, false)},
|
||||
"signoz.audit.error.code": {key("signoz.audit.error.code", attr, str, false)},
|
||||
"http.request.method": {key("http.request.method", attr, str, false)},
|
||||
"http.response.status_code": {key("http.response.status_code", attr, i64, false)},
|
||||
}
|
||||
}
|
||||
|
||||
func newTestAuditStatementBuilder() *auditQueryStatementBuilder {
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = auditFieldKeyMap()
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
|
||||
return NewAuditQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
auditResourceFilterStmtBuilder(),
|
||||
aggExprRewriter,
|
||||
DefaultFullTextColumn,
|
||||
nil,
|
||||
)
|
||||
}
|
||||
|
||||
func TestStatementBuilder(t *testing.T) {
|
||||
statementBuilder := newTestAuditStatementBuilder()
|
||||
ctx := context.Background()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
|
||||
expected qbtypes.Statement
|
||||
expectedErr error
|
||||
}{
|
||||
// List: all actions by a specific user (materialized principal.id filter)
|
||||
{
|
||||
name: "ListByPrincipalID",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Source: telemetrytypes.SourceAudit,
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "signoz.audit.principal.id = '019a-1234-abcd-5678'",
|
||||
},
|
||||
Limit: 100,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$principal$$id` = ? AND `attribute_string_signoz$$audit$$principal$$id_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "019a-1234-abcd-5678", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
|
||||
},
|
||||
},
|
||||
// List: all failed actions (materialized outcome filter)
|
||||
{
|
||||
name: "ListByOutcomeFailure",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Source: telemetrytypes.SourceAudit,
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "signoz.audit.outcome = 'failure'",
|
||||
},
|
||||
Limit: 100,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$outcome` = ? AND `attribute_string_signoz$$audit$$outcome_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "failure", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
|
||||
},
|
||||
},
|
||||
// List: change history of a specific dashboard (two materialized column AND)
|
||||
{
|
||||
name: "ListByResourceNameAndID",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Source: telemetrytypes.SourceAudit,
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "signoz.audit.resource.name = 'dashboard' AND signoz.audit.resource.id = '019b-5678-efgh-9012'",
|
||||
},
|
||||
Limit: 100,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((`attribute_string_signoz$$audit$$resource$$name` = ? AND `attribute_string_signoz$$audit$$resource$$name_exists` = ?) AND (`attribute_string_signoz$$audit$$resource$$id` = ? AND `attribute_string_signoz$$audit$$resource$$id_exists` = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "dashboard", true, "019b-5678-efgh-9012", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
|
||||
},
|
||||
},
|
||||
// List: all dashboard deletions (compliance — resource.name + action AND)
|
||||
{
|
||||
name: "ListByResourceNameAndAction",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Source: telemetrytypes.SourceAudit,
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "signoz.audit.resource.name = 'dashboard' AND signoz.audit.action = 'delete'",
|
||||
},
|
||||
Limit: 100,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((`attribute_string_signoz$$audit$$resource$$name` = ? AND `attribute_string_signoz$$audit$$resource$$name_exists` = ?) AND (`attribute_string_signoz$$audit$$action` = ? AND `attribute_string_signoz$$audit$$action_exists` = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "dashboard", true, "delete", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
|
||||
},
|
||||
},
|
||||
// List: all actions by service accounts (materialized principal.type)
|
||||
{
|
||||
name: "ListByPrincipalType",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Source: telemetrytypes.SourceAudit,
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "signoz.audit.principal.type = 'service_account'",
|
||||
},
|
||||
Limit: 100,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$principal$$type` = ? AND `attribute_string_signoz$$audit$$principal$$type_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "service_account", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
|
||||
},
|
||||
},
|
||||
// Scalar: alert — count forbidden errors (outcome + action AND)
|
||||
{
|
||||
name: "ScalarCountByOutcomeAndAction",
|
||||
requestType: qbtypes.RequestTypeScalar,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Source: telemetrytypes.SourceAudit,
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "signoz.audit.outcome = 'failure' AND signoz.audit.action = 'update'",
|
||||
},
|
||||
Aggregations: []qbtypes.LogAggregation{
|
||||
{Expression: "count()"},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT count() AS __result_0 FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((`attribute_string_signoz$$audit$$outcome` = ? AND `attribute_string_signoz$$audit$$outcome_exists` = ?) AND (`attribute_string_signoz$$audit$$action` = ? AND `attribute_string_signoz$$audit$$action_exists` = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? ORDER BY __result_0 DESC",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "failure", true, "update", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
|
||||
},
|
||||
},
|
||||
// TimeSeries: failures grouped by principal email with top-N limit
|
||||
{
|
||||
name: "TimeSeriesFailuresGroupedByPrincipal",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Source: telemetrytypes.SourceAudit,
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Aggregations: []qbtypes.LogAggregation{
|
||||
{Expression: "count()"},
|
||||
},
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "signoz.audit.outcome = 'failure'",
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "signoz.audit.principal.email"}},
|
||||
},
|
||||
Limit: 5,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(`attribute_string_signoz$$audit$$principal$$email_exists` = ?, `attribute_string_signoz$$audit$$principal$$email`, NULL)) AS `signoz.audit.principal.email`, count() AS __result_0 FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$outcome` = ? AND `attribute_string_signoz$$audit$$outcome_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `signoz.audit.principal.email` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toString(multiIf(`attribute_string_signoz$$audit$$principal$$email_exists` = ?, `attribute_string_signoz$$audit$$principal$$email`, NULL)) AS `signoz.audit.principal.email`, count() AS __result_0 FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$outcome` = ? AND `attribute_string_signoz$$audit$$outcome_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`signoz.audit.principal.email`) GLOBAL IN (SELECT `signoz.audit.principal.email` FROM __limit_cte) GROUP BY ts, `signoz.audit.principal.email`",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), true, "failure", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 5, true, "failure", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
q, err := statementBuilder.Build(ctx, 1747947419000, 1747983448000, testCase.requestType, testCase.query, nil)
|
||||
if testCase.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), testCase.expectedErr.Error())
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, testCase.expected.Query, q.Query)
|
||||
require.Equal(t, testCase.expected.Args, q.Args)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
11
pkg/telemetryaudit/tables.go
Normal file
11
pkg/telemetryaudit/tables.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package telemetryaudit
|
||||
|
||||
const (
|
||||
DBName = "signoz_audit"
|
||||
LogsTableName = "distributed_logs"
|
||||
LogsLocalTableName = "logs"
|
||||
TagAttributesTableName = "distributed_tag_attributes"
|
||||
TagAttributesLocalTableName = "tag_attributes"
|
||||
LogAttributeKeysTblName = "distributed_logs_attribute_keys"
|
||||
LogResourceKeysTblName = "distributed_logs_resource_keys"
|
||||
)
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetryaudit"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
@@ -27,6 +28,7 @@ import (
|
||||
var (
|
||||
ErrFailedToGetTracesKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get traces keys")
|
||||
ErrFailedToGetLogsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get logs keys")
|
||||
ErrFailedToGetAuditKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get audit keys")
|
||||
ErrFailedToGetTblStatement = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get tbl statement")
|
||||
ErrFailedToGetMetricsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get metrics keys")
|
||||
ErrFailedToGetMeterKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter keys")
|
||||
@@ -50,6 +52,11 @@ type telemetryMetaStore struct {
|
||||
logAttributeKeysTblName string
|
||||
logResourceKeysTblName string
|
||||
logsV2TblName string
|
||||
auditDBName string
|
||||
auditV2TblName string
|
||||
auditFieldsTblName string
|
||||
auditAttributeKeysTblName string
|
||||
auditResourceKeysTblName string
|
||||
relatedMetadataDBName string
|
||||
relatedMetadataTblName string
|
||||
columnEvolutionMetadataTblName string
|
||||
@@ -79,6 +86,11 @@ func NewTelemetryMetaStore(
|
||||
logsFieldsTblName string,
|
||||
logAttributeKeysTblName string,
|
||||
logResourceKeysTblName string,
|
||||
auditDBName string,
|
||||
auditV2TblName string,
|
||||
auditFieldsTblName string,
|
||||
auditAttributeKeysTblName string,
|
||||
auditResourceKeysTblName string,
|
||||
relatedMetadataDBName string,
|
||||
relatedMetadataTblName string,
|
||||
columnEvolutionMetadataTblName string,
|
||||
@@ -101,6 +113,11 @@ func NewTelemetryMetaStore(
|
||||
logsFieldsTblName: logsFieldsTblName,
|
||||
logAttributeKeysTblName: logAttributeKeysTblName,
|
||||
logResourceKeysTblName: logResourceKeysTblName,
|
||||
auditDBName: auditDBName,
|
||||
auditV2TblName: auditV2TblName,
|
||||
auditFieldsTblName: auditFieldsTblName,
|
||||
auditAttributeKeysTblName: auditAttributeKeysTblName,
|
||||
auditResourceKeysTblName: auditResourceKeysTblName,
|
||||
relatedMetadataDBName: relatedMetadataDBName,
|
||||
relatedMetadataTblName: relatedMetadataTblName,
|
||||
columnEvolutionMetadataTblName: columnEvolutionMetadataTblName,
|
||||
@@ -592,6 +609,240 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
|
||||
return keys, complete, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) auditTblStatementToFieldKeys(ctx context.Context) ([]*telemetrytypes.TelemetryFieldKey, error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "metadata",
|
||||
instrumentationtypes.CodeFunctionName: "auditTblStatementToFieldKeys",
|
||||
})
|
||||
|
||||
if t.auditDBName == "" || t.auditV2TblName == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", t.auditDBName, t.auditV2TblName)
|
||||
statements := []telemetrytypes.ShowCreateTableStatement{}
|
||||
err := t.telemetrystore.ClickhouseDB().Select(ctx, &statements, query)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTblStatement.Error())
|
||||
}
|
||||
|
||||
materialisedKeys, err := ExtractFieldKeysFromTblStatement(statements[0].Statement)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
|
||||
}
|
||||
|
||||
for idx := range materialisedKeys {
|
||||
materialisedKeys[idx].Signal = telemetrytypes.SignalLogs
|
||||
}
|
||||
|
||||
return materialisedKeys, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) getAuditKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "metadata",
|
||||
instrumentationtypes.CodeFunctionName: "getAuditKeys",
|
||||
})
|
||||
|
||||
if len(fieldKeySelectors) == 0 {
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
if t.auditDBName == "" {
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
matKeys, err := t.auditTblStatementToFieldKeys(ctx)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
mapOfKeys := make(map[string]*telemetrytypes.TelemetryFieldKey)
|
||||
for _, key := range matKeys {
|
||||
mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
|
||||
}
|
||||
|
||||
var queries []string
|
||||
var allArgs []any
|
||||
|
||||
queryAttributeTable := false
|
||||
queryResourceTable := false
|
||||
|
||||
for _, selector := range fieldKeySelectors {
|
||||
if selector.FieldContext == telemetrytypes.FieldContextUnspecified {
|
||||
queryAttributeTable = true
|
||||
queryResourceTable = true
|
||||
break
|
||||
} else if selector.FieldContext == telemetrytypes.FieldContextAttribute {
|
||||
queryAttributeTable = true
|
||||
} else if selector.FieldContext == telemetrytypes.FieldContextResource {
|
||||
queryResourceTable = true
|
||||
}
|
||||
}
|
||||
|
||||
tablesToQuery := []struct {
|
||||
fieldContext telemetrytypes.FieldContext
|
||||
shouldQuery bool
|
||||
}{
|
||||
{telemetrytypes.FieldContextAttribute, queryAttributeTable},
|
||||
{telemetrytypes.FieldContextResource, queryResourceTable},
|
||||
}
|
||||
|
||||
for _, table := range tablesToQuery {
|
||||
if !table.shouldQuery {
|
||||
continue
|
||||
}
|
||||
|
||||
fieldContext := table.fieldContext
|
||||
|
||||
var tblName string
|
||||
if fieldContext == telemetrytypes.FieldContextAttribute {
|
||||
tblName = t.auditDBName + "." + t.auditAttributeKeysTblName
|
||||
} else {
|
||||
tblName = t.auditDBName + "." + t.auditResourceKeysTblName
|
||||
}
|
||||
|
||||
sb := sqlbuilder.Select(
|
||||
"name AS tag_key",
|
||||
fmt.Sprintf("'%s' AS tag_type", fieldContext.TagType()),
|
||||
"lower(datatype) AS tag_data_type",
|
||||
fmt.Sprintf("%d AS priority", getPriorityForContext(fieldContext)),
|
||||
).From(tblName)
|
||||
|
||||
var limit int
|
||||
conds := []string{}
|
||||
|
||||
for _, fieldKeySelector := range fieldKeySelectors {
|
||||
if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified && fieldKeySelector.FieldContext != fieldContext {
|
||||
continue
|
||||
}
|
||||
|
||||
fieldKeyConds := []string{}
|
||||
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
|
||||
fieldKeyConds = append(fieldKeyConds, sb.E("name", fieldKeySelector.Name))
|
||||
} else {
|
||||
fieldKeyConds = append(fieldKeyConds, sb.ILike("name", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
|
||||
}
|
||||
|
||||
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
|
||||
fieldKeyConds = append(fieldKeyConds, sb.E("datatype", fieldKeySelector.FieldDataType.TagDataType()))
|
||||
}
|
||||
|
||||
if len(fieldKeyConds) > 0 {
|
||||
conds = append(conds, sb.And(fieldKeyConds...))
|
||||
}
|
||||
limit += fieldKeySelector.Limit
|
||||
}
|
||||
|
||||
if len(conds) > 0 {
|
||||
sb.Where(sb.Or(conds...))
|
||||
}
|
||||
|
||||
sb.GroupBy("name", "datatype")
|
||||
if limit == 0 {
|
||||
limit = 1000
|
||||
}
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
queries = append(queries, query)
|
||||
allArgs = append(allArgs, args...)
|
||||
}
|
||||
|
||||
if len(queries) == 0 {
|
||||
return []*telemetrytypes.TelemetryFieldKey{}, true, nil
|
||||
}
|
||||
|
||||
var limit int
|
||||
for _, fieldKeySelector := range fieldKeySelectors {
|
||||
limit += fieldKeySelector.Limit
|
||||
}
|
||||
if limit == 0 {
|
||||
limit = 1000
|
||||
}
|
||||
|
||||
mainQuery := fmt.Sprintf(`
|
||||
SELECT tag_key, tag_type, tag_data_type, max(priority) as priority
|
||||
FROM (
|
||||
%s
|
||||
) AS combined_results
|
||||
GROUP BY tag_key, tag_type, tag_data_type
|
||||
ORDER BY priority
|
||||
LIMIT %d
|
||||
`, strings.Join(queries, " UNION ALL "), limit+1)
|
||||
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, mainQuery, allArgs...)
|
||||
if err != nil {
|
||||
return nil, false, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
keys := []*telemetrytypes.TelemetryFieldKey{}
|
||||
rowCount := 0
|
||||
searchTexts := []string{}
|
||||
|
||||
for _, fieldKeySelector := range fieldKeySelectors {
|
||||
searchTexts = append(searchTexts, fieldKeySelector.Name)
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
rowCount++
|
||||
if rowCount > limit {
|
||||
break
|
||||
}
|
||||
|
||||
var name string
|
||||
var fieldContext telemetrytypes.FieldContext
|
||||
var fieldDataType telemetrytypes.FieldDataType
|
||||
var priority uint8
|
||||
err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
|
||||
if err != nil {
|
||||
return nil, false, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
|
||||
}
|
||||
key, ok := mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()]
|
||||
|
||||
if !ok {
|
||||
key = &telemetrytypes.TelemetryFieldKey{
|
||||
Name: name,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: fieldContext,
|
||||
FieldDataType: fieldDataType,
|
||||
}
|
||||
}
|
||||
|
||||
keys = append(keys, key)
|
||||
mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
|
||||
}
|
||||
|
||||
if rows.Err() != nil {
|
||||
return nil, false, errors.Wrap(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
|
||||
}
|
||||
|
||||
complete := rowCount <= limit
|
||||
|
||||
// Add intrinsic audit fields (same as logs intrinsics: body, severity_text, etc.)
|
||||
staticKeys := maps.Keys(telemetryaudit.IntrinsicFields)
|
||||
for _, key := range staticKeys {
|
||||
found := false
|
||||
for _, v := range searchTexts {
|
||||
if v == "" || strings.Contains(key, v) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
if field, exists := telemetryaudit.IntrinsicFields[key]; exists {
|
||||
if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
|
||||
keys = append(keys, &field)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return keys, complete, nil
|
||||
}
|
||||
|
||||
func getPriorityForContext(ctx telemetrytypes.FieldContext) int {
|
||||
switch ctx {
|
||||
case telemetrytypes.FieldContextLog:
|
||||
@@ -889,7 +1140,11 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
|
||||
case telemetrytypes.SignalTraces:
|
||||
keys, complete, err = t.getTracesKeys(ctx, selectors)
|
||||
case telemetrytypes.SignalLogs:
|
||||
keys, complete, err = t.getLogsKeys(ctx, selectors)
|
||||
if fieldKeySelector.Source == telemetrytypes.SourceAudit {
|
||||
keys, complete, err = t.getAuditKeys(ctx, selectors)
|
||||
} else {
|
||||
keys, complete, err = t.getLogsKeys(ctx, selectors)
|
||||
}
|
||||
case telemetrytypes.SignalMetrics:
|
||||
if fieldKeySelector.Source == telemetrytypes.SourceMeter {
|
||||
keys, complete, err = t.getMeterSourceMetricKeys(ctx, selectors)
|
||||
@@ -938,6 +1193,7 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
|
||||
func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, bool, error) {
|
||||
|
||||
logsSelectors := []*telemetrytypes.FieldKeySelector{}
|
||||
auditSelectors := []*telemetrytypes.FieldKeySelector{}
|
||||
tracesSelectors := []*telemetrytypes.FieldKeySelector{}
|
||||
metricsSelectors := []*telemetrytypes.FieldKeySelector{}
|
||||
meterSourceMetricsSelectors := []*telemetrytypes.FieldKeySelector{}
|
||||
@@ -945,7 +1201,11 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
|
||||
for _, fieldKeySelector := range fieldKeySelectors {
|
||||
switch fieldKeySelector.Signal {
|
||||
case telemetrytypes.SignalLogs:
|
||||
logsSelectors = append(logsSelectors, fieldKeySelector)
|
||||
if fieldKeySelector.Source == telemetrytypes.SourceAudit {
|
||||
auditSelectors = append(auditSelectors, fieldKeySelector)
|
||||
} else {
|
||||
logsSelectors = append(logsSelectors, fieldKeySelector)
|
||||
}
|
||||
case telemetrytypes.SignalTraces:
|
||||
tracesSelectors = append(tracesSelectors, fieldKeySelector)
|
||||
case telemetrytypes.SignalMetrics:
|
||||
@@ -965,6 +1225,10 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
auditKeys, auditComplete, err := t.getAuditKeys(ctx, auditSelectors)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
tracesKeys, tracesComplete, err := t.getTracesKeys(ctx, tracesSelectors)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
@@ -979,12 +1243,15 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
|
||||
return nil, false, err
|
||||
}
|
||||
// Complete only if all queries are complete
|
||||
complete := logsComplete && tracesComplete && metricsComplete
|
||||
complete := logsComplete && auditComplete && tracesComplete && metricsComplete
|
||||
|
||||
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
|
||||
for _, key := range logsKeys {
|
||||
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
|
||||
}
|
||||
for _, key := range auditKeys {
|
||||
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
|
||||
}
|
||||
for _, key := range tracesKeys {
|
||||
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
|
||||
}
|
||||
@@ -1338,6 +1605,100 @@ func (t *telemetryMetaStore) getLogFieldValues(ctx context.Context, fieldValueSe
|
||||
return values, complete, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) getAuditFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "metadata",
|
||||
instrumentationtypes.CodeFunctionName: "getAuditFieldValues",
|
||||
})
|
||||
|
||||
if t.auditDBName == "" || t.auditFieldsTblName == "" {
|
||||
return &telemetrytypes.TelemetryFieldValues{}, true, nil
|
||||
}
|
||||
|
||||
limit := fieldValueSelector.Limit
|
||||
if limit == 0 {
|
||||
limit = 50
|
||||
}
|
||||
|
||||
sb := sqlbuilder.Select("DISTINCT string_value, number_value").From(t.auditDBName + "." + t.auditFieldsTblName)
|
||||
|
||||
if fieldValueSelector.Name != "" {
|
||||
sb.Where(sb.E("tag_key", fieldValueSelector.Name))
|
||||
}
|
||||
|
||||
if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
|
||||
sb.Where(sb.E("tag_type", fieldValueSelector.FieldContext.TagType()))
|
||||
}
|
||||
|
||||
if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
|
||||
sb.Where(sb.E("tag_data_type", fieldValueSelector.FieldDataType.TagDataType()))
|
||||
}
|
||||
|
||||
if fieldValueSelector.Value != "" {
|
||||
switch fieldValueSelector.FieldDataType {
|
||||
case telemetrytypes.FieldDataTypeString:
|
||||
sb.Where(sb.ILike("string_value", "%"+escapeForLike(fieldValueSelector.Value)+"%"))
|
||||
case telemetrytypes.FieldDataTypeNumber:
|
||||
sb.Where(sb.IsNotNull("number_value"))
|
||||
sb.Where(sb.ILike("toString(number_value)", "%"+escapeForLike(fieldValueSelector.Value)+"%"))
|
||||
case telemetrytypes.FieldDataTypeUnspecified:
|
||||
sb.Where(sb.Or(
|
||||
sb.ILike("string_value", "%"+escapeForLike(fieldValueSelector.Value)+"%"),
|
||||
sb.ILike("toString(number_value)", "%"+escapeForLike(fieldValueSelector.Value)+"%"),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
sb.Limit(limit + 1)
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, false, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
values := &telemetrytypes.TelemetryFieldValues{}
|
||||
seen := make(map[string]bool)
|
||||
rowCount := 0
|
||||
totalCount := 0
|
||||
|
||||
for rows.Next() {
|
||||
rowCount++
|
||||
|
||||
var stringValue string
|
||||
var numberValue float64
|
||||
err = rows.Scan(&stringValue, &numberValue)
|
||||
if err != nil {
|
||||
return nil, false, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
|
||||
}
|
||||
if stringValue != "" && !seen[stringValue] {
|
||||
if totalCount >= limit {
|
||||
break
|
||||
}
|
||||
values.StringValues = append(values.StringValues, stringValue)
|
||||
seen[stringValue] = true
|
||||
totalCount++
|
||||
}
|
||||
if numberValue != 0 {
|
||||
if totalCount >= limit {
|
||||
break
|
||||
}
|
||||
if !seen[fmt.Sprintf("%f", numberValue)] {
|
||||
values.NumberValues = append(values.NumberValues, numberValue)
|
||||
seen[fmt.Sprintf("%f", numberValue)] = true
|
||||
totalCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
complete := rowCount <= limit
|
||||
|
||||
return values, complete, nil
|
||||
}
|
||||
|
||||
// getMetricFieldValues returns field values and whether the result is complete.
|
||||
func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
@@ -1628,7 +1989,11 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
|
||||
case telemetrytypes.SignalTraces:
|
||||
values, complete, err = t.getSpanFieldValues(ctx, fieldValueSelector)
|
||||
case telemetrytypes.SignalLogs:
|
||||
values, complete, err = t.getLogFieldValues(ctx, fieldValueSelector)
|
||||
if fieldValueSelector.Source == telemetrytypes.SourceAudit {
|
||||
values, complete, err = t.getAuditFieldValues(ctx, fieldValueSelector)
|
||||
} else {
|
||||
values, complete, err = t.getLogFieldValues(ctx, fieldValueSelector)
|
||||
}
|
||||
case telemetrytypes.SignalMetrics:
|
||||
if fieldValueSelector.Source == telemetrytypes.SourceMeter {
|
||||
values, complete, err = t.getMeterSourceMetricFieldValues(ctx, fieldValueSelector)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetryaudit"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
@@ -37,6 +38,11 @@ func TestGetFirstSeenFromMetricMetadata(t *testing.T) {
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
telemetrylogs.LogAttributeKeysTblName,
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
telemetryaudit.DBName,
|
||||
telemetryaudit.LogsTableName,
|
||||
telemetryaudit.TagAttributesTableName,
|
||||
telemetryaudit.LogAttributeKeysTblName,
|
||||
telemetryaudit.LogResourceKeysTblName,
|
||||
DBName,
|
||||
AttributesMetadataLocalTableName,
|
||||
ColumnEvolutionMetadataTableName,
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetryaudit"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
@@ -36,6 +37,11 @@ func newTestTelemetryMetaStoreTestHelper(store telemetrystore.TelemetryStore) te
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
telemetrylogs.LogAttributeKeysTblName,
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
telemetryaudit.DBName,
|
||||
telemetryaudit.LogsTableName,
|
||||
telemetryaudit.TagAttributesTableName,
|
||||
telemetryaudit.LogAttributeKeysTblName,
|
||||
telemetryaudit.LogResourceKeysTblName,
|
||||
DBName,
|
||||
AttributesMetadataLocalTableName,
|
||||
ColumnEvolutionMetadataTableName,
|
||||
|
||||
@@ -7,11 +7,13 @@ type Source struct {
|
||||
}
|
||||
|
||||
var (
|
||||
SourceAudit = Source{valuer.NewString("audit")}
|
||||
SourceMeter = Source{valuer.NewString("meter")}
|
||||
SourceUnspecified = Source{valuer.NewString("")}
|
||||
)
|
||||
|
||||
// Enum returns the acceptable values for Source.
|
||||
// TODO: Add SourceAudit once the frontend is ready for consumption.
|
||||
func (Source) Enum() []any {
|
||||
return []any{
|
||||
SourceMeter,
|
||||
|
||||
@@ -12,6 +12,7 @@ pytest_plugins = [
|
||||
"fixtures.sqlite",
|
||||
"fixtures.zookeeper",
|
||||
"fixtures.signoz",
|
||||
"fixtures.audit",
|
||||
"fixtures.logs",
|
||||
"fixtures.traces",
|
||||
"fixtures.metrics",
|
||||
|
||||
404
tests/integration/fixtures/audit.py
Normal file
404
tests/integration/fixtures/audit.py
Normal file
@@ -0,0 +1,404 @@
|
||||
import datetime
|
||||
import json
|
||||
from abc import ABC
|
||||
from typing import Any, Callable, Generator, List, Optional
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from ksuid import KsuidMs
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.fingerprint import LogsOrTracesFingerprint
|
||||
|
||||
|
||||
class AuditResource(ABC):
|
||||
labels: str
|
||||
fingerprint: str
|
||||
seen_at_ts_bucket_start: np.int64
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
labels: dict[str, str],
|
||||
fingerprint: str,
|
||||
seen_at_ts_bucket_start: np.int64,
|
||||
) -> None:
|
||||
self.labels = json.dumps(labels, separators=(",", ":"))
|
||||
self.fingerprint = fingerprint
|
||||
self.seen_at_ts_bucket_start = seen_at_ts_bucket_start
|
||||
|
||||
def np_arr(self) -> np.array:
|
||||
return np.array(
|
||||
[
|
||||
self.labels,
|
||||
self.fingerprint,
|
||||
self.seen_at_ts_bucket_start,
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class AuditResourceOrAttributeKeys(ABC):
|
||||
name: str
|
||||
datatype: str
|
||||
|
||||
def __init__(self, name: str, datatype: str) -> None:
|
||||
self.name = name
|
||||
self.datatype = datatype
|
||||
|
||||
def np_arr(self) -> np.array:
|
||||
return np.array([self.name, self.datatype])
|
||||
|
||||
|
||||
class AuditTagAttributes(ABC):
|
||||
unix_milli: np.int64
|
||||
tag_key: str
|
||||
tag_type: str
|
||||
tag_data_type: str
|
||||
string_value: str
|
||||
int64_value: Optional[np.int64]
|
||||
float64_value: Optional[np.float64]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
timestamp: datetime.datetime,
|
||||
tag_key: str,
|
||||
tag_type: str,
|
||||
tag_data_type: str,
|
||||
string_value: Optional[str],
|
||||
int64_value: Optional[np.int64],
|
||||
float64_value: Optional[np.float64],
|
||||
) -> None:
|
||||
self.unix_milli = np.int64(int(timestamp.timestamp() * 1e3))
|
||||
self.tag_key = tag_key
|
||||
self.tag_type = tag_type
|
||||
self.tag_data_type = tag_data_type
|
||||
self.string_value = string_value or ""
|
||||
self.int64_value = int64_value
|
||||
self.float64_value = float64_value
|
||||
|
||||
def np_arr(self) -> np.array:
|
||||
return np.array(
|
||||
[
|
||||
self.unix_milli,
|
||||
self.tag_key,
|
||||
self.tag_type,
|
||||
self.tag_data_type,
|
||||
self.string_value,
|
||||
self.int64_value,
|
||||
self.float64_value,
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class AuditLog(ABC):
|
||||
"""Represents a single audit log event in signoz_audit.
|
||||
|
||||
Matches the ClickHouse DDL from the schema migration (ticket #1936):
|
||||
- Database: signoz_audit
|
||||
- Local table: logs
|
||||
- Distributed table: distributed_logs
|
||||
- No resources_string column (resource JSON only)
|
||||
- Has event_name column
|
||||
- 7 materialized columns auto-populated from attributes_string at INSERT time
|
||||
"""
|
||||
|
||||
ts_bucket_start: np.uint64
|
||||
resource_fingerprint: str
|
||||
timestamp: np.uint64
|
||||
observed_timestamp: np.uint64
|
||||
id: str
|
||||
trace_id: str
|
||||
span_id: str
|
||||
trace_flags: np.uint32
|
||||
severity_text: str
|
||||
severity_number: np.uint8
|
||||
body: str
|
||||
scope_name: str
|
||||
scope_version: str
|
||||
scope_string: dict[str, str]
|
||||
attributes_string: dict[str, str]
|
||||
attributes_number: dict[str, np.float64]
|
||||
attributes_bool: dict[str, bool]
|
||||
resource_json: dict[str, str]
|
||||
event_name: str
|
||||
|
||||
resource: List[AuditResource]
|
||||
tag_attributes: List[AuditTagAttributes]
|
||||
resource_keys: List[AuditResourceOrAttributeKeys]
|
||||
attribute_keys: List[AuditResourceOrAttributeKeys]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
timestamp: Optional[datetime.datetime] = None,
|
||||
resources: dict[str, Any] = {},
|
||||
attributes: dict[str, Any] = {},
|
||||
body: str = "",
|
||||
event_name: str = "",
|
||||
severity_text: str = "INFO",
|
||||
trace_id: str = "",
|
||||
span_id: str = "",
|
||||
trace_flags: np.uint32 = 0,
|
||||
scope_name: str = "signoz.audit",
|
||||
scope_version: str = "",
|
||||
) -> None:
|
||||
if timestamp is None:
|
||||
timestamp = datetime.datetime.now()
|
||||
self.tag_attributes = []
|
||||
self.attribute_keys = []
|
||||
self.resource_keys = []
|
||||
|
||||
self.timestamp = np.uint64(int(timestamp.timestamp() * 1e9))
|
||||
self.observed_timestamp = self.timestamp
|
||||
|
||||
minute = timestamp.minute
|
||||
bucket_minute = 0 if minute < 30 else 30
|
||||
bucket_start = timestamp.replace(minute=bucket_minute, second=0, microsecond=0)
|
||||
self.ts_bucket_start = np.uint64(int(bucket_start.timestamp()))
|
||||
|
||||
self.id = str(KsuidMs(datetime=timestamp))
|
||||
|
||||
self.trace_id = trace_id
|
||||
self.span_id = span_id
|
||||
self.trace_flags = trace_flags
|
||||
|
||||
self.severity_text = severity_text
|
||||
self.severity_number = np.uint8(9 if severity_text == "INFO" else 17)
|
||||
|
||||
self.body = body
|
||||
self.event_name = event_name
|
||||
|
||||
# Resources — JSON column only (no resources_string in audit DDL)
|
||||
self.resource_json = {k: str(v) for k, v in resources.items()}
|
||||
for k, v in self.resource_json.items():
|
||||
self.tag_attributes.append(
|
||||
AuditTagAttributes(
|
||||
timestamp=timestamp,
|
||||
tag_key=k,
|
||||
tag_type="resource",
|
||||
tag_data_type="string",
|
||||
string_value=str(v),
|
||||
int64_value=None,
|
||||
float64_value=None,
|
||||
)
|
||||
)
|
||||
self.resource_keys.append(
|
||||
AuditResourceOrAttributeKeys(name=k, datatype="string")
|
||||
)
|
||||
|
||||
self.resource_fingerprint = LogsOrTracesFingerprint(
|
||||
self.resource_json
|
||||
).calculate()
|
||||
|
||||
# Process attributes by type
|
||||
self.attributes_string = {}
|
||||
self.attributes_number = {}
|
||||
self.attributes_bool = {}
|
||||
|
||||
for k, v in attributes.items():
|
||||
if isinstance(v, bool):
|
||||
self.attributes_bool[k] = v
|
||||
self.tag_attributes.append(
|
||||
AuditTagAttributes(
|
||||
timestamp=timestamp,
|
||||
tag_key=k,
|
||||
tag_type="tag",
|
||||
tag_data_type="bool",
|
||||
string_value=None,
|
||||
int64_value=None,
|
||||
float64_value=None,
|
||||
)
|
||||
)
|
||||
self.attribute_keys.append(
|
||||
AuditResourceOrAttributeKeys(name=k, datatype="bool")
|
||||
)
|
||||
elif isinstance(v, int):
|
||||
self.attributes_number[k] = np.float64(v)
|
||||
self.tag_attributes.append(
|
||||
AuditTagAttributes(
|
||||
timestamp=timestamp,
|
||||
tag_key=k,
|
||||
tag_type="tag",
|
||||
tag_data_type="int64",
|
||||
string_value=None,
|
||||
int64_value=np.int64(v),
|
||||
float64_value=None,
|
||||
)
|
||||
)
|
||||
self.attribute_keys.append(
|
||||
AuditResourceOrAttributeKeys(name=k, datatype="int64")
|
||||
)
|
||||
elif isinstance(v, float):
|
||||
self.attributes_number[k] = np.float64(v)
|
||||
self.tag_attributes.append(
|
||||
AuditTagAttributes(
|
||||
timestamp=timestamp,
|
||||
tag_key=k,
|
||||
tag_type="tag",
|
||||
tag_data_type="float64",
|
||||
string_value=None,
|
||||
int64_value=None,
|
||||
float64_value=np.float64(v),
|
||||
)
|
||||
)
|
||||
self.attribute_keys.append(
|
||||
AuditResourceOrAttributeKeys(name=k, datatype="float64")
|
||||
)
|
||||
else:
|
||||
self.attributes_string[k] = str(v)
|
||||
self.tag_attributes.append(
|
||||
AuditTagAttributes(
|
||||
timestamp=timestamp,
|
||||
tag_key=k,
|
||||
tag_type="tag",
|
||||
tag_data_type="string",
|
||||
string_value=str(v),
|
||||
int64_value=None,
|
||||
float64_value=None,
|
||||
)
|
||||
)
|
||||
self.attribute_keys.append(
|
||||
AuditResourceOrAttributeKeys(name=k, datatype="string")
|
||||
)
|
||||
|
||||
self.scope_name = scope_name
|
||||
self.scope_version = scope_version
|
||||
self.scope_string = {}
|
||||
|
||||
self.resource = [
|
||||
AuditResource(
|
||||
labels=self.resource_json,
|
||||
fingerprint=self.resource_fingerprint,
|
||||
seen_at_ts_bucket_start=self.ts_bucket_start,
|
||||
)
|
||||
]
|
||||
|
||||
def np_arr(self) -> np.array:
|
||||
return np.array(
|
||||
[
|
||||
self.ts_bucket_start,
|
||||
self.resource_fingerprint,
|
||||
self.timestamp,
|
||||
self.observed_timestamp,
|
||||
self.id,
|
||||
self.trace_id,
|
||||
self.span_id,
|
||||
self.trace_flags,
|
||||
self.severity_text,
|
||||
self.severity_number,
|
||||
self.body,
|
||||
self.scope_name,
|
||||
self.scope_version,
|
||||
self.scope_string,
|
||||
self.attributes_string,
|
||||
self.attributes_number,
|
||||
self.attributes_bool,
|
||||
self.resource_json,
|
||||
self.event_name,
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="insert_audit_logs", scope="function")
|
||||
def insert_audit_logs(
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
) -> Generator[Callable[[List[AuditLog]], None], Any, None]:
|
||||
def _insert_audit_logs(logs: List[AuditLog]) -> None:
|
||||
resources: List[AuditResource] = []
|
||||
for log in logs:
|
||||
resources.extend(log.resource)
|
||||
|
||||
if len(resources) > 0:
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_audit",
|
||||
table="distributed_logs_resource",
|
||||
data=[resource.np_arr() for resource in resources],
|
||||
column_names=[
|
||||
"labels",
|
||||
"fingerprint",
|
||||
"seen_at_ts_bucket_start",
|
||||
],
|
||||
)
|
||||
|
||||
tag_attributes: List[AuditTagAttributes] = []
|
||||
for log in logs:
|
||||
tag_attributes.extend(log.tag_attributes)
|
||||
|
||||
if len(tag_attributes) > 0:
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_audit",
|
||||
table="distributed_tag_attributes",
|
||||
data=[ta.np_arr() for ta in tag_attributes],
|
||||
column_names=[
|
||||
"unix_milli",
|
||||
"tag_key",
|
||||
"tag_type",
|
||||
"tag_data_type",
|
||||
"string_value",
|
||||
"int64_value",
|
||||
"float64_value",
|
||||
],
|
||||
)
|
||||
|
||||
attribute_keys: List[AuditResourceOrAttributeKeys] = []
|
||||
for log in logs:
|
||||
attribute_keys.extend(log.attribute_keys)
|
||||
|
||||
if len(attribute_keys) > 0:
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_audit",
|
||||
table="distributed_logs_attribute_keys",
|
||||
data=[ak.np_arr() for ak in attribute_keys],
|
||||
column_names=["name", "datatype"],
|
||||
)
|
||||
|
||||
resource_keys: List[AuditResourceOrAttributeKeys] = []
|
||||
for log in logs:
|
||||
resource_keys.extend(log.resource_keys)
|
||||
|
||||
if len(resource_keys) > 0:
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_audit",
|
||||
table="distributed_logs_resource_keys",
|
||||
data=[rk.np_arr() for rk in resource_keys],
|
||||
column_names=["name", "datatype"],
|
||||
)
|
||||
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_audit",
|
||||
table="distributed_logs",
|
||||
data=[log.np_arr() for log in logs],
|
||||
column_names=[
|
||||
"ts_bucket_start",
|
||||
"resource_fingerprint",
|
||||
"timestamp",
|
||||
"observed_timestamp",
|
||||
"id",
|
||||
"trace_id",
|
||||
"span_id",
|
||||
"trace_flags",
|
||||
"severity_text",
|
||||
"severity_number",
|
||||
"body",
|
||||
"scope_name",
|
||||
"scope_version",
|
||||
"scope_string",
|
||||
"attributes_string",
|
||||
"attributes_number",
|
||||
"attributes_bool",
|
||||
"resource",
|
||||
"event_name",
|
||||
],
|
||||
)
|
||||
|
||||
yield _insert_audit_logs
|
||||
|
||||
cluster = clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER"]
|
||||
for table in [
|
||||
"logs",
|
||||
"logs_resource",
|
||||
"tag_attributes",
|
||||
"logs_attribute_keys",
|
||||
"logs_resource_keys",
|
||||
]:
|
||||
clickhouse.conn.query(
|
||||
f"TRUNCATE TABLE signoz_audit.{table} ON CLUSTER '{cluster}' SYNC"
|
||||
)
|
||||
414
tests/integration/src/auditquerier/01_audit_logs.py
Normal file
414
tests/integration/src/auditquerier/01_audit_logs.py
Normal file
@@ -0,0 +1,414 @@
|
||||
"""Integration tests for audit log querying via /api/v5/query_range.
|
||||
|
||||
Tests verify that audit events inserted directly into signoz_audit ClickHouse
|
||||
tables can be queried back through the standard query_range API with
|
||||
signal=logs, source=audit.
|
||||
|
||||
Each test maps to a real user query pattern from the audit logs design doc.
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable, List
|
||||
|
||||
import requests
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.audit import AuditLog
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.querier import make_query_request
|
||||
|
||||
|
||||
def _build_audit_query(
|
||||
*,
|
||||
filter_expression: str = "",
|
||||
limit: int = 100,
|
||||
source: str = "audit",
|
||||
) -> dict:
|
||||
spec = {
|
||||
"name": "A",
|
||||
"signal": "logs",
|
||||
"source": source,
|
||||
"disabled": False,
|
||||
"limit": limit,
|
||||
"offset": 0,
|
||||
"order": [
|
||||
{"key": {"name": "timestamp"}, "direction": "desc"},
|
||||
{"key": {"name": "id"}, "direction": "desc"},
|
||||
],
|
||||
"aggregations": [{"expression": "count()"}],
|
||||
}
|
||||
if filter_expression:
|
||||
spec["filter"] = {"expression": filter_expression}
|
||||
return {"type": "builder_query", "spec": spec}
|
||||
|
||||
|
||||
def _build_audit_ts_query(
|
||||
*,
|
||||
aggregation: str = "count()",
|
||||
filter_expression: str = "",
|
||||
group_by: str = "",
|
||||
step_interval: int = 60,
|
||||
limit: int = 0,
|
||||
) -> dict:
|
||||
spec = {
|
||||
"name": "A",
|
||||
"signal": "logs",
|
||||
"source": "audit",
|
||||
"stepInterval": step_interval,
|
||||
"aggregations": [{"expression": aggregation}],
|
||||
}
|
||||
if filter_expression:
|
||||
spec["filter"] = {"expression": filter_expression}
|
||||
if group_by:
|
||||
spec["groupBy"] = [{"name": group_by}]
|
||||
if limit:
|
||||
spec["limit"] = limit
|
||||
return {"type": "builder_query", "spec": spec}
|
||||
|
||||
|
||||
def _query_audit_raw(
|
||||
signoz: types.SigNoz,
|
||||
token: str,
|
||||
query: dict,
|
||||
) -> requests.Response:
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
return make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int((now - timedelta(seconds=30)).timestamp() * 1000),
|
||||
end_ms=int(now.timestamp() * 1000),
|
||||
queries=[query],
|
||||
request_type="raw",
|
||||
)
|
||||
|
||||
|
||||
def _query_audit_ts(
|
||||
signoz: types.SigNoz,
|
||||
token: str,
|
||||
query: dict,
|
||||
) -> requests.Response:
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
return make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int((now - timedelta(minutes=5)).timestamp() * 1000),
|
||||
end_ms=int(now.timestamp() * 1000),
|
||||
queries=[query],
|
||||
request_type="time_series",
|
||||
)
|
||||
|
||||
|
||||
def _insert_standard_audit_events(
|
||||
insert_audit_logs: Callable[[List[AuditLog]], None],
|
||||
) -> None:
|
||||
"""Insert a representative set of audit events for testing."""
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
|
||||
insert_audit_logs(
|
||||
[
|
||||
# Success: admin creates a dashboard
|
||||
AuditLog(
|
||||
timestamp=now - timedelta(seconds=5),
|
||||
resources={"service.name": "signoz", "service.version": "0.90.0"},
|
||||
attributes={
|
||||
"signoz.audit.principal.id": "user-001",
|
||||
"signoz.audit.principal.email": "alice@acme.com",
|
||||
"signoz.audit.principal.type": "user",
|
||||
"signoz.audit.principal.org_id": "org-001",
|
||||
"signoz.audit.action": "create",
|
||||
"signoz.audit.action_category": "configuration_change",
|
||||
"signoz.audit.outcome": "success",
|
||||
"signoz.audit.resource.name": "dashboard",
|
||||
"signoz.audit.resource.id": "dash-001",
|
||||
},
|
||||
body="alice@acme.com (user-001) created dashboard (dash-001)",
|
||||
event_name="dashboard.created",
|
||||
severity_text="INFO",
|
||||
scope_name="signoz.audit",
|
||||
),
|
||||
# Success: admin updates a dashboard
|
||||
AuditLog(
|
||||
timestamp=now - timedelta(seconds=4),
|
||||
resources={"service.name": "signoz", "service.version": "0.90.0"},
|
||||
attributes={
|
||||
"signoz.audit.principal.id": "user-001",
|
||||
"signoz.audit.principal.email": "alice@acme.com",
|
||||
"signoz.audit.principal.type": "user",
|
||||
"signoz.audit.principal.org_id": "org-001",
|
||||
"signoz.audit.action": "update",
|
||||
"signoz.audit.action_category": "configuration_change",
|
||||
"signoz.audit.outcome": "success",
|
||||
"signoz.audit.resource.name": "dashboard",
|
||||
"signoz.audit.resource.id": "dash-001",
|
||||
},
|
||||
body="alice@acme.com (user-001) updated dashboard (dash-001)",
|
||||
event_name="dashboard.updated",
|
||||
severity_text="INFO",
|
||||
scope_name="signoz.audit",
|
||||
),
|
||||
# Failure: viewer tries to delete a dashboard
|
||||
AuditLog(
|
||||
timestamp=now - timedelta(seconds=3),
|
||||
resources={"service.name": "signoz", "service.version": "0.90.0"},
|
||||
attributes={
|
||||
"signoz.audit.principal.id": "user-002",
|
||||
"signoz.audit.principal.email": "viewer@acme.com",
|
||||
"signoz.audit.principal.type": "user",
|
||||
"signoz.audit.principal.org_id": "org-001",
|
||||
"signoz.audit.action": "delete",
|
||||
"signoz.audit.action_category": "configuration_change",
|
||||
"signoz.audit.outcome": "failure",
|
||||
"signoz.audit.resource.name": "dashboard",
|
||||
"signoz.audit.resource.id": "dash-001",
|
||||
"signoz.audit.error.type": "forbidden",
|
||||
"signoz.audit.error.code": "authz_forbidden",
|
||||
},
|
||||
body="viewer@acme.com (user-002) failed to delete dashboard (dash-001): forbidden (authz_forbidden)",
|
||||
event_name="dashboard.deleted",
|
||||
severity_text="ERROR",
|
||||
scope_name="signoz.audit",
|
||||
),
|
||||
# Success: service account creates an API key
|
||||
AuditLog(
|
||||
timestamp=now - timedelta(seconds=2),
|
||||
resources={"service.name": "signoz", "service.version": "0.90.0"},
|
||||
attributes={
|
||||
"signoz.audit.principal.id": "sa-001",
|
||||
"signoz.audit.principal.email": "",
|
||||
"signoz.audit.principal.type": "service_account",
|
||||
"signoz.audit.principal.org_id": "org-001",
|
||||
"signoz.audit.action": "create",
|
||||
"signoz.audit.action_category": "access_control",
|
||||
"signoz.audit.outcome": "success",
|
||||
"signoz.audit.resource.name": "serviceaccount",
|
||||
"signoz.audit.resource.id": "sa-001",
|
||||
},
|
||||
body="sa-001 created serviceaccount (sa-001)",
|
||||
event_name="serviceaccount.apikey.created",
|
||||
severity_text="INFO",
|
||||
scope_name="signoz.audit",
|
||||
),
|
||||
# Success: admin logs in
|
||||
AuditLog(
|
||||
timestamp=now - timedelta(seconds=1),
|
||||
resources={"service.name": "signoz", "service.version": "0.90.0"},
|
||||
attributes={
|
||||
"signoz.audit.principal.id": "user-001",
|
||||
"signoz.audit.principal.email": "alice@acme.com",
|
||||
"signoz.audit.principal.type": "user",
|
||||
"signoz.audit.principal.org_id": "org-001",
|
||||
"signoz.audit.action": "login",
|
||||
"signoz.audit.action_category": "access_control",
|
||||
"signoz.audit.outcome": "success",
|
||||
"signoz.audit.resource.name": "session",
|
||||
"signoz.audit.resource.id": "*",
|
||||
},
|
||||
body="alice@acme.com (user-001) login session (*)",
|
||||
event_name="session.login",
|
||||
severity_text="INFO",
|
||||
scope_name="signoz.audit",
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_audit_list_all(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_audit_logs: Callable[[List[AuditLog]], None],
|
||||
) -> None:
|
||||
"""List all audit events — verify correct count and ordering."""
|
||||
_insert_standard_audit_events(insert_audit_logs)
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = _query_audit_raw(signoz, token, _build_audit_query())
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
|
||||
results = response.json()["data"]["data"]["results"]
|
||||
assert len(results) == 1
|
||||
|
||||
rows = results[0]["rows"]
|
||||
assert len(rows) == 5
|
||||
|
||||
# Most recent first (session.login)
|
||||
assert rows[0]["data"]["event_name"] == "session.login"
|
||||
# Oldest last (dashboard.created)
|
||||
assert rows[4]["data"]["event_name"] == "dashboard.created"
|
||||
|
||||
|
||||
def test_audit_filter_by_principal(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_audit_logs: Callable[[List[AuditLog]], None],
|
||||
) -> None:
|
||||
"""Q1: Show me all actions by a specific user — filter on materialized principal.id."""
|
||||
_insert_standard_audit_events(insert_audit_logs)
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = _query_audit_raw(
|
||||
signoz,
|
||||
token,
|
||||
_build_audit_query(filter_expression="signoz.audit.principal.id = 'user-001'"),
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
rows = response.json()["data"]["data"]["results"][0]["rows"]
|
||||
assert len(rows) == 3 # alice: create, update, login
|
||||
|
||||
emails = [
|
||||
row["data"]["attributes_string"]["signoz.audit.principal.email"] for row in rows
|
||||
]
|
||||
assert all(e == "alice@acme.com" for e in emails)
|
||||
|
||||
|
||||
def test_audit_filter_by_outcome_failure(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_audit_logs: Callable[[List[AuditLog]], None],
|
||||
) -> None:
|
||||
"""Q2: Show me all failed actions — filter on materialized outcome."""
|
||||
_insert_standard_audit_events(insert_audit_logs)
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = _query_audit_raw(
|
||||
signoz,
|
||||
token,
|
||||
_build_audit_query(filter_expression="signoz.audit.outcome = 'failure'"),
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
rows = response.json()["data"]["data"]["results"][0]["rows"]
|
||||
assert len(rows) == 1
|
||||
|
||||
row = rows[0]["data"]
|
||||
assert row["attributes_string"]["signoz.audit.principal.email"] == "viewer@acme.com"
|
||||
assert row["attributes_string"]["signoz.audit.action"] == "delete"
|
||||
assert row["severity_text"] == "ERROR"
|
||||
|
||||
|
||||
def test_audit_filter_by_resource(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_audit_logs: Callable[[List[AuditLog]], None],
|
||||
) -> None:
|
||||
"""Q3: Show me the change history of a specific dashboard — filter on resource.name + resource.id."""
|
||||
_insert_standard_audit_events(insert_audit_logs)
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = _query_audit_raw(
|
||||
signoz,
|
||||
token,
|
||||
_build_audit_query(
|
||||
filter_expression="signoz.audit.resource.name = 'dashboard' AND signoz.audit.resource.id = 'dash-001'"
|
||||
),
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
rows = response.json()["data"]["data"]["results"][0]["rows"]
|
||||
assert len(rows) == 3 # create, update, failed delete — all on dash-001
|
||||
|
||||
actions = [row["data"]["attributes_string"]["signoz.audit.action"] for row in rows]
|
||||
# Most recent first
|
||||
assert actions == ["delete", "update", "create"]
|
||||
|
||||
|
||||
def test_audit_filter_by_principal_type(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_audit_logs: Callable[[List[AuditLog]], None],
|
||||
) -> None:
|
||||
"""Q5: Show me all actions by service accounts — filter on materialized principal.type."""
|
||||
_insert_standard_audit_events(insert_audit_logs)
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = _query_audit_raw(
|
||||
signoz,
|
||||
token,
|
||||
_build_audit_query(
|
||||
filter_expression="signoz.audit.principal.type = 'service_account'"
|
||||
),
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
rows = response.json()["data"]["data"]["results"][0]["rows"]
|
||||
assert len(rows) == 1
|
||||
assert rows[0]["data"]["event_name"] == "serviceaccount.apikey.created"
|
||||
|
||||
|
||||
def test_audit_scalar_count_failures(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_audit_logs: Callable[[List[AuditLog]], None],
|
||||
) -> None:
|
||||
"""Q6: Alert query — count failed actions (scalar aggregation)."""
|
||||
_insert_standard_audit_events(insert_audit_logs)
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int((now - timedelta(seconds=30)).timestamp() * 1000),
|
||||
end_ms=int(now.timestamp() * 1000),
|
||||
queries=[
|
||||
_build_audit_ts_query(
|
||||
aggregation="count()",
|
||||
filter_expression="signoz.audit.outcome = 'failure'",
|
||||
)
|
||||
],
|
||||
request_type="scalar",
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
|
||||
results = response.json()["data"]["data"]["results"]
|
||||
assert len(results) == 1
|
||||
|
||||
rows = results[0].get("rows", [])
|
||||
assert len(rows) == 1
|
||||
assert rows[0]["data"]["A"] == 1
|
||||
|
||||
|
||||
def test_audit_does_not_leak_into_logs(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_audit_logs: Callable[[List[AuditLog]], None],
|
||||
) -> None:
|
||||
"""Audit data in signoz_audit must not appear when querying signal=logs without source=audit."""
|
||||
_insert_standard_audit_events(insert_audit_logs)
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
# Query regular logs (no source=audit) — should NOT see audit events
|
||||
response = _query_audit_raw(
|
||||
signoz,
|
||||
token,
|
||||
_build_audit_query(source=""),
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
rows = response.json()["data"]["data"]["results"][0].get("rows", [])
|
||||
|
||||
# None of the audit events should appear in regular log queries
|
||||
audit_bodies = [
|
||||
row["data"]["body"]
|
||||
for row in rows
|
||||
if "signoz.audit"
|
||||
in row["data"].get("attributes_string", {}).get("signoz.audit.action", "")
|
||||
]
|
||||
assert len(audit_bodies) == 0
|
||||
Reference in New Issue
Block a user