Compare commits

...

14 Commits

Author SHA1 Message Date
grandwizard28
8bea74f077 fix(audit): remove debug assertion from integration test 2026-04-03 20:00:36 +05:30
grandwizard28
9726371b79 fix(audit): revert sb.As in getAuditKeys, fix fixture column_names
Revert getAuditKeys to use raw SQL strings instead of sb.As() which
incorrectly treated string literals as column references. Add explicit
column_names to all ClickHouse insert calls in the audit fixture.
2026-04-03 18:02:01 +05:30
grandwizard28
c71b93f36d test(audit): add integration tests for audit log querying
Cover the documented query patterns: list all events, filter by
principal ID, filter by outcome, filter by resource name+ID,
filter by principal type, scalar count for alerting, and
isolation test ensuring audit data doesn't leak into regular logs.
2026-04-03 16:44:30 +05:30
grandwizard28
8d8406d7f0 feat(audit): wire audit statement builder into querier
Add auditStmtBuilder to querier struct and route LogAggregation
queries with source=audit to it in all three dispatch locations
(main query, live tail, shiftedQuery). Create and wire the full
audit query stack in signozquerier provider.
2026-04-03 16:33:44 +05:30
grandwizard28
e84c75d41f fix(audit): rename tag_attributes_v2 to tag_attributes
Migration uses tag_attributes/distributed_tag_attributes (no _v2
suffix). Rename constants and update all references including the
integration test fixture.
2026-04-03 16:12:31 +05:30
grandwizard28
a469fbab14 feat(audit): add integration test fixture for audit logs
AuditLog fixture inserts into all 5 signoz_audit tables matching
the schema migration DDL: distributed_logs (no resources_string,
has event_name), distributed_logs_resource, distributed_tag_attributes_v2,
distributed_logs_attribute_keys, distributed_logs_resource_keys.
2026-04-03 15:34:20 +05:30
grandwizard28
3835a0f4b0 fix(audit): align table names with schema migration
Migration uses logs/distributed_logs (not logs_v2/distributed_logs_v2).
Rename LogsV2TableName to LogsTableName and LogsV2LocalTableName to
LogsLocalTableName to match the actual signoz_audit DDL.
2026-04-03 15:27:41 +05:30
grandwizard28
9f6cc9c4eb style(audit): move column map to const.go, use sqlbuilder.As in metadata
Move logsV2Columns from field_mapper.go to const.go to colocate all
column definitions. Switch getAuditKeys() to use sb.As() instead of
raw string formatting. Fix FieldContext alignment.
2026-04-03 00:38:58 +05:30
grandwizard28
d1ae4b6173 refactor(audit): inline field key map into test file
Remove test_data.go and inline the audit field key map directly
into statement_builder_test.go with a compact helper function.
2026-04-03 00:20:07 +05:30
grandwizard28
1e3dc8de4f test(audit): add statement builder tests
Cover all three request types (list, time series, scalar) with
audit-specific query patterns: materialized column filters, AND/OR
conditions, limit CTEs, and group-by expressions.
2026-04-03 00:16:31 +05:30
grandwizard28
a24b75d071 feat(audit): add audit field value autocomplete support
Wire distributed_tag_attributes_v2 for signoz_audit into the
metadata store. Add getAuditFieldValues() and route SignalLogs +
SourceAudit to it in GetFieldValues().
2026-04-02 23:12:21 +05:30
grandwizard28
5db81ba7f6 fix(audit): align field mapper with actual audit DDL schema
Remove resources_string (not in audit table DDL).
Add event_name as intrinsic column.
Resource context resolves only through the resource JSON column.
2026-04-02 23:00:26 +05:30
grandwizard28
77493f7c76 chore: address review comments
Comment out SourceAudit from Enum() until frontend is ready.
Use actual audit table constants in metadata test helpers.
2026-04-02 22:52:28 +05:30
grandwizard28
b1f7e5a3df feat(audit): add telemetry audit query infrastructure
Add pkg/telemetryaudit/ with tables, field mapper, condition builder,
and statement builder for querying audit logs from signoz_audit database.
Add SourceAudit to source enum and integrate audit key resolution
into the metadata store.
2026-04-02 16:41:58 +05:30
17 changed files with 2604 additions and 7 deletions

View File

@@ -40,6 +40,7 @@ type querier struct {
promEngine prometheus.Prometheus
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
auditStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder
@@ -56,6 +57,7 @@ func New(
promEngine prometheus.Prometheus,
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
auditStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder,
@@ -69,6 +71,7 @@ func New(
promEngine: promEngine,
traceStmtBuilder: traceStmtBuilder,
logStmtBuilder: logStmtBuilder,
auditStmtBuilder: auditStmtBuilder,
metricStmtBuilder: metricStmtBuilder,
meterStmtBuilder: meterStmtBuilder,
traceOperatorStmtBuilder: traceOperatorStmtBuilder,
@@ -361,7 +364,11 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
bq := newBuilderQuery(q.logger, q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
stmtBuilder := q.logStmtBuilder
if spec.Source == telemetrytypes.SourceAudit {
stmtBuilder = q.auditStmtBuilder
}
bq := newBuilderQuery(q.logger, q.telemetryStore, stmtBuilder, spec, timeRange, req.RequestType, tmplVars)
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
@@ -550,7 +557,11 @@ func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qb
case <-tick:
// timestamp end is not specified here
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: tsStart}, req.RequestType)
bq := newBuilderQuery(q.logger, q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, map[string]qbtypes.VariableItem{
liveTailStmtBuilder := q.logStmtBuilder
if spec.Source == telemetrytypes.SourceAudit {
liveTailStmtBuilder = q.auditStmtBuilder
}
bq := newBuilderQuery(q.logger, q.telemetryStore, liveTailStmtBuilder, spec, timeRange, req.RequestType, map[string]qbtypes.VariableItem{
"id": {
Value: updatedLogID,
},
@@ -850,7 +861,11 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
specCopy := qt.spec.Copy()
specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
return newBuilderQuery(q.logger, q.telemetryStore, q.logStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
shiftStmtBuilder := q.logStmtBuilder
if qt.spec.Source == telemetrytypes.SourceAudit {
shiftStmtBuilder = q.auditStmtBuilder
}
return newBuilderQuery(q.logger, q.telemetryStore, shiftStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
case *builderQuery[qbtypes.MetricAggregation]:
specCopy := qt.spec.Copy()

View File

@@ -10,6 +10,7 @@ import (
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
"github.com/SigNoz/signoz/pkg/telemetryaudit"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
@@ -64,6 +65,11 @@ func newProvider(
telemetrylogs.TagAttributesV2TableName,
telemetrylogs.LogAttributeKeysTblName,
telemetrylogs.LogResourceKeysTblName,
telemetryaudit.DBName,
telemetryaudit.LogsTableName,
telemetryaudit.TagAttributesTableName,
telemetryaudit.LogAttributeKeysTblName,
telemetryaudit.LogResourceKeysTblName,
telemetrymetadata.DBName,
telemetrymetadata.AttributesMetadataLocalTableName,
telemetrymetadata.ColumnEvolutionMetadataTableName,
@@ -133,6 +139,35 @@ func newProvider(
telemetrylogs.GetBodyJSONKey,
)
// Create audit statement builder
auditFieldMapper := telemetryaudit.NewFieldMapper()
auditConditionBuilder := telemetryaudit.NewConditionBuilder(auditFieldMapper)
auditResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
settings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
telemetryMetadataStore,
telemetryaudit.DefaultFullTextColumn,
nil,
)
auditAggExprRewriter := querybuilder.NewAggExprRewriter(
settings,
telemetryaudit.DefaultFullTextColumn,
auditFieldMapper,
auditConditionBuilder,
nil,
)
auditStmtBuilder := telemetryaudit.NewAuditQueryStatementBuilder(
settings,
telemetryMetadataStore,
auditFieldMapper,
auditConditionBuilder,
auditResourceFilterStmtBuilder,
auditAggExprRewriter,
telemetryaudit.DefaultFullTextColumn,
nil,
)
// Create metric statement builder
metricFieldMapper := telemetrymetrics.NewFieldMapper()
metricConditionBuilder := telemetrymetrics.NewConditionBuilder(metricFieldMapper)
@@ -169,6 +204,7 @@ func newProvider(
prometheus,
traceStmtBuilder,
logStmtBuilder,
auditStmtBuilder,
metricStmtBuilder,
meterStmtBuilder,
traceOperatorStmtBuilder,

View File

@@ -47,6 +47,7 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
nil, // prometheus
nil, // traceStmtBuilder
nil, // logStmtBuilder
nil, // auditStmtBuilder
metricStmtBuilder,
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
@@ -104,6 +105,7 @@ func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap
nil, // prometheus
nil, // traceStmtBuilder
logStmtBuilder, // logStmtBuilder
nil, // auditStmtBuilder
nil, // metricStmtBuilder
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
@@ -154,6 +156,7 @@ func prepareQuerierForTraces(telemetryStore telemetrystore.TelemetryStore, keysM
nil, // prometheus
traceStmtBuilder, // traceStmtBuilder
nil, // logStmtBuilder
nil, // auditStmtBuilder
nil, // metricStmtBuilder
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder

View File

@@ -33,6 +33,7 @@ import (
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/telemetryaudit"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
@@ -395,6 +396,11 @@ func New(
telemetrylogs.TagAttributesV2TableName,
telemetrylogs.LogAttributeKeysTblName,
telemetrylogs.LogResourceKeysTblName,
telemetryaudit.DBName,
telemetryaudit.LogsTableName,
telemetryaudit.TagAttributesTableName,
telemetryaudit.LogAttributeKeysTblName,
telemetryaudit.LogResourceKeysTblName,
telemetrymetadata.DBName,
telemetrymetadata.AttributesMetadataLocalTableName,
telemetrymetadata.ColumnEvolutionMetadataTableName,

View File

@@ -0,0 +1,204 @@
package telemetryaudit
import (
"context"
"fmt"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
type conditionBuilder struct {
fm qbtypes.FieldMapper
}
func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
return &conditionBuilder{fm: fm}
}
func (c *conditionBuilder) conditionFor(
ctx context.Context,
startNs, endNs uint64,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
columns, err := c.fm.ColumnFor(ctx, startNs, endNs, key)
if err != nil {
return "", err
}
if operator.IsStringSearchOperator() {
value = querybuilder.FormatValueForContains(value)
}
fieldExpression, err := c.fm.FieldFor(ctx, startNs, endNs, key)
if err != nil {
return "", err
}
fieldExpression, value = querybuilder.DataTypeCollisionHandledFieldName(key, value, fieldExpression, operator)
switch operator {
case qbtypes.FilterOperatorEqual:
return sb.E(fieldExpression, value), nil
case qbtypes.FilterOperatorNotEqual:
return sb.NE(fieldExpression, value), nil
case qbtypes.FilterOperatorGreaterThan:
return sb.G(fieldExpression, value), nil
case qbtypes.FilterOperatorGreaterThanOrEq:
return sb.GE(fieldExpression, value), nil
case qbtypes.FilterOperatorLessThan:
return sb.LT(fieldExpression, value), nil
case qbtypes.FilterOperatorLessThanOrEq:
return sb.LE(fieldExpression, value), nil
case qbtypes.FilterOperatorLike:
return sb.Like(fieldExpression, value), nil
case qbtypes.FilterOperatorNotLike:
return sb.NotLike(fieldExpression, value), nil
case qbtypes.FilterOperatorILike:
return sb.ILike(fieldExpression, value), nil
case qbtypes.FilterOperatorNotILike:
return sb.NotILike(fieldExpression, value), nil
case qbtypes.FilterOperatorContains:
return sb.ILike(fieldExpression, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorNotContains:
return sb.NotILike(fieldExpression, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorRegexp:
return fmt.Sprintf(`match(%s, %s)`, sqlbuilder.Escape(fieldExpression), sb.Var(value)), nil
case qbtypes.FilterOperatorNotRegexp:
return fmt.Sprintf(`NOT match(%s, %s)`, sqlbuilder.Escape(fieldExpression), sb.Var(value)), nil
case qbtypes.FilterOperatorBetween:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrBetweenValues
}
if len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.Between(fieldExpression, values[0], values[1]), nil
case qbtypes.FilterOperatorNotBetween:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrBetweenValues
}
if len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.NotBetween(fieldExpression, values[0], values[1]), nil
case qbtypes.FilterOperatorIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, value := range values {
conditions = append(conditions, sb.E(fieldExpression, value))
}
return sb.Or(conditions...), nil
case qbtypes.FilterOperatorNotIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, value := range values {
conditions = append(conditions, sb.NE(fieldExpression, value))
}
return sb.And(conditions...), nil
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
var value any
column := columns[0]
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(fieldExpression), nil
}
return sb.IsNull(fieldExpression), nil
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
value = ""
if operator == qbtypes.FilterOperatorExists {
return sb.NE(fieldExpression, value), nil
}
return sb.E(fieldExpression, value), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
}
case schema.ColumnTypeEnumString:
value = ""
if operator == qbtypes.FilterOperatorExists {
return sb.NE(fieldExpression, value), nil
}
return sb.E(fieldExpression, value), nil
case schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
value = 0
if operator == qbtypes.FilterOperatorExists {
return sb.NE(fieldExpression, value), nil
}
return sb.E(fieldExpression, value), nil
case schema.ColumnTypeEnumMap:
keyType := column.Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
}
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
if key.Materialized {
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
}
if operator == qbtypes.FilterOperatorExists {
return sb.E(leftOperand, true), nil
}
return sb.NE(leftOperand, true), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
}
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
}
}
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported operator: %v", operator)
}
func (c *conditionBuilder) ConditionFor(
ctx context.Context,
startNs uint64,
endNs uint64,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
condition, err := c.conditionFor(ctx, startNs, endNs, key, operator, value, sb)
if err != nil {
return "", err
}
buildExistCondition := operator.AddDefaultExistsFilter()
switch key.FieldContext {
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextScope:
return condition, nil
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextAttribute:
// build exist condition for resource and attribute fields based on filter operator
}
if buildExistCondition {
existsCondition, err := c.conditionFor(ctx, startNs, endNs, key, qbtypes.FilterOperatorExists, nil, sb)
if err != nil {
return "", err
}
return sb.And(condition, existsCondition), nil
}
return condition, nil
}

128
pkg/telemetryaudit/const.go Normal file
View File

@@ -0,0 +1,128 @@
package telemetryaudit
import (
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const (
// Internal Columns.
IDColumn = "id"
TimestampBucketStartColumn = "ts_bucket_start"
ResourceFingerPrintColumn = "resource_fingerprint"
// Intrinsic Columns.
TimestampColumn = "timestamp"
ObservedTimestampColumn = "observed_timestamp"
BodyColumn = "body"
EventNameColumn = "event_name"
TraceIDColumn = "trace_id"
SpanIDColumn = "span_id"
TraceFlagsColumn = "trace_flags"
SeverityTextColumn = "severity_text"
SeverityNumberColumn = "severity_number"
ScopeNameColumn = "scope_name"
ScopeVersionColumn = "scope_version"
// Contextual Columns.
AttributesStringColumn = "attributes_string"
AttributesNumberColumn = "attributes_number"
AttributesBoolColumn = "attributes_bool"
ScopeStringColumn = "scope_string"
)
var (
DefaultFullTextColumn = &telemetrytypes.TelemetryFieldKey{
Name: "body",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
}
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
"body": {
Name: "body",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"trace_id": {
Name: "trace_id",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"span_id": {
Name: "span_id",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"trace_flags": {
Name: "trace_flags",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"severity_text": {
Name: "severity_text",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"severity_number": {
Name: "severity_number",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"event_name": {
Name: "event_name",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
}
DefaultSortingOrder = []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: TimestampColumn,
},
},
Direction: qbtypes.OrderDirectionDesc,
},
{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: IDColumn,
},
},
Direction: qbtypes.OrderDirectionDesc,
},
}
)
var logsV2Columns = map[string]*schema.Column{
"ts_bucket_start": {Name: "ts_bucket_start", Type: schema.ColumnTypeUInt64},
"resource_fingerprint": {Name: "resource_fingerprint", Type: schema.ColumnTypeString},
"timestamp": {Name: "timestamp", Type: schema.ColumnTypeUInt64},
"observed_timestamp": {Name: "observed_timestamp", Type: schema.ColumnTypeUInt64},
"id": {Name: "id", Type: schema.ColumnTypeString},
"trace_id": {Name: "trace_id", Type: schema.ColumnTypeString},
"span_id": {Name: "span_id", Type: schema.ColumnTypeString},
"trace_flags": {Name: "trace_flags", Type: schema.ColumnTypeUInt32},
"severity_text": {Name: "severity_text", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"severity_number": {Name: "severity_number", Type: schema.ColumnTypeUInt8},
"body": {Name: "body", Type: schema.ColumnTypeString},
"attributes_string": {Name: "attributes_string", Type: schema.MapColumnType{KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}, ValueType: schema.ColumnTypeString}},
"attributes_number": {Name: "attributes_number", Type: schema.MapColumnType{KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}, ValueType: schema.ColumnTypeFloat64}},
"attributes_bool": {Name: "attributes_bool", Type: schema.MapColumnType{KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}, ValueType: schema.ColumnTypeBool}},
"resource": {Name: "resource", Type: schema.JSONColumnType{}},
"event_name": {Name: "event_name", Type: schema.ColumnTypeString},
"scope_name": {Name: "scope_name", Type: schema.ColumnTypeString},
"scope_version": {Name: "scope_version", Type: schema.ColumnTypeString},
"scope_string": {Name: "scope_string", Type: schema.MapColumnType{KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}, ValueType: schema.ColumnTypeString}},
}

View File

@@ -0,0 +1,155 @@
package telemetryaudit
import (
"context"
"fmt"
"strings"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
"golang.org/x/exp/maps"
)
type fieldMapper struct{}
func NewFieldMapper() qbtypes.FieldMapper {
return &fieldMapper{}
}
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
return []*schema.Column{logsV2Columns["resource"]}, nil
case telemetrytypes.FieldContextScope:
switch key.Name {
case "name", "scope.name", "scope_name":
return []*schema.Column{logsV2Columns["scope_name"]}, nil
case "version", "scope.version", "scope_version":
return []*schema.Column{logsV2Columns["scope_version"]}, nil
}
return []*schema.Column{logsV2Columns["scope_string"]}, nil
case telemetrytypes.FieldContextAttribute:
switch key.FieldDataType {
case telemetrytypes.FieldDataTypeString:
return []*schema.Column{logsV2Columns["attributes_string"]}, nil
case telemetrytypes.FieldDataTypeInt64, telemetrytypes.FieldDataTypeFloat64, telemetrytypes.FieldDataTypeNumber:
return []*schema.Column{logsV2Columns["attributes_number"]}, nil
case telemetrytypes.FieldDataTypeBool:
return []*schema.Column{logsV2Columns["attributes_bool"]}, nil
}
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
col, ok := logsV2Columns[key.Name]
if !ok {
return nil, qbtypes.ErrColumnNotFound
}
return []*schema.Column{col}, nil
}
return nil, qbtypes.ErrColumnNotFound
}
func (m *fieldMapper) FieldFor(ctx context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
columns, err := m.getColumn(ctx, key)
if err != nil {
return "", err
}
exprs := []string{}
existExpr := []string{}
for _, column := range columns {
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
if key.FieldContext == telemetrytypes.FieldContextResource {
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", column.Name, key.Name))
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", column.Name, key.Name))
} else {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns in audit, got %s", key.FieldContext.String)
}
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
exprs = append(exprs, column.Name)
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported low cardinality element type %s", elementType)
}
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
exprs = append(exprs, column.Name)
case schema.ColumnTypeEnumMap:
keyType := column.Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
}
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
if key.Materialized {
exprs = append(exprs, telemetrytypes.FieldKeyToMaterializedColumnName(key))
existExpr = append(existExpr, fmt.Sprintf("%s==true", telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)))
} else {
exprs = append(exprs, fmt.Sprintf("%s['%s']", column.Name, key.Name))
existExpr = append(existExpr, fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name))
}
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported map value type %s", valueType)
}
}
}
if len(exprs) == 1 {
return exprs[0], nil
} else if len(exprs) > 1 {
if len(existExpr) != len(exprs) {
return "", errors.New(errors.TypeInternal, errors.CodeInternal, "length of exist exprs doesn't match to that of exprs")
}
finalExprs := []string{}
for i, expr := range exprs {
finalExprs = append(finalExprs, fmt.Sprintf("%s, %s", existExpr[i], expr))
}
return "multiIf(" + strings.Join(finalExprs, ", ") + ", NULL)", nil
}
return columns[0].Name, nil
}
func (m *fieldMapper) ColumnFor(ctx context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
return m.getColumn(ctx, key)
}
func (m *fieldMapper) ColumnExpressionFor(
ctx context.Context,
tsStart, tsEnd uint64,
field *telemetrytypes.TelemetryFieldKey,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, error) {
fieldExpression, err := m.FieldFor(ctx, tsStart, tsEnd, field)
if errors.Is(err, qbtypes.ErrColumnNotFound) {
keysForField := keys[field.Name]
if len(keysForField) == 0 {
if _, ok := logsV2Columns[field.Name]; ok {
field.FieldContext = telemetrytypes.FieldContextLog
fieldExpression, _ = m.FieldFor(ctx, tsStart, tsEnd, field)
} else {
correction, found := telemetrytypes.SuggestCorrection(field.Name, maps.Keys(keys))
if found {
return "", errors.Wrap(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
}
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field `%s` not found", field.Name)
}
} else if len(keysForField) == 1 {
fieldExpression, _ = m.FieldFor(ctx, tsStart, tsEnd, keysForField[0])
} else {
args := []string{}
for _, key := range keysForField {
fieldExpression, _ = m.FieldFor(ctx, tsStart, tsEnd, key)
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", fieldExpression, fieldExpression))
}
fieldExpression = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
}
}
return fmt.Sprintf("%s AS `%s`", sqlbuilder.Escape(fieldExpression), field.Name), nil
}

View File

@@ -0,0 +1,600 @@
package telemetryaudit
import (
"context"
"fmt"
"log/slog"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
type auditQueryStatementBuilder struct {
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
aggExprRewriter qbtypes.AggExprRewriter
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
}
var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*auditQueryStatementBuilder)(nil)
func NewAuditQueryStatementBuilder(
settings factory.ProviderSettings,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
aggExprRewriter qbtypes.AggExprRewriter,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *auditQueryStatementBuilder {
auditSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetryaudit")
return &auditQueryStatementBuilder{
logger: auditSettings.Logger(),
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
aggExprRewriter: aggExprRewriter,
fullTextColumn: fullTextColumn,
jsonKeyToKey: jsonKeyToKey,
}
}
func (b *auditQueryStatementBuilder) Build(
ctx context.Context,
start uint64,
end uint64,
requestType qbtypes.RequestType,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
start = querybuilder.ToNanoSecs(start)
end = querybuilder.ToNanoSecs(end)
keySelectors := getKeySelectors(query)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
query = b.adjustKeys(ctx, keys, query, requestType)
q := sqlbuilder.NewSelectBuilder()
var stmt *qbtypes.Statement
switch requestType {
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeRawStream:
stmt, err = b.buildListQuery(ctx, q, query, start, end, keys, variables)
case qbtypes.RequestTypeTimeSeries:
stmt, err = b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
case qbtypes.RequestTypeScalar:
stmt, err = b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
}
if err != nil {
return nil, err
}
return stmt, nil
}
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
var keySelectors []*telemetrytypes.FieldKeySelector
for idx := range query.Aggregations {
aggExpr := query.Aggregations[idx]
selectors := querybuilder.QueryStringToKeysSelectors(aggExpr.Expression)
keySelectors = append(keySelectors, selectors...)
}
if query.Filter != nil && query.Filter.Expression != "" {
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
keySelectors = append(keySelectors, whereClauseSelectors...)
}
for idx := range query.GroupBy {
groupBy := query.GroupBy[idx]
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: groupBy.Name,
Signal: telemetrytypes.SignalLogs,
FieldContext: groupBy.FieldContext,
FieldDataType: groupBy.FieldDataType,
})
}
for idx := range query.SelectFields {
selectField := query.SelectFields[idx]
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: selectField.Name,
Signal: telemetrytypes.SignalLogs,
FieldContext: selectField.FieldContext,
FieldDataType: selectField.FieldDataType,
})
}
for idx := range query.Order {
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: query.Order[idx].Key.Name,
Signal: telemetrytypes.SignalLogs,
FieldContext: query.Order[idx].Key.FieldContext,
FieldDataType: query.Order[idx].Key.FieldDataType,
})
}
for idx := range keySelectors {
keySelectors[idx].Signal = telemetrytypes.SignalLogs
keySelectors[idx].Source = telemetrytypes.SourceAudit
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
}
return keySelectors
}
func (b *auditQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
keys["id"] = append([]*telemetrytypes.TelemetryFieldKey{{
Name: "id",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
}}, keys["id"]...)
keys["timestamp"] = append([]*telemetrytypes.TelemetryFieldKey{{
Name: "timestamp",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
}}, keys["timestamp"]...)
actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
actions = append(actions, querybuilder.AdjustDuplicateKeys(&query)...)
for idx := range query.SelectFields {
actions = append(actions, b.adjustKey(&query.SelectFields[idx], keys)...)
}
for idx := range query.GroupBy {
actions = append(actions, b.adjustKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
}
for idx := range query.Order {
actions = append(actions, b.adjustKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
}
for _, action := range actions {
b.logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
}
return query
}
func (b *auditQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
if _, ok := IntrinsicFields[key.Name]; ok {
intrinsicField := IntrinsicFields[key.Name]
return querybuilder.AdjustKey(key, keys, &intrinsicField)
}
return querybuilder.AdjustKey(key, keys, nil)
}
func (b *auditQueryStatementBuilder) buildListQuery(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
start, end uint64,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
var (
cteFragments []string
cteArgs [][]any
)
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
sb.Select(TimestampColumn)
sb.SelectMore(IDColumn)
if len(query.SelectFields) == 0 {
sb.SelectMore(TraceIDColumn)
sb.SelectMore(SpanIDColumn)
sb.SelectMore(TraceFlagsColumn)
sb.SelectMore(SeverityTextColumn)
sb.SelectMore(SeverityNumberColumn)
sb.SelectMore(ScopeNameColumn)
sb.SelectMore(ScopeVersionColumn)
sb.SelectMore(BodyColumn)
sb.SelectMore(EventNameColumn)
sb.SelectMore(AttributesStringColumn)
sb.SelectMore(AttributesNumberColumn)
sb.SelectMore(AttributesBoolColumn)
sb.SelectMore(ScopeStringColumn)
} else {
for index := range query.SelectFields {
if query.SelectFields[index].Name == TimestampColumn || query.SelectFields[index].Name == IDColumn {
continue
}
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &query.SelectFields[index], keys)
if err != nil {
return nil, err
}
sb.SelectMore(colExpr)
}
}
sb.From(fmt.Sprintf("%s.%s", DBName, LogsTableName))
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
if err != nil {
return nil, err
}
for _, orderBy := range query.Order {
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &orderBy.Key.TelemetryFieldKey, keys)
if err != nil {
return nil, err
}
sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue()))
}
if query.Limit > 0 {
sb.Limit(query.Limit)
} else {
sb.Limit(100)
}
if query.Offset > 0 {
sb.Offset(query.Offset)
}
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
stmt := &qbtypes.Statement{
Query: finalSQL,
Args: finalArgs,
}
if preparedWhereClause != nil {
stmt.Warnings = preparedWhereClause.Warnings
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
}
return stmt, nil
}
func (b *auditQueryStatementBuilder) buildTimeSeriesQuery(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
start, end uint64,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
var (
cteFragments []string
cteArgs [][]any
)
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts",
int64(query.StepInterval.Seconds()),
))
var allGroupByArgs []any
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
if err != nil {
return nil, err
}
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.Name)
allGroupByArgs = append(allGroupByArgs, args...)
sb.SelectMore(colExpr)
fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.Name))
}
allAggChArgs := make([]any, 0)
for i, agg := range query.Aggregations {
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, start, end, agg.Expression, uint64(query.StepInterval.Seconds()), keys)
if err != nil {
return nil, err
}
allAggChArgs = append(allAggChArgs, chArgs...)
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i))
}
sb.From(fmt.Sprintf("%s.%s", DBName, LogsTableName))
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
if err != nil {
return nil, err
}
var finalSQL string
var finalArgs []any
if query.Limit > 0 && len(query.GroupBy) > 0 {
cteSB := sqlbuilder.NewSelectBuilder()
cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true, variables)
if err != nil {
return nil, err
}
cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query))
cteArgs = append(cteArgs, cteStmt.Args)
tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
sb.Where(fmt.Sprintf("%s GLOBAL IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr, err := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
if err != nil {
return nil, err
}
sb.Having(rewrittenExpr)
}
if len(query.Order) != 0 {
for _, orderBy := range query.Order {
_, ok := aggOrderBy(orderBy, query)
if !ok {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
sb.OrderBy("ts desc")
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
} else {
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr, err := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
if err != nil {
return nil, err
}
sb.Having(rewrittenExpr)
}
if len(query.Order) != 0 {
for _, orderBy := range query.Order {
_, ok := aggOrderBy(orderBy, query)
if !ok {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
sb.OrderBy("ts desc")
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
}
stmt := &qbtypes.Statement{
Query: finalSQL,
Args: finalArgs,
}
if preparedWhereClause != nil {
stmt.Warnings = preparedWhereClause.Warnings
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
}
return stmt, nil
}
func (b *auditQueryStatementBuilder) buildScalarQuery(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
start, end uint64,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
skipResourceCTE bool,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
var (
cteFragments []string
cteArgs [][]any
)
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
return nil, err
} else if frag != "" && !skipResourceCTE {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
allAggChArgs := []any{}
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
if err != nil {
return nil, err
}
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.Name)
allGroupByArgs = append(allGroupByArgs, args...)
sb.SelectMore(colExpr)
}
rateInterval := (end - start) / querybuilder.NsToSeconds
if len(query.Aggregations) > 0 {
for idx := range query.Aggregations {
aggExpr := query.Aggregations[idx]
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, start, end, aggExpr.Expression, rateInterval, keys)
if err != nil {
return nil, err
}
allAggChArgs = append(allAggChArgs, chArgs...)
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx))
}
}
sb.From(fmt.Sprintf("%s.%s", DBName, LogsTableName))
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
if err != nil {
return nil, err
}
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr, err := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
if err != nil {
return nil, err
}
sb.Having(rewrittenExpr)
}
for _, orderBy := range query.Order {
idx, ok := aggOrderBy(orderBy, query)
if ok {
sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue()))
} else {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
if len(query.Order) == 0 {
sb.OrderBy("__result_0 DESC")
}
if query.Limit > 0 {
sb.Limit(query.Limit)
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
stmt := &qbtypes.Statement{
Query: finalSQL,
Args: finalArgs,
}
if preparedWhereClause != nil {
stmt.Warnings = preparedWhereClause.Warnings
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
}
return stmt, nil
}
func (b *auditQueryStatementBuilder) addFilterCondition(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (*querybuilder.PreparedWhereClause, error) {
var preparedWhereClause *querybuilder.PreparedWhereClause
var err error
if query.Filter != nil && query.Filter.Expression != "" {
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Context: ctx,
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
SkipResourceFilter: true,
FullTextColumn: b.fullTextColumn,
JsonKeyToKey: b.jsonKeyToKey,
Variables: variables,
StartNs: start,
EndNs: end,
})
if err != nil {
return nil, err
}
}
if preparedWhereClause != nil {
sb.AddWhereClause(preparedWhereClause.WhereClause)
}
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
var endBucket uint64
if end != 0 {
endBucket = end / querybuilder.NsToSeconds
}
if start != 0 {
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.GE("ts_bucket_start", startBucket))
}
if end != 0 {
sb.Where(sb.L("timestamp", fmt.Sprintf("%d", end)), sb.LE("ts_bucket_start", endBucket))
}
return preparedWhereClause, nil
}
func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) (int, bool) {
for i, agg := range q.Aggregations {
if k.Key.Name == agg.Alias || k.Key.Name == agg.Expression || k.Key.Name == fmt.Sprintf("%d", i) {
return i, true
}
}
return 0, false
}
func (b *auditQueryStatementBuilder) maybeAttachResourceFilter(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
start, end uint64,
variables map[string]qbtypes.VariableItem,
) (cteSQL string, cteArgs []any, err error) {
stmt, err := b.resourceFilterStmtBuilder.Build(ctx, start, end, qbtypes.RequestTypeRaw, query, variables)
if err != nil {
return "", nil, err
}
sb.Where("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)")
return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
}

View File

@@ -0,0 +1,241 @@
package telemetryaudit
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
func auditResourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation] {
fm := resourcefilter.NewFieldMapper()
cb := resourcefilter.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = auditFieldKeyMap()
return resourcefilter.NewLogResourceFilterStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
fm,
cb,
mockMetadataStore,
DefaultFullTextColumn,
nil,
)
}
func auditFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
key := func(name string, ctx telemetrytypes.FieldContext, dt telemetrytypes.FieldDataType, materialized bool) *telemetrytypes.TelemetryFieldKey {
return &telemetrytypes.TelemetryFieldKey{
Name: name,
Signal: telemetrytypes.SignalLogs,
FieldContext: ctx,
FieldDataType: dt,
Materialized: materialized,
}
}
attr := telemetrytypes.FieldContextAttribute
res := telemetrytypes.FieldContextResource
str := telemetrytypes.FieldDataTypeString
i64 := telemetrytypes.FieldDataTypeInt64
return map[string][]*telemetrytypes.TelemetryFieldKey{
"service.name": {key("service.name", res, str, false)},
"signoz.audit.action": {key("signoz.audit.action", attr, str, true)},
"signoz.audit.outcome": {key("signoz.audit.outcome", attr, str, true)},
"signoz.audit.principal.email": {key("signoz.audit.principal.email", attr, str, true)},
"signoz.audit.principal.id": {key("signoz.audit.principal.id", attr, str, true)},
"signoz.audit.principal.type": {key("signoz.audit.principal.type", attr, str, true)},
"signoz.audit.resource.name": {key("signoz.audit.resource.name", attr, str, true)},
"signoz.audit.resource.id": {key("signoz.audit.resource.id", attr, str, true)},
"signoz.audit.action_category": {key("signoz.audit.action_category", attr, str, false)},
"signoz.audit.error.type": {key("signoz.audit.error.type", attr, str, false)},
"signoz.audit.error.code": {key("signoz.audit.error.code", attr, str, false)},
"http.request.method": {key("http.request.method", attr, str, false)},
"http.response.status_code": {key("http.response.status_code", attr, i64, false)},
}
}
func newTestAuditStatementBuilder() *auditQueryStatementBuilder {
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = auditFieldKeyMap()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
return NewAuditQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
auditResourceFilterStmtBuilder(),
aggExprRewriter,
DefaultFullTextColumn,
nil,
)
}
func TestStatementBuilder(t *testing.T) {
statementBuilder := newTestAuditStatementBuilder()
ctx := context.Background()
testCases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
expected qbtypes.Statement
expectedErr error
}{
// List: all actions by a specific user (materialized principal.id filter)
{
name: "ListByPrincipalID",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Source: telemetrytypes.SourceAudit,
Filter: &qbtypes.Filter{
Expression: "signoz.audit.principal.id = '019a-1234-abcd-5678'",
},
Limit: 100,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$principal$$id` = ? AND `attribute_string_signoz$$audit$$principal$$id_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "019a-1234-abcd-5678", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
},
},
// List: all failed actions (materialized outcome filter)
{
name: "ListByOutcomeFailure",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Source: telemetrytypes.SourceAudit,
Filter: &qbtypes.Filter{
Expression: "signoz.audit.outcome = 'failure'",
},
Limit: 100,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$outcome` = ? AND `attribute_string_signoz$$audit$$outcome_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "failure", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
},
},
// List: change history of a specific dashboard (two materialized column AND)
{
name: "ListByResourceNameAndID",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Source: telemetrytypes.SourceAudit,
Filter: &qbtypes.Filter{
Expression: "signoz.audit.resource.name = 'dashboard' AND signoz.audit.resource.id = '019b-5678-efgh-9012'",
},
Limit: 100,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((`attribute_string_signoz$$audit$$resource$$name` = ? AND `attribute_string_signoz$$audit$$resource$$name_exists` = ?) AND (`attribute_string_signoz$$audit$$resource$$id` = ? AND `attribute_string_signoz$$audit$$resource$$id_exists` = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "dashboard", true, "019b-5678-efgh-9012", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
},
},
// List: all dashboard deletions (compliance — resource.name + action AND)
{
name: "ListByResourceNameAndAction",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Source: telemetrytypes.SourceAudit,
Filter: &qbtypes.Filter{
Expression: "signoz.audit.resource.name = 'dashboard' AND signoz.audit.action = 'delete'",
},
Limit: 100,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((`attribute_string_signoz$$audit$$resource$$name` = ? AND `attribute_string_signoz$$audit$$resource$$name_exists` = ?) AND (`attribute_string_signoz$$audit$$action` = ? AND `attribute_string_signoz$$audit$$action_exists` = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "dashboard", true, "delete", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
},
},
// List: all actions by service accounts (materialized principal.type)
{
name: "ListByPrincipalType",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Source: telemetrytypes.SourceAudit,
Filter: &qbtypes.Filter{
Expression: "signoz.audit.principal.type = 'service_account'",
},
Limit: 100,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, event_name, attributes_string, attributes_number, attributes_bool, scope_string FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$principal$$type` = ? AND `attribute_string_signoz$$audit$$principal$$type_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "service_account", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 100},
},
},
// Scalar: alert — count forbidden errors (outcome + action AND)
{
name: "ScalarCountByOutcomeAndAction",
requestType: qbtypes.RequestTypeScalar,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Source: telemetrytypes.SourceAudit,
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Filter: &qbtypes.Filter{
Expression: "signoz.audit.outcome = 'failure' AND signoz.audit.action = 'update'",
},
Aggregations: []qbtypes.LogAggregation{
{Expression: "count()"},
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT count() AS __result_0 FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((`attribute_string_signoz$$audit$$outcome` = ? AND `attribute_string_signoz$$audit$$outcome_exists` = ?) AND (`attribute_string_signoz$$audit$$action` = ? AND `attribute_string_signoz$$audit$$action_exists` = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? ORDER BY __result_0 DESC",
Args: []any{uint64(1747945619), uint64(1747983448), "failure", true, "update", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
},
},
// TimeSeries: failures grouped by principal email with top-N limit
{
name: "TimeSeriesFailuresGroupedByPrincipal",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Source: telemetrytypes.SourceAudit,
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Aggregations: []qbtypes.LogAggregation{
{Expression: "count()"},
},
Filter: &qbtypes.Filter{
Expression: "signoz.audit.outcome = 'failure'",
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "signoz.audit.principal.email"}},
},
Limit: 5,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(`attribute_string_signoz$$audit$$principal$$email_exists` = ?, `attribute_string_signoz$$audit$$principal$$email`, NULL)) AS `signoz.audit.principal.email`, count() AS __result_0 FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$outcome` = ? AND `attribute_string_signoz$$audit$$outcome_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `signoz.audit.principal.email` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toString(multiIf(`attribute_string_signoz$$audit$$principal$$email_exists` = ?, `attribute_string_signoz$$audit$$principal$$email`, NULL)) AS `signoz.audit.principal.email`, count() AS __result_0 FROM signoz_audit.distributed_logs WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (`attribute_string_signoz$$audit$$outcome` = ? AND `attribute_string_signoz$$audit$$outcome_exists` = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`signoz.audit.principal.email`) GLOBAL IN (SELECT `signoz.audit.principal.email` FROM __limit_cte) GROUP BY ts, `signoz.audit.principal.email`",
Args: []any{uint64(1747945619), uint64(1747983448), true, "failure", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 5, true, "failure", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
q, err := statementBuilder.Build(ctx, 1747947419000, 1747983448000, testCase.requestType, testCase.query, nil)
if testCase.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), testCase.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, testCase.expected.Query, q.Query)
require.Equal(t, testCase.expected.Args, q.Args)
}
})
}
}

View File

@@ -0,0 +1,11 @@
package telemetryaudit
const (
DBName = "signoz_audit"
LogsTableName = "distributed_logs"
LogsLocalTableName = "logs"
TagAttributesTableName = "distributed_tag_attributes"
TagAttributesLocalTableName = "tag_attributes"
LogAttributeKeysTblName = "distributed_logs_attribute_keys"
LogResourceKeysTblName = "distributed_logs_resource_keys"
)

View File

@@ -13,6 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetryaudit"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
@@ -27,6 +28,7 @@ import (
var (
ErrFailedToGetTracesKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get traces keys")
ErrFailedToGetLogsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get logs keys")
ErrFailedToGetAuditKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get audit keys")
ErrFailedToGetTblStatement = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get tbl statement")
ErrFailedToGetMetricsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get metrics keys")
ErrFailedToGetMeterKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter keys")
@@ -50,6 +52,11 @@ type telemetryMetaStore struct {
logAttributeKeysTblName string
logResourceKeysTblName string
logsV2TblName string
auditDBName string
auditV2TblName string
auditFieldsTblName string
auditAttributeKeysTblName string
auditResourceKeysTblName string
relatedMetadataDBName string
relatedMetadataTblName string
columnEvolutionMetadataTblName string
@@ -79,6 +86,11 @@ func NewTelemetryMetaStore(
logsFieldsTblName string,
logAttributeKeysTblName string,
logResourceKeysTblName string,
auditDBName string,
auditV2TblName string,
auditFieldsTblName string,
auditAttributeKeysTblName string,
auditResourceKeysTblName string,
relatedMetadataDBName string,
relatedMetadataTblName string,
columnEvolutionMetadataTblName string,
@@ -101,6 +113,11 @@ func NewTelemetryMetaStore(
logsFieldsTblName: logsFieldsTblName,
logAttributeKeysTblName: logAttributeKeysTblName,
logResourceKeysTblName: logResourceKeysTblName,
auditDBName: auditDBName,
auditV2TblName: auditV2TblName,
auditFieldsTblName: auditFieldsTblName,
auditAttributeKeysTblName: auditAttributeKeysTblName,
auditResourceKeysTblName: auditResourceKeysTblName,
relatedMetadataDBName: relatedMetadataDBName,
relatedMetadataTblName: relatedMetadataTblName,
columnEvolutionMetadataTblName: columnEvolutionMetadataTblName,
@@ -592,6 +609,240 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
return keys, complete, nil
}
func (t *telemetryMetaStore) auditTblStatementToFieldKeys(ctx context.Context) ([]*telemetrytypes.TelemetryFieldKey, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
instrumentationtypes.CodeNamespace: "metadata",
instrumentationtypes.CodeFunctionName: "auditTblStatementToFieldKeys",
})
if t.auditDBName == "" || t.auditV2TblName == "" {
return nil, nil
}
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", t.auditDBName, t.auditV2TblName)
statements := []telemetrytypes.ShowCreateTableStatement{}
err := t.telemetrystore.ClickhouseDB().Select(ctx, &statements, query)
if err != nil {
return nil, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTblStatement.Error())
}
materialisedKeys, err := ExtractFieldKeysFromTblStatement(statements[0].Statement)
if err != nil {
return nil, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
}
for idx := range materialisedKeys {
materialisedKeys[idx].Signal = telemetrytypes.SignalLogs
}
return materialisedKeys, nil
}
func (t *telemetryMetaStore) getAuditKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
instrumentationtypes.CodeNamespace: "metadata",
instrumentationtypes.CodeFunctionName: "getAuditKeys",
})
if len(fieldKeySelectors) == 0 {
return nil, true, nil
}
if t.auditDBName == "" {
return nil, true, nil
}
matKeys, err := t.auditTblStatementToFieldKeys(ctx)
if err != nil {
return nil, false, err
}
mapOfKeys := make(map[string]*telemetrytypes.TelemetryFieldKey)
for _, key := range matKeys {
mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
}
var queries []string
var allArgs []any
queryAttributeTable := false
queryResourceTable := false
for _, selector := range fieldKeySelectors {
if selector.FieldContext == telemetrytypes.FieldContextUnspecified {
queryAttributeTable = true
queryResourceTable = true
break
} else if selector.FieldContext == telemetrytypes.FieldContextAttribute {
queryAttributeTable = true
} else if selector.FieldContext == telemetrytypes.FieldContextResource {
queryResourceTable = true
}
}
tablesToQuery := []struct {
fieldContext telemetrytypes.FieldContext
shouldQuery bool
}{
{telemetrytypes.FieldContextAttribute, queryAttributeTable},
{telemetrytypes.FieldContextResource, queryResourceTable},
}
for _, table := range tablesToQuery {
if !table.shouldQuery {
continue
}
fieldContext := table.fieldContext
var tblName string
if fieldContext == telemetrytypes.FieldContextAttribute {
tblName = t.auditDBName + "." + t.auditAttributeKeysTblName
} else {
tblName = t.auditDBName + "." + t.auditResourceKeysTblName
}
sb := sqlbuilder.Select(
"name AS tag_key",
fmt.Sprintf("'%s' AS tag_type", fieldContext.TagType()),
"lower(datatype) AS tag_data_type",
fmt.Sprintf("%d AS priority", getPriorityForContext(fieldContext)),
).From(tblName)
var limit int
conds := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified && fieldKeySelector.FieldContext != fieldContext {
continue
}
fieldKeyConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldKeyConds = append(fieldKeyConds, sb.E("name", fieldKeySelector.Name))
} else {
fieldKeyConds = append(fieldKeyConds, sb.ILike("name", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
}
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("datatype", fieldKeySelector.FieldDataType.TagDataType()))
}
if len(fieldKeyConds) > 0 {
conds = append(conds, sb.And(fieldKeyConds...))
}
limit += fieldKeySelector.Limit
}
if len(conds) > 0 {
sb.Where(sb.Or(conds...))
}
sb.GroupBy("name", "datatype")
if limit == 0 {
limit = 1000
}
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
queries = append(queries, query)
allArgs = append(allArgs, args...)
}
if len(queries) == 0 {
return []*telemetrytypes.TelemetryFieldKey{}, true, nil
}
var limit int
for _, fieldKeySelector := range fieldKeySelectors {
limit += fieldKeySelector.Limit
}
if limit == 0 {
limit = 1000
}
mainQuery := fmt.Sprintf(`
SELECT tag_key, tag_type, tag_data_type, max(priority) as priority
FROM (
%s
) AS combined_results
GROUP BY tag_key, tag_type, tag_data_type
ORDER BY priority
LIMIT %d
`, strings.Join(queries, " UNION ALL "), limit+1)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, mainQuery, allArgs...)
if err != nil {
return nil, false, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
rowCount := 0
searchTexts := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
searchTexts = append(searchTexts, fieldKeySelector.Name)
}
for rows.Next() {
rowCount++
if rowCount > limit {
break
}
var name string
var fieldContext telemetrytypes.FieldContext
var fieldDataType telemetrytypes.FieldDataType
var priority uint8
err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
if err != nil {
return nil, false, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
}
key, ok := mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()]
if !ok {
key = &telemetrytypes.TelemetryFieldKey{
Name: name,
Signal: telemetrytypes.SignalLogs,
FieldContext: fieldContext,
FieldDataType: fieldDataType,
}
}
keys = append(keys, key)
mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
}
if rows.Err() != nil {
return nil, false, errors.Wrap(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
}
complete := rowCount <= limit
// Add intrinsic audit fields (same as logs intrinsics: body, severity_text, etc.)
staticKeys := maps.Keys(telemetryaudit.IntrinsicFields)
for _, key := range staticKeys {
found := false
for _, v := range searchTexts {
if v == "" || strings.Contains(key, v) {
found = true
break
}
}
if found {
if field, exists := telemetryaudit.IntrinsicFields[key]; exists {
if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
keys = append(keys, &field)
}
}
}
}
return keys, complete, nil
}
func getPriorityForContext(ctx telemetrytypes.FieldContext) int {
switch ctx {
case telemetrytypes.FieldContextLog:
@@ -889,7 +1140,11 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
case telemetrytypes.SignalTraces:
keys, complete, err = t.getTracesKeys(ctx, selectors)
case telemetrytypes.SignalLogs:
keys, complete, err = t.getLogsKeys(ctx, selectors)
if fieldKeySelector.Source == telemetrytypes.SourceAudit {
keys, complete, err = t.getAuditKeys(ctx, selectors)
} else {
keys, complete, err = t.getLogsKeys(ctx, selectors)
}
case telemetrytypes.SignalMetrics:
if fieldKeySelector.Source == telemetrytypes.SourceMeter {
keys, complete, err = t.getMeterSourceMetricKeys(ctx, selectors)
@@ -938,6 +1193,7 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, bool, error) {
logsSelectors := []*telemetrytypes.FieldKeySelector{}
auditSelectors := []*telemetrytypes.FieldKeySelector{}
tracesSelectors := []*telemetrytypes.FieldKeySelector{}
metricsSelectors := []*telemetrytypes.FieldKeySelector{}
meterSourceMetricsSelectors := []*telemetrytypes.FieldKeySelector{}
@@ -945,7 +1201,11 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
for _, fieldKeySelector := range fieldKeySelectors {
switch fieldKeySelector.Signal {
case telemetrytypes.SignalLogs:
logsSelectors = append(logsSelectors, fieldKeySelector)
if fieldKeySelector.Source == telemetrytypes.SourceAudit {
auditSelectors = append(auditSelectors, fieldKeySelector)
} else {
logsSelectors = append(logsSelectors, fieldKeySelector)
}
case telemetrytypes.SignalTraces:
tracesSelectors = append(tracesSelectors, fieldKeySelector)
case telemetrytypes.SignalMetrics:
@@ -965,6 +1225,10 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
if err != nil {
return nil, false, err
}
auditKeys, auditComplete, err := t.getAuditKeys(ctx, auditSelectors)
if err != nil {
return nil, false, err
}
tracesKeys, tracesComplete, err := t.getTracesKeys(ctx, tracesSelectors)
if err != nil {
return nil, false, err
@@ -979,12 +1243,15 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
return nil, false, err
}
// Complete only if all queries are complete
complete := logsComplete && tracesComplete && metricsComplete
complete := logsComplete && auditComplete && tracesComplete && metricsComplete
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
for _, key := range logsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range auditKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range tracesKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
@@ -1338,6 +1605,100 @@ func (t *telemetryMetaStore) getLogFieldValues(ctx context.Context, fieldValueSe
return values, complete, nil
}
func (t *telemetryMetaStore) getAuditFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
instrumentationtypes.CodeNamespace: "metadata",
instrumentationtypes.CodeFunctionName: "getAuditFieldValues",
})
if t.auditDBName == "" || t.auditFieldsTblName == "" {
return &telemetrytypes.TelemetryFieldValues{}, true, nil
}
limit := fieldValueSelector.Limit
if limit == 0 {
limit = 50
}
sb := sqlbuilder.Select("DISTINCT string_value, number_value").From(t.auditDBName + "." + t.auditFieldsTblName)
if fieldValueSelector.Name != "" {
sb.Where(sb.E("tag_key", fieldValueSelector.Name))
}
if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
sb.Where(sb.E("tag_type", fieldValueSelector.FieldContext.TagType()))
}
if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
sb.Where(sb.E("tag_data_type", fieldValueSelector.FieldDataType.TagDataType()))
}
if fieldValueSelector.Value != "" {
switch fieldValueSelector.FieldDataType {
case telemetrytypes.FieldDataTypeString:
sb.Where(sb.ILike("string_value", "%"+escapeForLike(fieldValueSelector.Value)+"%"))
case telemetrytypes.FieldDataTypeNumber:
sb.Where(sb.IsNotNull("number_value"))
sb.Where(sb.ILike("toString(number_value)", "%"+escapeForLike(fieldValueSelector.Value)+"%"))
case telemetrytypes.FieldDataTypeUnspecified:
sb.Where(sb.Or(
sb.ILike("string_value", "%"+escapeForLike(fieldValueSelector.Value)+"%"),
sb.ILike("toString(number_value)", "%"+escapeForLike(fieldValueSelector.Value)+"%"),
))
}
}
sb.Limit(limit + 1)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
}
defer rows.Close()
values := &telemetrytypes.TelemetryFieldValues{}
seen := make(map[string]bool)
rowCount := 0
totalCount := 0
for rows.Next() {
rowCount++
var stringValue string
var numberValue float64
err = rows.Scan(&stringValue, &numberValue)
if err != nil {
return nil, false, errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetAuditKeys.Error())
}
if stringValue != "" && !seen[stringValue] {
if totalCount >= limit {
break
}
values.StringValues = append(values.StringValues, stringValue)
seen[stringValue] = true
totalCount++
}
if numberValue != 0 {
if totalCount >= limit {
break
}
if !seen[fmt.Sprintf("%f", numberValue)] {
values.NumberValues = append(values.NumberValues, numberValue)
seen[fmt.Sprintf("%f", numberValue)] = true
totalCount++
}
}
}
complete := rowCount <= limit
return values, complete, nil
}
// getMetricFieldValues returns field values and whether the result is complete.
func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
@@ -1628,7 +1989,11 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
case telemetrytypes.SignalTraces:
values, complete, err = t.getSpanFieldValues(ctx, fieldValueSelector)
case telemetrytypes.SignalLogs:
values, complete, err = t.getLogFieldValues(ctx, fieldValueSelector)
if fieldValueSelector.Source == telemetrytypes.SourceAudit {
values, complete, err = t.getAuditFieldValues(ctx, fieldValueSelector)
} else {
values, complete, err = t.getLogFieldValues(ctx, fieldValueSelector)
}
case telemetrytypes.SignalMetrics:
if fieldValueSelector.Source == telemetrytypes.SourceMeter {
values, complete, err = t.getMeterSourceMetricFieldValues(ctx, fieldValueSelector)

View File

@@ -5,6 +5,7 @@ import (
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetryaudit"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
@@ -37,6 +38,11 @@ func TestGetFirstSeenFromMetricMetadata(t *testing.T) {
telemetrylogs.TagAttributesV2TableName,
telemetrylogs.LogAttributeKeysTblName,
telemetrylogs.LogResourceKeysTblName,
telemetryaudit.DBName,
telemetryaudit.LogsTableName,
telemetryaudit.TagAttributesTableName,
telemetryaudit.LogAttributeKeysTblName,
telemetryaudit.LogResourceKeysTblName,
DBName,
AttributesMetadataLocalTableName,
ColumnEvolutionMetadataTableName,

View File

@@ -7,6 +7,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetryaudit"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
@@ -36,6 +37,11 @@ func newTestTelemetryMetaStoreTestHelper(store telemetrystore.TelemetryStore) te
telemetrylogs.TagAttributesV2TableName,
telemetrylogs.LogAttributeKeysTblName,
telemetrylogs.LogResourceKeysTblName,
telemetryaudit.DBName,
telemetryaudit.LogsTableName,
telemetryaudit.TagAttributesTableName,
telemetryaudit.LogAttributeKeysTblName,
telemetryaudit.LogResourceKeysTblName,
DBName,
AttributesMetadataLocalTableName,
ColumnEvolutionMetadataTableName,

View File

@@ -7,11 +7,13 @@ type Source struct {
}
var (
SourceAudit = Source{valuer.NewString("audit")}
SourceMeter = Source{valuer.NewString("meter")}
SourceUnspecified = Source{valuer.NewString("")}
)
// Enum returns the acceptable values for Source.
// TODO: Add SourceAudit once the frontend is ready for consumption.
func (Source) Enum() []any {
return []any{
SourceMeter,

View File

@@ -12,6 +12,7 @@ pytest_plugins = [
"fixtures.sqlite",
"fixtures.zookeeper",
"fixtures.signoz",
"fixtures.audit",
"fixtures.logs",
"fixtures.traces",
"fixtures.metrics",

View File

@@ -0,0 +1,404 @@
import datetime
import json
from abc import ABC
from typing import Any, Callable, Generator, List, Optional
import numpy as np
import pytest
from ksuid import KsuidMs
from fixtures import types
from fixtures.fingerprint import LogsOrTracesFingerprint
class AuditResource(ABC):
labels: str
fingerprint: str
seen_at_ts_bucket_start: np.int64
def __init__(
self,
labels: dict[str, str],
fingerprint: str,
seen_at_ts_bucket_start: np.int64,
) -> None:
self.labels = json.dumps(labels, separators=(",", ":"))
self.fingerprint = fingerprint
self.seen_at_ts_bucket_start = seen_at_ts_bucket_start
def np_arr(self) -> np.array:
return np.array(
[
self.labels,
self.fingerprint,
self.seen_at_ts_bucket_start,
]
)
class AuditResourceOrAttributeKeys(ABC):
name: str
datatype: str
def __init__(self, name: str, datatype: str) -> None:
self.name = name
self.datatype = datatype
def np_arr(self) -> np.array:
return np.array([self.name, self.datatype])
class AuditTagAttributes(ABC):
unix_milli: np.int64
tag_key: str
tag_type: str
tag_data_type: str
string_value: str
int64_value: Optional[np.int64]
float64_value: Optional[np.float64]
def __init__(
self,
timestamp: datetime.datetime,
tag_key: str,
tag_type: str,
tag_data_type: str,
string_value: Optional[str],
int64_value: Optional[np.int64],
float64_value: Optional[np.float64],
) -> None:
self.unix_milli = np.int64(int(timestamp.timestamp() * 1e3))
self.tag_key = tag_key
self.tag_type = tag_type
self.tag_data_type = tag_data_type
self.string_value = string_value or ""
self.int64_value = int64_value
self.float64_value = float64_value
def np_arr(self) -> np.array:
return np.array(
[
self.unix_milli,
self.tag_key,
self.tag_type,
self.tag_data_type,
self.string_value,
self.int64_value,
self.float64_value,
]
)
class AuditLog(ABC):
"""Represents a single audit log event in signoz_audit.
Matches the ClickHouse DDL from the schema migration (ticket #1936):
- Database: signoz_audit
- Local table: logs
- Distributed table: distributed_logs
- No resources_string column (resource JSON only)
- Has event_name column
- 7 materialized columns auto-populated from attributes_string at INSERT time
"""
ts_bucket_start: np.uint64
resource_fingerprint: str
timestamp: np.uint64
observed_timestamp: np.uint64
id: str
trace_id: str
span_id: str
trace_flags: np.uint32
severity_text: str
severity_number: np.uint8
body: str
scope_name: str
scope_version: str
scope_string: dict[str, str]
attributes_string: dict[str, str]
attributes_number: dict[str, np.float64]
attributes_bool: dict[str, bool]
resource_json: dict[str, str]
event_name: str
resource: List[AuditResource]
tag_attributes: List[AuditTagAttributes]
resource_keys: List[AuditResourceOrAttributeKeys]
attribute_keys: List[AuditResourceOrAttributeKeys]
def __init__(
self,
timestamp: Optional[datetime.datetime] = None,
resources: dict[str, Any] = {},
attributes: dict[str, Any] = {},
body: str = "",
event_name: str = "",
severity_text: str = "INFO",
trace_id: str = "",
span_id: str = "",
trace_flags: np.uint32 = 0,
scope_name: str = "signoz.audit",
scope_version: str = "",
) -> None:
if timestamp is None:
timestamp = datetime.datetime.now()
self.tag_attributes = []
self.attribute_keys = []
self.resource_keys = []
self.timestamp = np.uint64(int(timestamp.timestamp() * 1e9))
self.observed_timestamp = self.timestamp
minute = timestamp.minute
bucket_minute = 0 if minute < 30 else 30
bucket_start = timestamp.replace(minute=bucket_minute, second=0, microsecond=0)
self.ts_bucket_start = np.uint64(int(bucket_start.timestamp()))
self.id = str(KsuidMs(datetime=timestamp))
self.trace_id = trace_id
self.span_id = span_id
self.trace_flags = trace_flags
self.severity_text = severity_text
self.severity_number = np.uint8(9 if severity_text == "INFO" else 17)
self.body = body
self.event_name = event_name
# Resources — JSON column only (no resources_string in audit DDL)
self.resource_json = {k: str(v) for k, v in resources.items()}
for k, v in self.resource_json.items():
self.tag_attributes.append(
AuditTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="resource",
tag_data_type="string",
string_value=str(v),
int64_value=None,
float64_value=None,
)
)
self.resource_keys.append(
AuditResourceOrAttributeKeys(name=k, datatype="string")
)
self.resource_fingerprint = LogsOrTracesFingerprint(
self.resource_json
).calculate()
# Process attributes by type
self.attributes_string = {}
self.attributes_number = {}
self.attributes_bool = {}
for k, v in attributes.items():
if isinstance(v, bool):
self.attributes_bool[k] = v
self.tag_attributes.append(
AuditTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="tag",
tag_data_type="bool",
string_value=None,
int64_value=None,
float64_value=None,
)
)
self.attribute_keys.append(
AuditResourceOrAttributeKeys(name=k, datatype="bool")
)
elif isinstance(v, int):
self.attributes_number[k] = np.float64(v)
self.tag_attributes.append(
AuditTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="tag",
tag_data_type="int64",
string_value=None,
int64_value=np.int64(v),
float64_value=None,
)
)
self.attribute_keys.append(
AuditResourceOrAttributeKeys(name=k, datatype="int64")
)
elif isinstance(v, float):
self.attributes_number[k] = np.float64(v)
self.tag_attributes.append(
AuditTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="tag",
tag_data_type="float64",
string_value=None,
int64_value=None,
float64_value=np.float64(v),
)
)
self.attribute_keys.append(
AuditResourceOrAttributeKeys(name=k, datatype="float64")
)
else:
self.attributes_string[k] = str(v)
self.tag_attributes.append(
AuditTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="tag",
tag_data_type="string",
string_value=str(v),
int64_value=None,
float64_value=None,
)
)
self.attribute_keys.append(
AuditResourceOrAttributeKeys(name=k, datatype="string")
)
self.scope_name = scope_name
self.scope_version = scope_version
self.scope_string = {}
self.resource = [
AuditResource(
labels=self.resource_json,
fingerprint=self.resource_fingerprint,
seen_at_ts_bucket_start=self.ts_bucket_start,
)
]
def np_arr(self) -> np.array:
return np.array(
[
self.ts_bucket_start,
self.resource_fingerprint,
self.timestamp,
self.observed_timestamp,
self.id,
self.trace_id,
self.span_id,
self.trace_flags,
self.severity_text,
self.severity_number,
self.body,
self.scope_name,
self.scope_version,
self.scope_string,
self.attributes_string,
self.attributes_number,
self.attributes_bool,
self.resource_json,
self.event_name,
]
)
@pytest.fixture(name="insert_audit_logs", scope="function")
def insert_audit_logs(
clickhouse: types.TestContainerClickhouse,
) -> Generator[Callable[[List[AuditLog]], None], Any, None]:
def _insert_audit_logs(logs: List[AuditLog]) -> None:
resources: List[AuditResource] = []
for log in logs:
resources.extend(log.resource)
if len(resources) > 0:
clickhouse.conn.insert(
database="signoz_audit",
table="distributed_logs_resource",
data=[resource.np_arr() for resource in resources],
column_names=[
"labels",
"fingerprint",
"seen_at_ts_bucket_start",
],
)
tag_attributes: List[AuditTagAttributes] = []
for log in logs:
tag_attributes.extend(log.tag_attributes)
if len(tag_attributes) > 0:
clickhouse.conn.insert(
database="signoz_audit",
table="distributed_tag_attributes",
data=[ta.np_arr() for ta in tag_attributes],
column_names=[
"unix_milli",
"tag_key",
"tag_type",
"tag_data_type",
"string_value",
"int64_value",
"float64_value",
],
)
attribute_keys: List[AuditResourceOrAttributeKeys] = []
for log in logs:
attribute_keys.extend(log.attribute_keys)
if len(attribute_keys) > 0:
clickhouse.conn.insert(
database="signoz_audit",
table="distributed_logs_attribute_keys",
data=[ak.np_arr() for ak in attribute_keys],
column_names=["name", "datatype"],
)
resource_keys: List[AuditResourceOrAttributeKeys] = []
for log in logs:
resource_keys.extend(log.resource_keys)
if len(resource_keys) > 0:
clickhouse.conn.insert(
database="signoz_audit",
table="distributed_logs_resource_keys",
data=[rk.np_arr() for rk in resource_keys],
column_names=["name", "datatype"],
)
clickhouse.conn.insert(
database="signoz_audit",
table="distributed_logs",
data=[log.np_arr() for log in logs],
column_names=[
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"scope_name",
"scope_version",
"scope_string",
"attributes_string",
"attributes_number",
"attributes_bool",
"resource",
"event_name",
],
)
yield _insert_audit_logs
cluster = clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER"]
for table in [
"logs",
"logs_resource",
"tag_attributes",
"logs_attribute_keys",
"logs_resource_keys",
]:
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_audit.{table} ON CLUSTER '{cluster}' SYNC"
)

View File

@@ -0,0 +1,414 @@
"""Integration tests for audit log querying via /api/v5/query_range.
Tests verify that audit events inserted directly into signoz_audit ClickHouse
tables can be queried back through the standard query_range API with
signal=logs, source=audit.
Each test maps to a real user query pattern from the audit logs design doc.
"""
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List
import requests
from fixtures import types
from fixtures.audit import AuditLog
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.querier import make_query_request
def _build_audit_query(
*,
filter_expression: str = "",
limit: int = 100,
source: str = "audit",
) -> dict:
spec = {
"name": "A",
"signal": "logs",
"source": source,
"disabled": False,
"limit": limit,
"offset": 0,
"order": [
{"key": {"name": "timestamp"}, "direction": "desc"},
{"key": {"name": "id"}, "direction": "desc"},
],
"aggregations": [{"expression": "count()"}],
}
if filter_expression:
spec["filter"] = {"expression": filter_expression}
return {"type": "builder_query", "spec": spec}
def _build_audit_ts_query(
*,
aggregation: str = "count()",
filter_expression: str = "",
group_by: str = "",
step_interval: int = 60,
limit: int = 0,
) -> dict:
spec = {
"name": "A",
"signal": "logs",
"source": "audit",
"stepInterval": step_interval,
"aggregations": [{"expression": aggregation}],
}
if filter_expression:
spec["filter"] = {"expression": filter_expression}
if group_by:
spec["groupBy"] = [{"name": group_by}]
if limit:
spec["limit"] = limit
return {"type": "builder_query", "spec": spec}
def _query_audit_raw(
signoz: types.SigNoz,
token: str,
query: dict,
) -> requests.Response:
now = datetime.now(tz=timezone.utc)
return make_query_request(
signoz,
token,
start_ms=int((now - timedelta(seconds=30)).timestamp() * 1000),
end_ms=int(now.timestamp() * 1000),
queries=[query],
request_type="raw",
)
def _query_audit_ts(
signoz: types.SigNoz,
token: str,
query: dict,
) -> requests.Response:
now = datetime.now(tz=timezone.utc)
return make_query_request(
signoz,
token,
start_ms=int((now - timedelta(minutes=5)).timestamp() * 1000),
end_ms=int(now.timestamp() * 1000),
queries=[query],
request_type="time_series",
)
def _insert_standard_audit_events(
insert_audit_logs: Callable[[List[AuditLog]], None],
) -> None:
"""Insert a representative set of audit events for testing."""
now = datetime.now(tz=timezone.utc)
insert_audit_logs(
[
# Success: admin creates a dashboard
AuditLog(
timestamp=now - timedelta(seconds=5),
resources={"service.name": "signoz", "service.version": "0.90.0"},
attributes={
"signoz.audit.principal.id": "user-001",
"signoz.audit.principal.email": "alice@acme.com",
"signoz.audit.principal.type": "user",
"signoz.audit.principal.org_id": "org-001",
"signoz.audit.action": "create",
"signoz.audit.action_category": "configuration_change",
"signoz.audit.outcome": "success",
"signoz.audit.resource.name": "dashboard",
"signoz.audit.resource.id": "dash-001",
},
body="alice@acme.com (user-001) created dashboard (dash-001)",
event_name="dashboard.created",
severity_text="INFO",
scope_name="signoz.audit",
),
# Success: admin updates a dashboard
AuditLog(
timestamp=now - timedelta(seconds=4),
resources={"service.name": "signoz", "service.version": "0.90.0"},
attributes={
"signoz.audit.principal.id": "user-001",
"signoz.audit.principal.email": "alice@acme.com",
"signoz.audit.principal.type": "user",
"signoz.audit.principal.org_id": "org-001",
"signoz.audit.action": "update",
"signoz.audit.action_category": "configuration_change",
"signoz.audit.outcome": "success",
"signoz.audit.resource.name": "dashboard",
"signoz.audit.resource.id": "dash-001",
},
body="alice@acme.com (user-001) updated dashboard (dash-001)",
event_name="dashboard.updated",
severity_text="INFO",
scope_name="signoz.audit",
),
# Failure: viewer tries to delete a dashboard
AuditLog(
timestamp=now - timedelta(seconds=3),
resources={"service.name": "signoz", "service.version": "0.90.0"},
attributes={
"signoz.audit.principal.id": "user-002",
"signoz.audit.principal.email": "viewer@acme.com",
"signoz.audit.principal.type": "user",
"signoz.audit.principal.org_id": "org-001",
"signoz.audit.action": "delete",
"signoz.audit.action_category": "configuration_change",
"signoz.audit.outcome": "failure",
"signoz.audit.resource.name": "dashboard",
"signoz.audit.resource.id": "dash-001",
"signoz.audit.error.type": "forbidden",
"signoz.audit.error.code": "authz_forbidden",
},
body="viewer@acme.com (user-002) failed to delete dashboard (dash-001): forbidden (authz_forbidden)",
event_name="dashboard.deleted",
severity_text="ERROR",
scope_name="signoz.audit",
),
# Success: service account creates an API key
AuditLog(
timestamp=now - timedelta(seconds=2),
resources={"service.name": "signoz", "service.version": "0.90.0"},
attributes={
"signoz.audit.principal.id": "sa-001",
"signoz.audit.principal.email": "",
"signoz.audit.principal.type": "service_account",
"signoz.audit.principal.org_id": "org-001",
"signoz.audit.action": "create",
"signoz.audit.action_category": "access_control",
"signoz.audit.outcome": "success",
"signoz.audit.resource.name": "serviceaccount",
"signoz.audit.resource.id": "sa-001",
},
body="sa-001 created serviceaccount (sa-001)",
event_name="serviceaccount.apikey.created",
severity_text="INFO",
scope_name="signoz.audit",
),
# Success: admin logs in
AuditLog(
timestamp=now - timedelta(seconds=1),
resources={"service.name": "signoz", "service.version": "0.90.0"},
attributes={
"signoz.audit.principal.id": "user-001",
"signoz.audit.principal.email": "alice@acme.com",
"signoz.audit.principal.type": "user",
"signoz.audit.principal.org_id": "org-001",
"signoz.audit.action": "login",
"signoz.audit.action_category": "access_control",
"signoz.audit.outcome": "success",
"signoz.audit.resource.name": "session",
"signoz.audit.resource.id": "*",
},
body="alice@acme.com (user-001) login session (*)",
event_name="session.login",
severity_text="INFO",
scope_name="signoz.audit",
),
]
)
def test_audit_list_all(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_audit_logs: Callable[[List[AuditLog]], None],
) -> None:
"""List all audit events — verify correct count and ordering."""
_insert_standard_audit_events(insert_audit_logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = _query_audit_raw(signoz, token, _build_audit_query())
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
rows = results[0]["rows"]
assert len(rows) == 5
# Most recent first (session.login)
assert rows[0]["data"]["event_name"] == "session.login"
# Oldest last (dashboard.created)
assert rows[4]["data"]["event_name"] == "dashboard.created"
def test_audit_filter_by_principal(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_audit_logs: Callable[[List[AuditLog]], None],
) -> None:
"""Q1: Show me all actions by a specific user — filter on materialized principal.id."""
_insert_standard_audit_events(insert_audit_logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = _query_audit_raw(
signoz,
token,
_build_audit_query(filter_expression="signoz.audit.principal.id = 'user-001'"),
)
assert response.status_code == HTTPStatus.OK
rows = response.json()["data"]["data"]["results"][0]["rows"]
assert len(rows) == 3 # alice: create, update, login
emails = [
row["data"]["attributes_string"]["signoz.audit.principal.email"] for row in rows
]
assert all(e == "alice@acme.com" for e in emails)
def test_audit_filter_by_outcome_failure(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_audit_logs: Callable[[List[AuditLog]], None],
) -> None:
"""Q2: Show me all failed actions — filter on materialized outcome."""
_insert_standard_audit_events(insert_audit_logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = _query_audit_raw(
signoz,
token,
_build_audit_query(filter_expression="signoz.audit.outcome = 'failure'"),
)
assert response.status_code == HTTPStatus.OK
rows = response.json()["data"]["data"]["results"][0]["rows"]
assert len(rows) == 1
row = rows[0]["data"]
assert row["attributes_string"]["signoz.audit.principal.email"] == "viewer@acme.com"
assert row["attributes_string"]["signoz.audit.action"] == "delete"
assert row["severity_text"] == "ERROR"
def test_audit_filter_by_resource(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_audit_logs: Callable[[List[AuditLog]], None],
) -> None:
"""Q3: Show me the change history of a specific dashboard — filter on resource.name + resource.id."""
_insert_standard_audit_events(insert_audit_logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = _query_audit_raw(
signoz,
token,
_build_audit_query(
filter_expression="signoz.audit.resource.name = 'dashboard' AND signoz.audit.resource.id = 'dash-001'"
),
)
assert response.status_code == HTTPStatus.OK
rows = response.json()["data"]["data"]["results"][0]["rows"]
assert len(rows) == 3 # create, update, failed delete — all on dash-001
actions = [row["data"]["attributes_string"]["signoz.audit.action"] for row in rows]
# Most recent first
assert actions == ["delete", "update", "create"]
def test_audit_filter_by_principal_type(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_audit_logs: Callable[[List[AuditLog]], None],
) -> None:
"""Q5: Show me all actions by service accounts — filter on materialized principal.type."""
_insert_standard_audit_events(insert_audit_logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = _query_audit_raw(
signoz,
token,
_build_audit_query(
filter_expression="signoz.audit.principal.type = 'service_account'"
),
)
assert response.status_code == HTTPStatus.OK
rows = response.json()["data"]["data"]["results"][0]["rows"]
assert len(rows) == 1
assert rows[0]["data"]["event_name"] == "serviceaccount.apikey.created"
def test_audit_scalar_count_failures(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_audit_logs: Callable[[List[AuditLog]], None],
) -> None:
"""Q6: Alert query — count failed actions (scalar aggregation)."""
_insert_standard_audit_events(insert_audit_logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
now = datetime.now(tz=timezone.utc)
response = make_query_request(
signoz,
token,
start_ms=int((now - timedelta(seconds=30)).timestamp() * 1000),
end_ms=int(now.timestamp() * 1000),
queries=[
_build_audit_ts_query(
aggregation="count()",
filter_expression="signoz.audit.outcome = 'failure'",
)
],
request_type="scalar",
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
rows = results[0].get("rows", [])
assert len(rows) == 1
assert rows[0]["data"]["A"] == 1
def test_audit_does_not_leak_into_logs(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_audit_logs: Callable[[List[AuditLog]], None],
) -> None:
"""Audit data in signoz_audit must not appear when querying signal=logs without source=audit."""
_insert_standard_audit_events(insert_audit_logs)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Query regular logs (no source=audit) — should NOT see audit events
response = _query_audit_raw(
signoz,
token,
_build_audit_query(source=""),
)
assert response.status_code == HTTPStatus.OK
rows = response.json()["data"]["data"]["results"][0].get("rows", [])
# None of the audit events should appear in regular log queries
audit_bodies = [
row["data"]["body"]
for row in rows
if "signoz.audit"
in row["data"].get("attributes_string", {}).get("signoz.audit.action", "")
]
assert len(audit_bodies) == 0