mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-03 15:40:34 +01:00
Compare commits
7 Commits
issue_4616
...
promooted-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ae821ad16 | ||
|
|
252330321a | ||
|
|
1bfc20582c | ||
|
|
ece9d45fc5 | ||
|
|
ca6199fbed | ||
|
|
7f0adb5b2b | ||
|
|
65252943a8 |
@@ -38,17 +38,14 @@ func (c *conditionBuilder) conditionFor(
|
||||
return "", err
|
||||
}
|
||||
|
||||
// TODO(Piyush): Update this to support multiple JSON columns based on evolutions
|
||||
for _, column := range columns {
|
||||
// TODO(Tushar): thread orgID here to evaluate correctly
|
||||
if column.Type.GetType() == schema.ColumnTypeEnumJSON && key.FieldContext == telemetrytypes.FieldContextBody && c.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})) && key.Name != messageSubField {
|
||||
valueType, value := InferDataType(value, operator, key)
|
||||
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return cond, nil
|
||||
// TODO(Tushar): thread orgID here to evaluate correctly
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody && c.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})) && key.Name != messageSubField {
|
||||
valueType, value := InferDataType(value, operator, key)
|
||||
cond, err := NewJSONConditionBuilder(key, valueType, logsV2Columns[LogsV2BodyV2Column]).buildJSONCondition(startNs, endNs, columns, operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return cond, nil
|
||||
}
|
||||
|
||||
if operator.IsStringSearchOperator() {
|
||||
|
||||
@@ -69,14 +69,15 @@ var (
|
||||
)
|
||||
|
||||
type fieldMapper struct {
|
||||
fl flagger.Flagger
|
||||
fl flagger.Flagger
|
||||
jsonMapper *JSONFieldMapper
|
||||
}
|
||||
|
||||
func NewFieldMapper(fl flagger.Flagger) qbtypes.FieldMapper {
|
||||
return &fieldMapper{fl: fl}
|
||||
return &fieldMapper{fl: fl, jsonMapper: NewJSONFieldMapper()}
|
||||
}
|
||||
|
||||
func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
func (m *fieldMapper) getColumns(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
columns := []*schema.Column{logsV2Columns["resources_string"], logsV2Columns["resource"]}
|
||||
@@ -105,7 +106,7 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
|
||||
if key.Name == messageSubField {
|
||||
return []*schema.Column{logsV2Columns[messageSubColumn]}, nil
|
||||
}
|
||||
return []*schema.Column{logsV2Columns[LogsV2BodyV2Column]}, nil
|
||||
return []*schema.Column{logsV2Columns[LogsV2BodyV2Column], logsV2Columns[LogsV2BodyPromotedColumn]}, nil
|
||||
}
|
||||
// Fall back to legacy body column
|
||||
return []*schema.Column{logsV2Columns["body"]}, nil
|
||||
@@ -118,13 +119,9 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
|
||||
if !ok {
|
||||
// check if the key has body JSON search
|
||||
if strings.HasPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
|
||||
// Use body_v2 if feature flag is enabled and we have a body condition builder
|
||||
// TODO(Tushar): thread orgID here to evaluate correctly
|
||||
if m.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})) {
|
||||
// TODO(Piyush): Update this to support multiple JSON columns based on evolutions
|
||||
// i.e return both the body json and body json promoted and let the evolutions decide which one to use
|
||||
// based on the query range time.
|
||||
return []*schema.Column{logsV2Columns[LogsV2BodyV2Column]}, nil
|
||||
return []*schema.Column{logsV2Columns[LogsV2BodyV2Column], logsV2Columns[LogsV2BodyPromotedColumn]}, nil
|
||||
}
|
||||
// Fall back to legacy body column
|
||||
return []*schema.Column{logsV2Columns["body"]}, nil
|
||||
@@ -145,12 +142,15 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
|
||||
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
|
||||
// - Results are sorted by ReleaseTime descending (newest first)
|
||||
func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
|
||||
|
||||
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
|
||||
copy(sortedEvolutions, evolutions)
|
||||
|
||||
// sort the evolutions by ReleaseTime ascending
|
||||
// Sort by ReleaseTime ascending; break ties by Version ascending so that at the
|
||||
// same timestamp the highest-versioned entry is last and becomes the base anchor.
|
||||
sort.Slice(sortedEvolutions, func(i, j int) bool {
|
||||
if sortedEvolutions[i].ReleaseTime.Equal(sortedEvolutions[j].ReleaseTime) {
|
||||
return sortedEvolutions[i].Version < sortedEvolutions[j].Version
|
||||
}
|
||||
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
|
||||
})
|
||||
|
||||
@@ -199,6 +199,12 @@ func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetr
|
||||
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
|
||||
continue
|
||||
}
|
||||
// At the same timestamp, a lower-versioned entry is superseded by the higher-versioned
|
||||
// base (e.g. body v0 is superseded by body_v2 v1 at epoch).
|
||||
if evolution.ReleaseTime.Equal(latestBaseEvolutionAcrossAll.ReleaseTime) &&
|
||||
evolution.Version < latestBaseEvolutionAcrossAll.Version {
|
||||
continue
|
||||
}
|
||||
// skip evolutions after tsEndTime
|
||||
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
|
||||
continue
|
||||
@@ -245,7 +251,7 @@ func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetr
|
||||
}
|
||||
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
columns, err := m.getColumn(ctx, key)
|
||||
columns, err := m.getColumns(ctx, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -287,12 +293,18 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
|
||||
return "", qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
expr, err := m.buildFieldForJSON(key)
|
||||
// Use `column` from the outer newColumns loop (already evolution-filtered).
|
||||
expr, err := m.jsonMapper.FieldExprFor(key, column)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
exprs = append(exprs, expr)
|
||||
|
||||
exist, err := m.jsonMapper.ExistExprFor(key, column)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
existExpr = append(existExpr, exist)
|
||||
default:
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
@@ -348,7 +360,7 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
return m.getColumn(ctx, key)
|
||||
return m.getColumns(ctx, key)
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnExpressionFor(
|
||||
@@ -401,145 +413,3 @@ func (m *fieldMapper) ColumnExpressionFor(
|
||||
|
||||
return fmt.Sprintf("%s AS `%s`", sqlbuilder.Escape(fieldExpression), field.Name), nil
|
||||
}
|
||||
|
||||
// buildFieldForJSON builds the field expression for body JSON fields using arrayConcat pattern.
|
||||
func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
plan := key.JSONPlan
|
||||
if len(plan) == 0 {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
|
||||
"Could not find any valid paths for: %s", key.Name)
|
||||
}
|
||||
|
||||
if plan[0].IsTerminal {
|
||||
node := plan[0]
|
||||
|
||||
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
|
||||
// TODO(Piyush): Promoted path logic commented out. Materialized now means type hint
|
||||
// promotion will be extracted from key field evolution
|
||||
// (direct sub-column access), not a promoted body_promoted.* column.
|
||||
// if key.Materialized {
|
||||
// if len(plan) < 2 {
|
||||
// return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
|
||||
// "plan length is less than 2 for promoted path: %s", key.Name)
|
||||
// }
|
||||
|
||||
// node := plan[1]
|
||||
// promotedExpr := fmt.Sprintf(
|
||||
// "dynamicElement(%s, '%s')",
|
||||
// node.FieldPath(),
|
||||
// node.TerminalConfig.ElemType.StringValue(),
|
||||
// )
|
||||
|
||||
// // dynamicElement returns NULL for scalar types or an empty array for array types.
|
||||
// if node.TerminalConfig.ElemType.IsArray {
|
||||
// expr = fmt.Sprintf(
|
||||
// "if(length(%s) > 0, %s, %s)",
|
||||
// promotedExpr,
|
||||
// promotedExpr,
|
||||
// expr,
|
||||
// )
|
||||
// } else {
|
||||
// // promoted column first then body_json column
|
||||
// // TODO(Piyush): Change this in future for better performance
|
||||
// expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
return expr, nil
|
||||
}
|
||||
|
||||
// Build arrayConcat pattern directly from the tree structure
|
||||
arrayConcatExpr, err := m.buildArrayConcat(plan)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return arrayConcatExpr, nil
|
||||
}
|
||||
|
||||
// buildArrayConcat builds the arrayConcat pattern directly from the tree structure.
|
||||
func (m *fieldMapper) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) {
|
||||
if len(plan) == 0 {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat")
|
||||
}
|
||||
|
||||
// Build arrayMap expressions for ALL available branches at the root level.
|
||||
// Iterate branches in deterministic order (JSON then Dynamic)
|
||||
var arrayMapExpressions []string
|
||||
for _, node := range plan {
|
||||
for _, branchType := range node.BranchesInOrder() {
|
||||
expr, err := m.buildArrayMap(node, branchType)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
arrayMapExpressions = append(arrayMapExpressions, expr)
|
||||
}
|
||||
}
|
||||
if len(arrayMapExpressions) == 0 {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
|
||||
}
|
||||
|
||||
// Build the arrayConcat expression
|
||||
arrayConcatExpr := fmt.Sprintf("arrayConcat(%s)", strings.Join(arrayMapExpressions, ", "))
|
||||
|
||||
// Wrap with arrayFlatten
|
||||
arrayFlattenExpr := fmt.Sprintf("arrayFlatten(%s)", arrayConcatExpr)
|
||||
|
||||
return arrayFlattenExpr, nil
|
||||
}
|
||||
|
||||
// buildArrayMap builds the arrayMap expression for a specific branch, handling all sub-branches.
|
||||
func (m *fieldMapper) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) {
|
||||
if currentNode == nil {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap")
|
||||
}
|
||||
|
||||
childNode := currentNode.Branches[branchType]
|
||||
if childNode == nil {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeChildNodeNil, "child node is nil while building arrayMap")
|
||||
}
|
||||
|
||||
// Build the array expression for this level
|
||||
var arrayExpr string
|
||||
if branchType == telemetrytypes.BranchJSON {
|
||||
// Array(JSON) branch
|
||||
arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')",
|
||||
currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths)
|
||||
} else {
|
||||
// Array(Dynamic) branch - filter for JSON objects
|
||||
dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath())
|
||||
arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr)
|
||||
}
|
||||
|
||||
// If this is the terminal level, return the simple arrayMap
|
||||
if childNode.IsTerminal {
|
||||
dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", childNode.FieldPath(),
|
||||
childNode.TerminalConfig.ElemType.StringValue(),
|
||||
)
|
||||
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil
|
||||
}
|
||||
|
||||
// For non-terminal nodes, we need to handle ALL possible branches at the next level.
|
||||
// Use deterministic branch order so generated SQL is stable across environments.
|
||||
var nestedExpressions []string
|
||||
for _, branchType := range childNode.BranchesInOrder() {
|
||||
expr, err := m.buildArrayMap(childNode, branchType)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
nestedExpressions = append(nestedExpressions, expr)
|
||||
}
|
||||
|
||||
// If we have multiple nested expressions, we need to concat them
|
||||
var nestedExpr string
|
||||
if len(nestedExpressions) == 1 {
|
||||
nestedExpr = nestedExpressions[0]
|
||||
} else if len(nestedExpressions) > 1 {
|
||||
nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExpressions, ", "))
|
||||
} else {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap")
|
||||
}
|
||||
|
||||
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil
|
||||
}
|
||||
|
||||
@@ -23,42 +23,73 @@ var (
|
||||
CodeArrayNavigationFailed = errors.MustNewCode("array_navigation_failed")
|
||||
)
|
||||
|
||||
// JSON body fields are stored across two columns:
|
||||
// - Base column (body_v2): present from the start, always carries a value,
|
||||
// and is the only column with skip-indexes (e.g. JSON paths bloom filter).
|
||||
// - Promoted column (body_promoted): introduced later via a schema evolution;
|
||||
// its field set is a subset of the base column at the time of promotion.
|
||||
type jsonConditionBuilder struct {
|
||||
key *telemetrytypes.TelemetryFieldKey
|
||||
valueType telemetrytypes.JSONDataType
|
||||
key *telemetrytypes.TelemetryFieldKey
|
||||
valueType telemetrytypes.JSONDataType
|
||||
baseColumn *schemamigrator.Column
|
||||
}
|
||||
|
||||
func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType telemetrytypes.FieldDataType) *jsonConditionBuilder {
|
||||
return &jsonConditionBuilder{key: key, valueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType]}
|
||||
func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType telemetrytypes.FieldDataType, baseColumn *schemamigrator.Column) *jsonConditionBuilder {
|
||||
return &jsonConditionBuilder{key: key, valueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType], baseColumn: baseColumn}
|
||||
}
|
||||
|
||||
// BuildCondition builds the full WHERE condition for body_v2 JSON paths.
|
||||
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||
baseCond, err := c.emitPlannedCondition(operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// path index
|
||||
if operator.AddDefaultExistsFilter() {
|
||||
pathIndex := fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(LogsV2BodyV2Column), c.key.ArrayParentPaths()[0])
|
||||
return sb.And(baseCond, pathIndex), nil
|
||||
}
|
||||
|
||||
return baseCond, nil
|
||||
}
|
||||
|
||||
func (c *jsonConditionBuilder) emitPlannedCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||
// Build traversal + terminal recursively per-hop
|
||||
conditions := []string{}
|
||||
for _, node := range c.key.JSONPlan {
|
||||
condition, err := c.recurseArrayHops(node, operator, value, sb)
|
||||
func (c *jsonConditionBuilder) buildJSONCondition(startNs, endNs uint64, columns []*schemamigrator.Column, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||
if len(c.key.Evolutions) > 0 {
|
||||
filtered, _, err := selectEvolutionsForColumns(columns, c.key.Evolutions, startNs, endNs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
conditions = append(conditions, condition)
|
||||
columns = filtered
|
||||
}
|
||||
|
||||
columnConditions := make([]string, 0, len(columns))
|
||||
for _, col := range columns {
|
||||
cond, err := c.emitPlannedCondition(col, operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
columnConditions = append(columnConditions, cond)
|
||||
}
|
||||
|
||||
var baseCondition string
|
||||
if len(columnConditions) == 1 {
|
||||
baseCondition = columnConditions[0]
|
||||
} else {
|
||||
baseCondition = sb.Or(columnConditions...)
|
||||
}
|
||||
|
||||
// path index is only available on the base column
|
||||
if operator.AddDefaultExistsFilter() {
|
||||
return sb.And(baseCondition, fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(c.baseColumn.Name), c.key.ArrayParentPaths()[0])), nil
|
||||
}
|
||||
|
||||
return baseCondition, nil
|
||||
}
|
||||
|
||||
func (c *jsonConditionBuilder) emitPlannedCondition(column *schemamigrator.Column, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||
if c.key.PlanBuilder == nil {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "no plan builder for key %s", c.key.Name)
|
||||
}
|
||||
|
||||
var conditions []string
|
||||
|
||||
node, err := c.key.PlanBuilder.Build(column)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
cond, err := c.recurseArrayHops(node, operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
conditions = append(conditions, cond)
|
||||
|
||||
return sb.Or(conditions...), nil
|
||||
}
|
||||
|
||||
|
||||
114
pkg/telemetrylogs/json_field_mapper.go
Normal file
114
pkg/telemetrylogs/json_field_mapper.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package telemetrylogs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
type JSONFieldMapper struct{}
|
||||
|
||||
func NewJSONFieldMapper() *JSONFieldMapper { return &JSONFieldMapper{} }
|
||||
|
||||
// FieldExprFor returns the ClickHouse expression that reads the field value
|
||||
// from the given JSON column — a dynamicElement(...) scalar for primitives or
|
||||
// an arrayFlatten(arrayConcat(...)) for array paths.
|
||||
func (j *JSONFieldMapper) FieldExprFor(key *telemetrytypes.TelemetryFieldKey, column *schemamigrator.Column) (string, error) {
|
||||
node, err := key.PlanBuilder.Build(column)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if node.IsTerminal {
|
||||
return fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue()), nil
|
||||
}
|
||||
return j.buildArrayConcat(node)
|
||||
}
|
||||
|
||||
// ExistExprFor returns a boolean ClickHouse expression that evaluates to true
|
||||
// when the field is present in the given column. Used as the dispatch predicate
|
||||
// inside multiIf when multiple columns are active for a time range.
|
||||
func (j *JSONFieldMapper) ExistExprFor(key *telemetrytypes.TelemetryFieldKey, column *schemamigrator.Column) (string, error) {
|
||||
node, err := key.PlanBuilder.Build(column)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if node.IsTerminal {
|
||||
dynamicExpr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
|
||||
if node.TerminalConfig.ElemType.IsArray {
|
||||
return fmt.Sprintf("length(%s) > 0", dynamicExpr), nil
|
||||
}
|
||||
return fmt.Sprintf("%s IS NOT NULL", dynamicExpr), nil
|
||||
}
|
||||
arrayConcatExpr, err := j.buildArrayConcat(node)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf("length(%s) > 0", arrayConcatExpr), nil
|
||||
}
|
||||
|
||||
// buildArrayConcat produces an arrayFlatten(arrayConcat(...)) expression that
|
||||
// collects every matching element across all JSON / Dynamic branches.
|
||||
func (j *JSONFieldMapper) buildArrayConcat(node *telemetrytypes.JSONAccessNode) (string, error) {
|
||||
var arrayMapExprs []string
|
||||
for _, branchType := range node.BranchesInOrder() {
|
||||
expr, err := j.buildArrayMap(node, branchType)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
arrayMapExprs = append(arrayMapExprs, expr)
|
||||
}
|
||||
if len(arrayMapExprs) == 0 {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
|
||||
}
|
||||
return fmt.Sprintf("arrayFlatten(arrayConcat(%s))", strings.Join(arrayMapExprs, ", ")), nil
|
||||
}
|
||||
|
||||
// buildArrayMap builds a single arrayMap expression for one branch (JSON or
|
||||
// Dynamic) at the current node level, recursing into deeper levels as needed.
|
||||
func (j *JSONFieldMapper) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) {
|
||||
if currentNode == nil {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap")
|
||||
}
|
||||
childNode := currentNode.Branches[branchType]
|
||||
if childNode == nil {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeChildNodeNil, "child node is nil while building arrayMap")
|
||||
}
|
||||
|
||||
var arrayExpr string
|
||||
if branchType == telemetrytypes.BranchJSON {
|
||||
arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')",
|
||||
currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths)
|
||||
} else {
|
||||
dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath())
|
||||
arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr)
|
||||
}
|
||||
|
||||
if childNode.IsTerminal {
|
||||
dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", childNode.FieldPath(), childNode.TerminalConfig.ElemType.StringValue())
|
||||
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil
|
||||
}
|
||||
|
||||
var nestedExprs []string
|
||||
for _, bt := range childNode.BranchesInOrder() {
|
||||
expr, err := j.buildArrayMap(childNode, bt)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
nestedExprs = append(nestedExprs, expr)
|
||||
}
|
||||
|
||||
var nestedExpr string
|
||||
switch len(nestedExprs) {
|
||||
case 0:
|
||||
return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap")
|
||||
case 1:
|
||||
nestedExpr = nestedExprs[0]
|
||||
default:
|
||||
nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExprs, ", "))
|
||||
}
|
||||
|
||||
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
@@ -85,27 +85,16 @@ func (t *telemetryMetaStore) enrichJSONKeys(ctx context.Context, selectors []*te
|
||||
key.Indexes = indexes[key.Name]
|
||||
}
|
||||
|
||||
// build JSON access plans using the pre-fetched parent type cache
|
||||
return t.buildJSONPlans(filteredKeys, parentTypeCache)
|
||||
}
|
||||
|
||||
// buildJSONPlans builds JSON access plans for the given keys
|
||||
// using the provided parent type cache (pre-fetched in the main UNION query).
|
||||
func (t *telemetryMetaStore) buildJSONPlans(keys []*telemetrytypes.TelemetryFieldKey, typeCache map[string][]telemetrytypes.FieldDataType) error {
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// attach a lazy PlanBuilder so field_mapper can build plans on demand
|
||||
columnMeta := t.jsonColumnMetadata[telemetrytypes.SignalLogs][telemetrytypes.FieldContextBody]
|
||||
for _, key := range keys {
|
||||
if err := key.SetJSONAccessPlan(columnMeta, typeCache); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, key := range filteredKeys {
|
||||
key.PlanBuilder = telemetrytypes.NewFieldPlanBuilder(key, columnMeta, parentTypeCache)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
|
||||
filteredPaths := []string{}
|
||||
for _, path := range paths {
|
||||
|
||||
@@ -37,7 +37,7 @@ type TelemetryFieldKey struct {
|
||||
FieldContext FieldContext `json:"fieldContext,omitzero"`
|
||||
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
|
||||
|
||||
JSONPlan JSONAccessPlan `json:"-"`
|
||||
PlanBuilder *FieldPlanBuilder `json:"-"`
|
||||
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
|
||||
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
|
||||
|
||||
@@ -120,7 +120,7 @@ func (f *TelemetryFieldKey) OverrideMetadataFrom(src *TelemetryFieldKey) {
|
||||
f.FieldDataType = src.FieldDataType
|
||||
f.Indexes = src.Indexes
|
||||
f.Materialized = src.Materialized
|
||||
f.JSONPlan = src.JSONPlan
|
||||
f.PlanBuilder = src.PlanBuilder
|
||||
f.Evolutions = src.Evolutions
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
@@ -28,8 +29,6 @@ type JSONColumnMetadata struct {
|
||||
PromotedColumn string
|
||||
}
|
||||
|
||||
type JSONAccessPlan = []*JSONAccessNode
|
||||
|
||||
type TerminalConfig struct {
|
||||
Key *TelemetryFieldKey
|
||||
ElemType JSONDataType
|
||||
@@ -99,23 +98,48 @@ func (n *JSONAccessNode) BranchesInOrder() []JSONAccessBranchType {
|
||||
})
|
||||
}
|
||||
|
||||
type planBuilder struct {
|
||||
// FieldPlanBuilder builds JSON access nodes on demand for a specific field key.
|
||||
// It holds all necessary metadata (key, column info, type cache) and produces
|
||||
// the correct root node per column, dispatching between base and promoted plans
|
||||
// by column name — without any external "is promoted?" flag.
|
||||
type FieldPlanBuilder struct {
|
||||
key *TelemetryFieldKey
|
||||
columnInfo JSONColumnMetadata
|
||||
typeCache map[string][]FieldDataType
|
||||
paths []string // cumulative paths for type cache lookups
|
||||
segments []string // individual path segments for node names
|
||||
isPromoted bool
|
||||
typeCache map[string][]FieldDataType
|
||||
}
|
||||
|
||||
func NewFieldPlanBuilder(key *TelemetryFieldKey, info JSONColumnMetadata, typeCache map[string][]FieldDataType) *FieldPlanBuilder {
|
||||
return &FieldPlanBuilder{key: key, columnInfo: info, typeCache: typeCache}
|
||||
}
|
||||
|
||||
// Build dispatches by column name — called by JSONConditionBuilder in field_mapper.
|
||||
func (b *FieldPlanBuilder) Build(column *schemamigrator.Column) (*JSONAccessNode, error) {
|
||||
if column.Name == b.columnInfo.BaseColumn {
|
||||
return b.build(NewRootJSONAccessNode(b.columnInfo.BaseColumn, 32, 0))
|
||||
}
|
||||
return b.build(NewRootJSONAccessNode(b.columnInfo.PromotedColumn, 32, 1024))
|
||||
}
|
||||
|
||||
func (b *FieldPlanBuilder) build(root *JSONAccessNode) (*JSONAccessNode, error) {
|
||||
if b.key.Name == "" {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
|
||||
}
|
||||
b.paths = b.key.ArrayParentPaths()
|
||||
b.segments = b.key.ArrayPathSegments()
|
||||
return b.buildPlan(0, root, false)
|
||||
}
|
||||
|
||||
// buildPlan recursively builds the path plan tree.
|
||||
func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) {
|
||||
if index >= len(pb.paths) {
|
||||
func (b *FieldPlanBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) {
|
||||
if index >= len(b.paths) {
|
||||
return nil, errors.NewInvalidInputf(CodePlanIndexOutOfBounds, "index is out of bounds")
|
||||
}
|
||||
|
||||
pathSoFar := pb.paths[index] // cumulative path for type cache lookup
|
||||
segmentName := pb.segments[index] // segment name for node
|
||||
isTerminal := index == len(pb.paths)-1
|
||||
pathSoFar := b.paths[index] // cumulative path for type cache lookup
|
||||
segmentName := b.segments[index] // segment name for node
|
||||
isTerminal := index == len(b.paths)-1
|
||||
|
||||
// Calculate progression parameters based on parent's values
|
||||
var maxTypes, maxPaths int
|
||||
@@ -149,18 +173,18 @@ func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChil
|
||||
// Configure terminal if this is the last part
|
||||
if isTerminal {
|
||||
// fielddatatype must not be unspecified else expression can not be generated
|
||||
if pb.key.FieldDataType == FieldDataTypeUnspecified {
|
||||
if b.key.FieldDataType == FieldDataTypeUnspecified {
|
||||
return nil, errors.NewInternalf(CodePlanFieldDataTypeMissing, "field data type is missing for path %s", pathSoFar)
|
||||
}
|
||||
|
||||
node.TerminalConfig = &TerminalConfig{
|
||||
Key: pb.key,
|
||||
ElemType: pb.key.GetJSONDataType(),
|
||||
Key: b.key,
|
||||
ElemType: b.key.GetJSONDataType(),
|
||||
}
|
||||
} else {
|
||||
var err error
|
||||
// Use cached types from the batched metadata query
|
||||
types, ok := pb.typeCache[pathSoFar]
|
||||
types, ok := b.typeCache[pathSoFar]
|
||||
if !ok {
|
||||
return nil, errors.NewInternalf(errors.CodeInvalidInput, "types missing for path %s", pathSoFar)
|
||||
}
|
||||
@@ -172,13 +196,13 @@ func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChil
|
||||
}
|
||||
|
||||
if hasJSON {
|
||||
node.Branches[BranchJSON], err = pb.buildPlan(index+1, node, false)
|
||||
node.Branches[BranchJSON], err = b.buildPlan(index+1, node, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if hasDynamic {
|
||||
node.Branches[BranchDynamic], err = pb.buildPlan(index+1, node, true)
|
||||
node.Branches[BranchDynamic], err = b.buildPlan(index+1, node, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -187,45 +211,3 @@ func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChil
|
||||
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// buildJSONAccessPlan builds a tree structure representing the complete JSON path traversal
|
||||
// that precomputes all possible branches and their types.
|
||||
func (key *TelemetryFieldKey) SetJSONAccessPlan(columnInfo JSONColumnMetadata, typeCache map[string][]FieldDataType,
|
||||
) error {
|
||||
// if path is empty, return nil
|
||||
if key.Name == "" {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
|
||||
}
|
||||
|
||||
pb := &planBuilder{
|
||||
key: key,
|
||||
paths: key.ArrayParentPaths(),
|
||||
segments: key.ArrayPathSegments(),
|
||||
isPromoted: key.Materialized,
|
||||
typeCache: typeCache,
|
||||
}
|
||||
|
||||
node, err := pb.buildPlan(0,
|
||||
NewRootJSONAccessNode(columnInfo.BaseColumn,
|
||||
32, 0),
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key.JSONPlan = append(key.JSONPlan, node)
|
||||
|
||||
if pb.isPromoted {
|
||||
node, err := pb.buildPlan(0,
|
||||
NewRootJSONAccessNode(columnInfo.PromotedColumn,
|
||||
32, 1024),
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key.JSONPlan = append(key.JSONPlan, node)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -80,6 +80,29 @@ func toTestNode(n *JSONAccessNode) *jsonAccessTestNode {
|
||||
return out
|
||||
}
|
||||
|
||||
// buildTestPlans sets PlanBuilder on the key and returns the pre-built access plans.
|
||||
// Non-materialized keys produce one plan (base column); materialized keys produce two (base + promoted).
|
||||
func buildTestPlans(t *testing.T, key *TelemetryFieldKey, meta JSONColumnMetadata, types map[string][]FieldDataType) ([]*JSONAccessNode, error) {
|
||||
t.Helper()
|
||||
|
||||
key.PlanBuilder = NewFieldPlanBuilder(key, meta, types)
|
||||
|
||||
basePlan, err := key.PlanBuilder.build(NewRootJSONAccessNode(meta.BaseColumn, 32, 0))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
plans := []*JSONAccessNode{basePlan}
|
||||
|
||||
if key.Materialized {
|
||||
promotedPlan, err := key.PlanBuilder.build(NewRootJSONAccessNode(meta.PromotedColumn, 32, 1024))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
plans = append(plans, promotedPlan)
|
||||
}
|
||||
return plans, nil
|
||||
}
|
||||
|
||||
// plansToYAML converts a slice of JSONAccessNode plans to a YAML string that
|
||||
// can be compared against a per-test expectedTree.
|
||||
func plansToYAML(t *testing.T, plans []*JSONAccessNode) string {
|
||||
@@ -267,19 +290,17 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
meta := JSONColumnMetadata{BaseColumn: bodyV2Column, PromotedColumn: bodyPromotedColumn}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
plans, err := buildTestPlans(t, tt.key, meta, types)
|
||||
if tt.expectErr {
|
||||
require.Error(t, err)
|
||||
require.Nil(t, tt.key.JSONPlan)
|
||||
require.Nil(t, plans)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
got := plansToYAML(t, tt.key.JSONPlan)
|
||||
got := plansToYAML(t, plans)
|
||||
require.YAMLEq(t, tt.expectedYAML, got)
|
||||
})
|
||||
}
|
||||
@@ -383,17 +404,15 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
meta := JSONColumnMetadata{BaseColumn: bodyV2Column, PromotedColumn: bodyPromotedColumn}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
key := makeKey(tt.path, FieldDataTypeString, false)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
plans, err := buildTestPlans(t, key, meta, types)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, key.JSONPlan)
|
||||
require.Len(t, key.JSONPlan, 1)
|
||||
got := plansToYAML(t, key.JSONPlan)
|
||||
require.NotNil(t, plans)
|
||||
require.Len(t, plans, 1)
|
||||
got := plansToYAML(t, plans)
|
||||
require.YAMLEq(t, tt.expectedYAML, got)
|
||||
})
|
||||
}
|
||||
@@ -403,14 +422,13 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
|
||||
types, _ := TestJSONTypeSet()
|
||||
path := "education[].awards[].type"
|
||||
|
||||
meta := JSONColumnMetadata{BaseColumn: bodyV2Column, PromotedColumn: bodyPromotedColumn}
|
||||
|
||||
t.Run("Non-promoted plan", func(t *testing.T) {
|
||||
key := makeKey(path, FieldDataTypeString, false)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
plans, err := buildTestPlans(t, key, meta, types)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, key.JSONPlan, 1)
|
||||
require.Len(t, plans, 1)
|
||||
|
||||
expectedYAML := fmt.Sprintf(`
|
||||
- name: education
|
||||
@@ -433,18 +451,15 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`, bodyV2Column)
|
||||
got := plansToYAML(t, key.JSONPlan)
|
||||
got := plansToYAML(t, plans)
|
||||
require.YAMLEq(t, expectedYAML, got)
|
||||
})
|
||||
|
||||
t.Run("Promoted plan", func(t *testing.T) {
|
||||
key := makeKey(path, FieldDataTypeString, true)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
plans, err := buildTestPlans(t, key, meta, types)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, key.JSONPlan, 2)
|
||||
require.Len(t, plans, 2)
|
||||
|
||||
expectedYAML := fmt.Sprintf(`
|
||||
- name: education
|
||||
@@ -489,7 +504,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`, bodyV2Column, bodyPromotedColumn)
|
||||
got := plansToYAML(t, key.JSONPlan)
|
||||
got := plansToYAML(t, plans)
|
||||
require.YAMLEq(t, expectedYAML, got)
|
||||
})
|
||||
}
|
||||
@@ -580,7 +595,7 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
|
||||
keyType = FieldDataTypeString
|
||||
}
|
||||
key := makeKey(tt.path, keyType, false)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
plans, err := buildTestPlans(t, key, JSONColumnMetadata{
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
@@ -589,7 +604,7 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
got := plansToYAML(t, key.JSONPlan)
|
||||
got := plansToYAML(t, plans)
|
||||
require.YAMLEq(t, tt.expectedYAML, got)
|
||||
})
|
||||
}
|
||||
@@ -599,12 +614,12 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
|
||||
types, _ := TestJSONTypeSet()
|
||||
path := "education[].awards[].participated[].team[].branch"
|
||||
key := makeKey(path, FieldDataTypeString, false)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
plans, err := buildTestPlans(t, key, JSONColumnMetadata{
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, key.JSONPlan, 1)
|
||||
require.Len(t, plans, 1)
|
||||
|
||||
expectedYAML := fmt.Sprintf(`
|
||||
- name: education
|
||||
@@ -668,6 +683,6 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
|
||||
elemType: String
|
||||
`, bodyV2Column)
|
||||
|
||||
got := plansToYAML(t, key.JSONPlan)
|
||||
got := plansToYAML(t, plans)
|
||||
require.YAMLEq(t, expectedYAML, got)
|
||||
}
|
||||
|
||||
@@ -61,6 +61,9 @@ func TestJSONTypeSet() (map[string][]FieldDataType, MetadataStore) {
|
||||
"http-events": {FieldDataTypeArrayJSON},
|
||||
"http-events[].request-info.host": {FieldDataTypeString},
|
||||
|
||||
// ── top-level arrays ─────────────────────────────────────────────
|
||||
"tags": {FieldDataTypeArrayString},
|
||||
|
||||
// ── top-level primitives ──────────────────────────────────────────
|
||||
"message": {FieldDataTypeString},
|
||||
"http-status": {FieldDataTypeInt64, FieldDataTypeString}, // hyphen in root key, ambiguous
|
||||
|
||||
43
tests/fixtures/jsontypes.py
vendored
43
tests/fixtures/jsontypes.py
vendored
@@ -373,6 +373,49 @@ def export_json_types(
|
||||
clickhouse.conn.query(f"TRUNCATE TABLE signoz_metadata.field_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC")
|
||||
|
||||
|
||||
@pytest.fixture(name="export_promoted_paths", scope="function")
|
||||
def export_promoted_paths(
|
||||
clickhouse: types.TestContainerClickhouse,
|
||||
) -> Generator[Callable[[list[str], datetime.datetime], None], Any]:
|
||||
"""
|
||||
Seeds signoz_metadata.distributed_column_evolution_metadata with promoted-path
|
||||
entries so the query builder treats body_promoted as active from promotion_ts onward.
|
||||
|
||||
The base evolutions for body (version=0) and body_v2 (version=1) with
|
||||
field_name='__all__' must already exist from schema migrations.
|
||||
|
||||
Teardown removes all body_promoted entries inserted during the test.
|
||||
|
||||
Args:
|
||||
paths: Field names to promote (e.g. ["user.name", "user.age", "education"]).
|
||||
promotion_ts: Datetime from which body_promoted is active.
|
||||
"""
|
||||
inserted: list[str] = []
|
||||
|
||||
def _export_promoted_paths(paths: list[str], promotion_ts: datetime.datetime) -> None:
|
||||
# release_time is stored as Float64 nanoseconds in a SimpleAggregateFunction(min, Float64)
|
||||
release_ns = float(int(promotion_ts.timestamp() * 1e9))
|
||||
data = [
|
||||
["logs", "body_promoted", "JSON()", "body", path, np.uint32(2), release_ns]
|
||||
for path in paths
|
||||
]
|
||||
clickhouse.conn.insert(
|
||||
database="signoz_metadata",
|
||||
table="distributed_column_evolution_metadata",
|
||||
data=data,
|
||||
column_names=["signal", "column_name", "column_type", "field_context", "field_name", "version", "release_time"],
|
||||
)
|
||||
inserted.extend(paths)
|
||||
|
||||
yield _export_promoted_paths
|
||||
|
||||
if inserted:
|
||||
cluster = clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER"]
|
||||
clickhouse.conn.query(
|
||||
f"TRUNCATE TABLE signoz_metadata.column_evolution_metadata ON CLUSTER '{cluster}' SYNC"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="create_json_index", scope="function")
|
||||
def create_json_index(
|
||||
signoz: types.SigNoz,
|
||||
|
||||
501
tests/integration/tests/querier_json_body/03_body_promoted.py
Normal file
501
tests/integration/tests/querier_json_body/03_body_promoted.py
Normal file
@@ -0,0 +1,501 @@
|
||||
"""
|
||||
Integration tests for promoted-body-path time-window semantics.
|
||||
|
||||
Promotion splits the data timeline at a known timestamp. The query builder
|
||||
chooses which JSON column(s) to reference based on the query window:
|
||||
|
||||
beforePromoted : window entirely before promotion_ts → body_v2 only
|
||||
afterPromoted : window entirely after promotion_ts → body_promoted only
|
||||
inBetween : window spanning the promotion_ts → both (multiIf merge)
|
||||
|
||||
Every case validates TWO things:
|
||||
1. Data correctness — right rows are returned for the window (validate lambda)
|
||||
2. SQL column shape — ClickHouse query references the expected column(s)
|
||||
(check_query_log, inspects system.query_log)
|
||||
"""
|
||||
import json
|
||||
from collections.abc import Callable
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.logs import Logs
|
||||
from fixtures.querier import (
|
||||
build_logs_aggregation,
|
||||
build_order_by,
|
||||
build_raw_query,
|
||||
build_scalar_query,
|
||||
get_rows,
|
||||
get_scalar_table_data,
|
||||
make_query_request,
|
||||
)
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Module-level constants
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
_BODY_V2 = "body_v2"
|
||||
_BODY_PROMOTED = "body_promoted"
|
||||
|
||||
# Paths fed to export_promoted_paths for the list-query test
|
||||
_PROMOTED_PATHS = ["user.name", "user.age", "education"]
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Shared helpers
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _get_bodies(response: Any) -> list[dict[str, Any]]:
|
||||
return [row["data"]["body"] for row in get_rows(response)]
|
||||
|
||||
|
||||
def _build_windows(promotion_ts: datetime) -> list[dict[str, Any]]:
|
||||
"""Three canonical time windows relative to a promotion timestamp."""
|
||||
return [
|
||||
{
|
||||
"name": "before_promoted",
|
||||
"startMs": int((promotion_ts - timedelta(minutes=25)).timestamp() * 1000),
|
||||
"endMs": int((promotion_ts - timedelta(minutes=1)).timestamp() * 1000),
|
||||
"check_fn": lambda q: _BODY_V2 in q and _BODY_PROMOTED not in q,
|
||||
},
|
||||
{
|
||||
"name": "after_promoted",
|
||||
"startMs": int((promotion_ts + timedelta(minutes=1)).timestamp() * 1000),
|
||||
"endMs": int((promotion_ts + timedelta(minutes=25)).timestamp() * 1000),
|
||||
"check_fn": lambda q: _BODY_PROMOTED in q and _BODY_V2 not in q,
|
||||
},
|
||||
{
|
||||
"name": "in_between",
|
||||
"startMs": int((promotion_ts - timedelta(minutes=25)).timestamp() * 1000),
|
||||
"endMs": int((promotion_ts + timedelta(minutes=25)).timestamp() * 1000),
|
||||
"check_fn": lambda q: _BODY_V2 in q and _BODY_PROMOTED in q,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Test 1 — raw (list) query shape + data correctness
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_body_promoted_list_query_shape(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_logs: Callable[[list[Logs]], None],
|
||||
export_json_types: Callable[[list[Logs]], None],
|
||||
export_promoted_paths: Callable[[list[str], datetime], None],
|
||||
check_query_log: Any,
|
||||
) -> None:
|
||||
"""
|
||||
Data landscape (4 logs, 2 pre-promotion, 2 post-promotion):
|
||||
|
||||
-15 min auth-service alice age=25 email IIT/Iron(sports)+Gold(academic)
|
||||
-5 min api-service bob age=30 MIT/Silver(research)
|
||||
+5 min compute-svc carol age=35 email Stanford/Bronze(sports)
|
||||
+12 min storage-svc dan age=40 Harvard/Diamond(research)
|
||||
|
||||
bob appears only before promotion → useful sentinel for "after_promoted: 0 rows".
|
||||
"sports" awards straddle the boundary (alice pre, carol post) → tests inBetween merge.
|
||||
|
||||
Cases:
|
||||
prim.string_eq — body.user.name = "bob"
|
||||
prim.int_gt — body.user.age > 32
|
||||
arr.element_eq — body.education[].name = "IIT"
|
||||
arr.awards_type — body.education[].awards[].type = "sports"
|
||||
"""
|
||||
now = datetime.now(tz=UTC)
|
||||
promotion_ts = now - timedelta(minutes=20)
|
||||
|
||||
# ── Pre-promotion logs (body_v2 only) ─────────────────────────────────────
|
||||
pre_alice = json.dumps({
|
||||
"user": {"name": "alice", "age": 25, "email": "alice@corp.io"},
|
||||
"status": 200,
|
||||
"education": [
|
||||
{
|
||||
"name": "IIT",
|
||||
"year": 2018,
|
||||
"parameters": [1.65, 2.5, 3.0],
|
||||
"awards": [
|
||||
{"name": "Iron Award", "type": "sports"},
|
||||
{"name": "Gold Award", "type": "academic"},
|
||||
],
|
||||
}
|
||||
],
|
||||
})
|
||||
pre_bob = json.dumps({
|
||||
"user": {"name": "bob", "age": 30},
|
||||
"status": 401,
|
||||
"education": [
|
||||
{
|
||||
"name": "MIT",
|
||||
"year": 2020,
|
||||
"parameters": [4.0, 5.5],
|
||||
"awards": [{"name": "Silver Award", "type": "research"}],
|
||||
}
|
||||
],
|
||||
})
|
||||
|
||||
# ── Post-promotion logs (body_v2 + body_promoted, promoted omits status) ──
|
||||
post_carol_full = {
|
||||
"user": {"name": "carol", "age": 35, "email": "carol@corp.io"},
|
||||
"status": 202,
|
||||
"education": [
|
||||
{
|
||||
"name": "Stanford",
|
||||
"year": 2021,
|
||||
"parameters": [2.75, 3.5],
|
||||
"awards": [{"name": "Bronze Award", "type": "sports"}],
|
||||
}
|
||||
],
|
||||
}
|
||||
post_dan_full = {
|
||||
"user": {"name": "dan", "age": 40},
|
||||
"status": 500,
|
||||
"education": [
|
||||
{
|
||||
"name": "Harvard",
|
||||
"year": 2023,
|
||||
"parameters": [6.0, 7.0, 8.0],
|
||||
"awards": [{"name": "Diamond Award", "type": "research"}],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
logs_list = [
|
||||
Logs(
|
||||
timestamp=promotion_ts - timedelta(minutes=15),
|
||||
resources={"service.name": "auth-service"},
|
||||
body_v2=pre_alice,
|
||||
body_promoted="",
|
||||
severity_text="INFO",
|
||||
),
|
||||
Logs(
|
||||
timestamp=promotion_ts - timedelta(minutes=5),
|
||||
resources={"service.name": "api-service"},
|
||||
body_v2=pre_bob,
|
||||
body_promoted="",
|
||||
severity_text="ERROR",
|
||||
),
|
||||
Logs(
|
||||
timestamp=promotion_ts + timedelta(minutes=5),
|
||||
resources={"service.name": "compute-service"},
|
||||
body_v2=json.dumps(post_carol_full),
|
||||
# body_promoted intentionally omits status — only the promoted paths
|
||||
body_promoted=json.dumps({
|
||||
"user": post_carol_full["user"],
|
||||
"education": post_carol_full["education"],
|
||||
}),
|
||||
severity_text="WARN",
|
||||
),
|
||||
Logs(
|
||||
timestamp=promotion_ts + timedelta(minutes=12),
|
||||
resources={"service.name": "storage-service"},
|
||||
body_v2=json.dumps(post_dan_full),
|
||||
body_promoted=json.dumps({
|
||||
"user": post_dan_full["user"],
|
||||
"education": post_dan_full["education"],
|
||||
}),
|
||||
severity_text="ERROR",
|
||||
),
|
||||
]
|
||||
|
||||
export_json_types(logs_list)
|
||||
export_promoted_paths(_PROMOTED_PATHS, promotion_ts)
|
||||
insert_logs(logs_list)
|
||||
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
|
||||
|
||||
cases = [
|
||||
# "a" in name: alice(pre), carol+dan(post) — 1 / 2 / 3 across windows
|
||||
{
|
||||
"name": "prim.name_contains",
|
||||
"expression": 'body.user.name CONTAINS "a"',
|
||||
"validates": {
|
||||
"before_promoted": lambda r: (
|
||||
len(get_rows(r)) == 1
|
||||
and _get_bodies(r)[0]["user"]["name"] == "alice"
|
||||
),
|
||||
"after_promoted": lambda r: (
|
||||
len(get_rows(r)) == 2
|
||||
and {b["user"]["name"] for b in _get_bodies(r)} == {"carol", "dan"}
|
||||
),
|
||||
"in_between": lambda r: (
|
||||
len(get_rows(r)) == 3
|
||||
and {b["user"]["name"] for b in _get_bodies(r)} == {"alice", "carol", "dan"}
|
||||
),
|
||||
},
|
||||
},
|
||||
# age < 36: alice(25)+bob(30) pre, carol(35) post, dan(40) excluded — 2 / 1 / 3
|
||||
{
|
||||
"name": "prim.age_lt",
|
||||
"expression": "body.user.age < 36",
|
||||
"validates": {
|
||||
"before_promoted": lambda r: (
|
||||
len(get_rows(r)) == 2
|
||||
and {b["user"]["name"] for b in _get_bodies(r)} == {"alice", "bob"}
|
||||
),
|
||||
"after_promoted": lambda r: (
|
||||
len(get_rows(r)) == 1
|
||||
and _get_bodies(r)[0]["user"]["name"] == "carol"
|
||||
),
|
||||
"in_between": lambda r: (
|
||||
len(get_rows(r)) == 3
|
||||
and {b["user"]["name"] for b in _get_bodies(r)} == {"alice", "bob", "carol"}
|
||||
),
|
||||
},
|
||||
},
|
||||
# education year > 2019: bob/MIT/2020(pre), carol/Stanford/2021+dan/Harvard/2023(post) — 1 / 2 / 3
|
||||
# alice/IIT/2018 is intentionally excluded in every window (2018 ≤ 2019)
|
||||
{
|
||||
"name": "arr.edu_year_gt",
|
||||
"expression": "body.education[].year > 2019",
|
||||
"validates": {
|
||||
"before_promoted": lambda r: (
|
||||
len(get_rows(r)) == 1
|
||||
and any(e["name"] == "MIT" for e in _get_bodies(r)[0]["education"])
|
||||
),
|
||||
"after_promoted": lambda r: (
|
||||
len(get_rows(r)) == 2
|
||||
and {b["user"]["name"] for b in _get_bodies(r)} == {"carol", "dan"}
|
||||
),
|
||||
"in_between": lambda r: (
|
||||
len(get_rows(r)) == 3
|
||||
and {b["user"]["name"] for b in _get_bodies(r)} == {"bob", "carol", "dan"}
|
||||
),
|
||||
},
|
||||
},
|
||||
# "sports" type straddles the boundary: alice pre (Iron), carol post (Bronze)
|
||||
{
|
||||
"name": "arr.awards_type",
|
||||
"expression": 'body.education[].awards[].type = "sports"',
|
||||
"validates": {
|
||||
"before_promoted": lambda r: (
|
||||
len(get_rows(r)) == 1
|
||||
and _get_bodies(r)[0]["user"]["name"] == "alice"
|
||||
),
|
||||
"after_promoted": lambda r: (
|
||||
len(get_rows(r)) == 1
|
||||
and _get_bodies(r)[0]["user"]["name"] == "carol"
|
||||
),
|
||||
"in_between": lambda r: (
|
||||
len(get_rows(r)) == 2
|
||||
and {b["user"]["name"] for b in _get_bodies(r)} == {"alice", "carol"}
|
||||
),
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
windows = _build_windows(promotion_ts)
|
||||
for case in cases:
|
||||
for window in windows:
|
||||
name = f"{case['name']}.{window['name']}"
|
||||
query = build_raw_query(
|
||||
name=name,
|
||||
signal="logs",
|
||||
filter_expression=case.get("expression"),
|
||||
order=[build_order_by("timestamp", "desc")],
|
||||
limit=case.get("limit", 100),
|
||||
step_interval=60,
|
||||
)
|
||||
before = datetime.now(tz=UTC)
|
||||
response = make_query_request(
|
||||
signoz=signoz,
|
||||
token=token,
|
||||
start_ms=window["startMs"],
|
||||
end_ms=window["endMs"],
|
||||
queries=[query],
|
||||
request_type="raw",
|
||||
)
|
||||
assert response.status_code == 200, f"HTTP {response.status_code} for '{name}': {response.text}"
|
||||
assert case["validates"][window["name"]](response), f"Validation failed for '{name}': {response.json()}"
|
||||
check_query_log(
|
||||
before,
|
||||
name,
|
||||
window["check_fn"],
|
||||
tables=["signoz_logs.distributed_logs_v2"],
|
||||
)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Test 2 — GroupBy scalar shape + data correctness
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_body_promoted_groupby_shape(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_logs: Callable[[list[Logs]], None],
|
||||
export_json_types: Callable[[list[Logs]], None],
|
||||
export_promoted_paths: Callable[[list[str], datetime], None],
|
||||
check_query_log: Any,
|
||||
) -> None:
|
||||
"""
|
||||
Data landscape (8 logs across both windows):
|
||||
|
||||
Pre-promotion (body_v2 only):
|
||||
-14 min alice age=25 (count×3 in before window)
|
||||
-10 min alice age=25
|
||||
-6 min bob age=30 (count×1)
|
||||
-2 min alice age=25
|
||||
|
||||
Post-promotion (body_v2 + body_promoted):
|
||||
+3 min carol age=35 (count×2 in after window)
|
||||
+8 min alice age=25 ← alice straddles both windows
|
||||
+13 min eve age=28 (count×1)
|
||||
+18 min carol age=35
|
||||
|
||||
alice straddles the promotion boundary intentionally to verify that the
|
||||
inBetween multiIf(body_promoted.user.name, body_v2.user.name) coalesces
|
||||
her entries from both columns into the same group-by bucket (count=4).
|
||||
|
||||
Cases (all requestType="scalar" for flat-table validation):
|
||||
groupby.name — GROUP BY body.user.name
|
||||
groupby.age — GROUP BY body.user.age
|
||||
groupby.multi — GROUP BY body.user.name + body.user.age
|
||||
"""
|
||||
now = datetime.now(tz=UTC)
|
||||
promotion_ts = now - timedelta(minutes=20)
|
||||
promoted_paths = ["user.name", "user.age"]
|
||||
|
||||
# (timestamp_offset, name, age, severity)
|
||||
pre_entries = [
|
||||
(timedelta(minutes=14), "alice", 25, "INFO"),
|
||||
(timedelta(minutes=10), "alice", 25, "WARN"),
|
||||
(timedelta(minutes=6), "bob", 30, "ERROR"),
|
||||
(timedelta(minutes=2), "alice", 25, "INFO"),
|
||||
]
|
||||
post_entries = [
|
||||
(timedelta(minutes=3), "carol", 35, "INFO"),
|
||||
(timedelta(minutes=8), "alice", 25, "DEBUG"),
|
||||
(timedelta(minutes=13), "eve", 28, "INFO"),
|
||||
(timedelta(minutes=18), "carol", 35, "WARN"),
|
||||
]
|
||||
|
||||
logs_list = [
|
||||
Logs(
|
||||
timestamp=promotion_ts - offset,
|
||||
resources={"service.name": "api-service"},
|
||||
body_v2=json.dumps({"user": {"name": name, "age": age}}),
|
||||
body_promoted="",
|
||||
severity_text=sev,
|
||||
)
|
||||
for offset, name, age, sev in pre_entries
|
||||
] + [
|
||||
Logs(
|
||||
timestamp=promotion_ts + offset,
|
||||
resources={"service.name": "api-service"},
|
||||
body_v2=json.dumps({"user": {"name": name, "age": age}}),
|
||||
body_promoted=json.dumps({"user": {"name": name, "age": age}}),
|
||||
severity_text=sev,
|
||||
)
|
||||
for offset, name, age, sev in post_entries
|
||||
]
|
||||
|
||||
export_json_types(logs_list)
|
||||
export_promoted_paths(promoted_paths, promotion_ts)
|
||||
insert_logs(logs_list)
|
||||
token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)
|
||||
|
||||
# ── Scalar result helpers ─────────────────────────────────────────────────
|
||||
|
||||
def _name_counts(r: Any) -> dict[str, int]:
|
||||
return {str(row[0]): row[-1] for row in get_scalar_table_data(r.json()) if row}
|
||||
|
||||
def _age_counts(r: Any) -> dict[str, int]:
|
||||
return {str(row[0]): row[-1] for row in get_scalar_table_data(r.json()) if row}
|
||||
|
||||
def _pair_counts(r: Any) -> dict[tuple[str, str], int]:
|
||||
return {
|
||||
(str(row[0]), str(row[1])): row[-1]
|
||||
for row in get_scalar_table_data(r.json())
|
||||
if len(row) >= 3
|
||||
}
|
||||
|
||||
# ── Cases ─────────────────────────────────────────────────────────────────
|
||||
|
||||
cases = [
|
||||
# before: alice×3, bob×1
|
||||
# after: carol×2, alice×1, eve×1
|
||||
# span: alice×4, bob×1, carol×2, eve×1
|
||||
{
|
||||
"name": "groupby.name",
|
||||
"groupBy": [{"name": "body.user.name", "fieldDataType": "string"}],
|
||||
"aggregation": "count()",
|
||||
"stepInterval": 60,
|
||||
"validates": {
|
||||
"before_promoted": lambda r: _name_counts(r) == {"alice": 3, "bob": 1},
|
||||
"after_promoted": lambda r: _name_counts(r) == {"carol": 2, "alice": 1, "eve": 1},
|
||||
"in_between": lambda r: _name_counts(r) == {"alice": 4, "bob": 1, "carol": 2, "eve": 1},
|
||||
},
|
||||
},
|
||||
# before: age-25×3, age-30×1
|
||||
# after: age-35×2, age-25×1, age-28×1
|
||||
# span: age-25×4, age-30×1, age-35×2, age-28×1
|
||||
{
|
||||
"name": "groupby.age",
|
||||
"groupBy": [{"name": "body.user.age", "fieldDataType": "int64"}],
|
||||
"aggregation": "count()",
|
||||
"stepInterval": 60,
|
||||
"validates": {
|
||||
"before_promoted": lambda r: _age_counts(r) == {"25": 3, "30": 1},
|
||||
"after_promoted": lambda r: _age_counts(r) == {"35": 2, "25": 1, "28": 1},
|
||||
"in_between": lambda r: _age_counts(r) == {"25": 4, "30": 1, "35": 2, "28": 1},
|
||||
},
|
||||
},
|
||||
# before: (alice,25)×3, (bob,30)×1
|
||||
# after: (carol,35)×2, (alice,25)×1, (eve,28)×1
|
||||
# span: (alice,25)×4, (bob,30)×1, (carol,35)×2, (eve,28)×1
|
||||
{
|
||||
"name": "groupby.multi",
|
||||
"groupBy": [
|
||||
{"name": "body.user.name", "fieldDataType": "string"},
|
||||
{"name": "body.user.age", "fieldDataType": "int64"},
|
||||
],
|
||||
"aggregation": "count()",
|
||||
"stepInterval": 60,
|
||||
"validates": {
|
||||
"before_promoted": lambda r: _pair_counts(r) == {
|
||||
("alice", "25"): 3, ("bob", "30"): 1,
|
||||
},
|
||||
"after_promoted": lambda r: _pair_counts(r) == {
|
||||
("carol", "35"): 2, ("alice", "25"): 1, ("eve", "28"): 1,
|
||||
},
|
||||
"in_between": lambda r: _pair_counts(r) == {
|
||||
("alice", "25"): 4, ("bob", "30"): 1,
|
||||
("carol", "35"): 2, ("eve", "28"): 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
windows = _build_windows(promotion_ts)
|
||||
for case in cases:
|
||||
for window in windows:
|
||||
name = f"{case['name']}.{window['name']}"
|
||||
query = build_scalar_query(
|
||||
name=name,
|
||||
signal="logs",
|
||||
aggregations=[build_logs_aggregation(case["aggregation"])],
|
||||
group_by=case["groupBy"],
|
||||
step_interval=case["stepInterval"],
|
||||
)
|
||||
before = datetime.now(tz=UTC)
|
||||
response = make_query_request(
|
||||
signoz=signoz,
|
||||
token=token,
|
||||
start_ms=window["startMs"],
|
||||
end_ms=window["endMs"],
|
||||
queries=[query],
|
||||
request_type="scalar",
|
||||
)
|
||||
assert response.status_code == 200, f"HTTP {response.status_code} for '{name}': {response.text}"
|
||||
assert case["validates"][window["name"]](response), f"Validation failed for '{name}': {response.json()}"
|
||||
check_query_log(
|
||||
before,
|
||||
name,
|
||||
window["check_fn"],
|
||||
tables=["signoz_logs.distributed_logs_v2"],
|
||||
)
|
||||
Reference in New Issue
Block a user