mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-03 08:33:26 +00:00
feat(JSON): JSON Body Metadata (#9593)
* feat: json Body Keys * feat: telemetry types * feat: change ExtractBodyPaths * chore: minor comment change * chore: func rename, file rename * chore: change table names * chore: reflect changes from the overhaul * test: fixing test 1 * fix: test TestQueryToKeys * fix: test TestPrepareLogsQuery * chore: remove db * chore: go mod * chore: changes based on review * chore: changes based on review * fix: in LIKE operation * chore: addressed few changes * revert: test file * fix: comparison fix * test: add TestBuildListLogsJSONIndexesQuery * fix: in test TestBuildListLogsJSONIndexesQuery * fix: pull promoted paths in single db call * fix: reducing db calls * test: fix TestBuildListLogsJSONIndexesQuery * fix: test TestConditionForJSONBodySearch * fix: lint try 1 * chore: review changes based on cursor * fix: use enums only --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
This commit is contained in:
9
go.mod
9
go.mod
@@ -8,7 +8,7 @@ require (
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.4
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1
|
||||
github.com/antonmedv/expr v1.15.3
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
@@ -86,12 +86,19 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/sonic v1.14.1 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.15.1 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2 // indirect
|
||||
go.opentelemetry.io/collector/config/configretry v1.34.0 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
||||
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
|
||||
modernc.org/libc v1.66.10 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
modernc.org/memory v1.11.0 // indirect
|
||||
|
||||
16
go.sum
16
go.sum
@@ -106,8 +106,8 @@ github.com/SigNoz/expr v1.17.7-beta h1:FyZkleM5dTQ0O6muQfwGpoH5A2ohmN/XTasRCO72g
|
||||
github.com/SigNoz/expr v1.17.7-beta/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8=
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.4 h1:DGDu9y1I1FU+HX4eECPGmfhnXE4ys4yr7LL6znbf6to=
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.4/go.mod h1:xyR+coBzzO04p6Eu+ql2RVYUl/jFD+8hD9lArcc9U7g=
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9 h1:WmYDSSwzyW2yiJ3tPq5AFdjsrz3NBdtPkygtFKOsACw=
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9/go.mod h1:4eJCRUd/P4OiCHXvGYZK8q6oyBVGJFVj/G6qKSoN/TQ=
|
||||
github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
|
||||
github.com/Yiling-J/theine-go v0.6.2/go.mod h1:08QpMa5JZ2pKN+UJCRrCasWYO1IKCdl54Xa836rpmDU=
|
||||
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
|
||||
@@ -162,6 +162,12 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M=
|
||||
github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
|
||||
github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w=
|
||||
github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc=
|
||||
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
|
||||
github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
|
||||
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
@@ -178,6 +184,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
|
||||
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
|
||||
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M=
|
||||
github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
|
||||
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
|
||||
@@ -991,6 +999,8 @@ github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GH
|
||||
github.com/trivago/tgo v1.0.7 h1:uaWH/XIy9aWYWpjm2CU3RpcqZXmX2ysQ9/Go+d9gyrM=
|
||||
github.com/trivago/tgo v1.0.7/go.mod h1:w4dpD+3tzNIIiIfkWWa85w5/B77tlvdZckQ+6PkFnhc=
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||
github.com/uptrace/bun v1.2.9 h1:OOt2DlIcRUMSZPr6iXDFg/LaQd59kOxbAjpIVHddKRs=
|
||||
github.com/uptrace/bun v1.2.9/go.mod h1:r2ZaaGs9Ru5bpGTr8GQfp8jp+TlCav9grYCPOu2CJSg=
|
||||
github.com/uptrace/bun/dialect/pgdialect v1.2.9 h1:caf5uFbOGiXvadV6pA5gn87k0awFFxL1kuuY3SpxnWk=
|
||||
@@ -1235,6 +1245,8 @@ go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
|
||||
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
|
||||
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU=
|
||||
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
|
||||
@@ -208,3 +208,13 @@ func WrapUnexpectedf(cause error, code Code, format string, args ...any) *base {
|
||||
func NewUnexpectedf(code Code, format string, args ...any) *base {
|
||||
return Newf(TypeInvalidInput, code, format, args...)
|
||||
}
|
||||
|
||||
// WrapTimeoutf is a wrapper around Wrapf with TypeTimeout.
|
||||
func WrapTimeoutf(cause error, code Code, format string, args ...any) *base {
|
||||
return Wrapf(cause, TypeTimeout, code, format, args...)
|
||||
}
|
||||
|
||||
// NewTimeoutf is a wrapper around Newf with TypeTimeout.
|
||||
func NewTimeoutf(code Code, format string, args ...any) *base {
|
||||
return Newf(TypeTimeout, code, format, args...)
|
||||
}
|
||||
|
||||
@@ -198,7 +198,6 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
|
||||
FieldMapper: v.fieldMapper,
|
||||
ConditionBuilder: v.conditionBuilder,
|
||||
FullTextColumn: v.fullTextColumn,
|
||||
JsonBodyPrefix: v.jsonBodyPrefix,
|
||||
JsonKeyToKey: v.jsonKeyToKey,
|
||||
}, 0, 0,
|
||||
)
|
||||
|
||||
17
pkg/querybuilder/constants.go
Normal file
17
pkg/querybuilder/constants.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package querybuilder
|
||||
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
var (
|
||||
BodyJSONQueryEnabled = GetOrDefaultEnv("BODY_JSON_QUERY_ENABLED", "false") == "true"
|
||||
)
|
||||
|
||||
func GetOrDefaultEnv(key string, fallback string) string {
|
||||
v := os.Getenv(key)
|
||||
if len(v) == 0 {
|
||||
return fallback
|
||||
}
|
||||
return v
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
)
|
||||
|
||||
func TestQueryToKeys(t *testing.T) {
|
||||
|
||||
testCases := []struct {
|
||||
query string
|
||||
expectedKeys []telemetrytypes.FieldKeySelector
|
||||
@@ -66,9 +65,9 @@ func TestQueryToKeys(t *testing.T) {
|
||||
query: `body.user_ids[*] = 123`,
|
||||
expectedKeys: []telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "body.user_ids[*]",
|
||||
Name: "user_ids[*]",
|
||||
Signal: telemetrytypes.SignalUnspecified,
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
},
|
||||
|
||||
@@ -162,7 +162,6 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
|
||||
ConditionBuilder: b.conditionBuilder,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: b.fullTextColumn,
|
||||
JsonBodyPrefix: b.jsonBodyPrefix,
|
||||
JsonKeyToKey: b.jsonKeyToKey,
|
||||
SkipFullTextFilter: true,
|
||||
SkipFunctionCalls: true,
|
||||
|
||||
@@ -33,7 +33,6 @@ type filterExpressionVisitor struct {
|
||||
mainErrorURL string
|
||||
builder *sqlbuilder.SelectBuilder
|
||||
fullTextColumn *telemetrytypes.TelemetryFieldKey
|
||||
jsonBodyPrefix string
|
||||
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
|
||||
skipResourceFilter bool
|
||||
skipFullTextFilter bool
|
||||
@@ -53,7 +52,6 @@ type FilterExprVisitorOpts struct {
|
||||
FieldKeys map[string][]*telemetrytypes.TelemetryFieldKey
|
||||
Builder *sqlbuilder.SelectBuilder
|
||||
FullTextColumn *telemetrytypes.TelemetryFieldKey
|
||||
JsonBodyPrefix string
|
||||
JsonKeyToKey qbtypes.JsonKeyToFieldFunc
|
||||
SkipResourceFilter bool
|
||||
SkipFullTextFilter bool
|
||||
@@ -73,7 +71,6 @@ func newFilterExpressionVisitor(opts FilterExprVisitorOpts) *filterExpressionVis
|
||||
fieldKeys: opts.FieldKeys,
|
||||
builder: opts.Builder,
|
||||
fullTextColumn: opts.FullTextColumn,
|
||||
jsonBodyPrefix: opts.JsonBodyPrefix,
|
||||
jsonKeyToKey: opts.JsonKeyToKey,
|
||||
skipResourceFilter: opts.SkipResourceFilter,
|
||||
skipFullTextFilter: opts.SkipFullTextFilter,
|
||||
@@ -173,7 +170,7 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts, startNs uint64
|
||||
|
||||
whereClause := sqlbuilder.NewWhereClause().AddWhereExpr(visitor.builder.Args, cond)
|
||||
|
||||
return &PreparedWhereClause{whereClause, visitor.warnings, visitor.mainWarnURL}, nil
|
||||
return &PreparedWhereClause{WhereClause: whereClause, Warnings: visitor.warnings, WarningsDocURL: visitor.mainWarnURL}, nil
|
||||
}
|
||||
|
||||
// Visit dispatches to the specific visit method based on node type
|
||||
@@ -718,7 +715,7 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
|
||||
conds = append(conds, fmt.Sprintf("hasToken(LOWER(%s), LOWER(%s))", key.Name, v.builder.Var(value[0])))
|
||||
} else {
|
||||
// this is that all other functions only support array fields
|
||||
if strings.HasPrefix(key.Name, v.jsonBodyPrefix) {
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value)
|
||||
} else {
|
||||
// TODO(add docs for json body search)
|
||||
@@ -809,10 +806,8 @@ func (v *filterExpressionVisitor) VisitValue(ctx *grammar.ValueContext) any {
|
||||
|
||||
// VisitKey handles field/column references
|
||||
func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
|
||||
|
||||
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(ctx.GetText())
|
||||
|
||||
keyName := strings.TrimPrefix(fieldKey.Name, v.jsonBodyPrefix)
|
||||
keyName := fieldKey.Name
|
||||
|
||||
fieldKeysForName := v.fieldKeys[keyName]
|
||||
|
||||
@@ -846,10 +841,11 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
|
||||
// if there is a field with the same name as attribute/resource attribute
|
||||
// Since it will ORed with the fieldKeysForName, it will not result empty
|
||||
// when either of them have values
|
||||
if strings.HasPrefix(fieldKey.Name, v.jsonBodyPrefix) && v.jsonBodyPrefix != "" {
|
||||
if keyName != "" {
|
||||
fieldKeysForName = append(fieldKeysForName, &fieldKey)
|
||||
}
|
||||
// Note: Skip this logic if body json query is enabled so we can look up the key inside fields
|
||||
//
|
||||
// TODO(Piyush): After entire migration this is supposed to be removed.
|
||||
if !BodyJSONQueryEnabled && fieldKey.FieldContext == telemetrytypes.FieldContextBody {
|
||||
fieldKeysForName = append(fieldKeysForName, &fieldKey)
|
||||
}
|
||||
|
||||
if len(fieldKeysForName) == 0 {
|
||||
@@ -860,7 +856,7 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
|
||||
return v.fieldKeys[keyWithContext]
|
||||
}
|
||||
|
||||
if strings.HasPrefix(fieldKey.Name, v.jsonBodyPrefix) && v.jsonBodyPrefix != "" && keyName == "" {
|
||||
if fieldKey.FieldContext == telemetrytypes.FieldContextBody && keyName == "" {
|
||||
v.errors = append(v.errors, "missing key for body json search - expected key of the form `body.key` (ex: `body.status`)")
|
||||
} else if !v.ignoreNotFoundKeys {
|
||||
// TODO(srikanthccv): do we want to return an error here?
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
@@ -52,7 +51,8 @@ func (c *conditionBuilder) conditionFor(
|
||||
return "", err
|
||||
}
|
||||
|
||||
if strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
|
||||
// Check if this is a body JSON search - either by FieldContext
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
tblFieldName, value = GetBodyJSONKey(ctx, key, operator, value)
|
||||
}
|
||||
|
||||
@@ -164,7 +164,8 @@ func (c *conditionBuilder) conditionFor(
|
||||
// key membership checks, so depending on the column type, the condition changes
|
||||
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
|
||||
|
||||
if strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
|
||||
// Check if this is a body JSON search - by FieldContext
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return GetBodyJSONKeyForExists(ctx, key, operator, value), nil
|
||||
} else {
|
||||
@@ -173,45 +174,57 @@ func (c *conditionBuilder) conditionFor(
|
||||
}
|
||||
|
||||
var value any
|
||||
switch column.Type {
|
||||
case schema.JSONColumnType{}:
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(tblFieldName), nil
|
||||
} else {
|
||||
return sb.IsNull(tblFieldName), nil
|
||||
}
|
||||
case schema.ColumnTypeString, schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}:
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
}
|
||||
return sb.E(tblFieldName, value), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
}
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
} else {
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.ColumnTypeUInt64, schema.ColumnTypeUInt32, schema.ColumnTypeUInt8:
|
||||
case schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
||||
value = 0
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
} else {
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}, schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}, schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeFloat64,
|
||||
}:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.E(leftOperand, true), nil
|
||||
} else {
|
||||
return sb.NE(leftOperand, true), nil
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
}
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.E(leftOperand, true), nil
|
||||
} else {
|
||||
return sb.NE(leftOperand, true), nil
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
|
||||
@@ -238,7 +251,7 @@ func (c *conditionBuilder) ConditionFor(
|
||||
// skip adding exists filter for intrinsic fields
|
||||
// with an exception for body json search
|
||||
field, _ := c.fm.FieldFor(ctx, key)
|
||||
if slices.Contains(maps.Keys(IntrinsicFields), field) && !strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
|
||||
if slices.Contains(maps.Keys(IntrinsicFields), field) && key.FieldContext != telemetrytypes.FieldContextBody {
|
||||
return condition, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -465,7 +465,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Equal operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorEqual,
|
||||
value: 200,
|
||||
@@ -475,7 +476,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Equal operator - float64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.duration_ms",
|
||||
Name: "duration_ms",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorEqual,
|
||||
value: 405.5,
|
||||
@@ -485,7 +487,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Equal operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.method",
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorEqual,
|
||||
value: "GET",
|
||||
@@ -495,7 +498,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Equal operator - bool",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.success",
|
||||
Name: "http.success",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorEqual,
|
||||
value: true,
|
||||
@@ -505,7 +509,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Exists operator",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
value: nil,
|
||||
@@ -515,7 +520,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Not Exists operator",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotExists,
|
||||
value: nil,
|
||||
@@ -525,7 +531,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Greater than operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorGreaterThan,
|
||||
value: "200",
|
||||
@@ -535,7 +542,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Greater than operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorGreaterThan,
|
||||
value: 200,
|
||||
@@ -545,7 +553,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Less than operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorLessThan,
|
||||
value: "300",
|
||||
@@ -555,7 +564,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Less than operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorLessThan,
|
||||
value: 300,
|
||||
@@ -565,7 +575,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Contains operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorContains,
|
||||
value: "200",
|
||||
@@ -575,7 +586,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Not Contains operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotContains,
|
||||
value: "200",
|
||||
@@ -585,7 +597,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Between operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorBetween,
|
||||
value: []any{"200", "300"},
|
||||
@@ -595,7 +608,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Between operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorBetween,
|
||||
value: []any{400, 500},
|
||||
@@ -605,7 +619,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "In operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorIn,
|
||||
value: []any{"200", "300"},
|
||||
@@ -615,7 +630,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "In operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorIn,
|
||||
value: []any{401, 404, 500},
|
||||
@@ -625,7 +641,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Regexp operator - json body string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.method",
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorRegexp,
|
||||
value: "GET|POST|PUT",
|
||||
@@ -635,7 +652,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Not Regexp operator - json body string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.method",
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotRegexp,
|
||||
value: "DELETE|PATCH",
|
||||
@@ -645,7 +663,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Regexp operator - json body with dots in path",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.user.email",
|
||||
Name: "user.email",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorRegexp,
|
||||
value: "^.*@example\\.com$",
|
||||
@@ -655,7 +674,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Not Regexp operator - json body nested path",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.response.headers.content-type",
|
||||
Name: "response.headers.content-type",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotRegexp,
|
||||
value: "^text/.*",
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package telemetrylogs
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
@@ -16,6 +18,8 @@ const (
|
||||
LogsV2TimestampColumn = "timestamp"
|
||||
LogsV2ObservedTimestampColumn = "observed_timestamp"
|
||||
LogsV2BodyColumn = "body"
|
||||
LogsV2BodyJSONColumn = constants.BodyJSONColumn
|
||||
LogsV2BodyPromotedColumn = constants.BodyPromotedColumn
|
||||
LogsV2TraceIDColumn = "trace_id"
|
||||
LogsV2SpanIDColumn = "span_id"
|
||||
LogsV2TraceFlagsColumn = "trace_flags"
|
||||
@@ -30,6 +34,11 @@ const (
|
||||
LogsV2AttributesBoolColumn = "attributes_bool"
|
||||
LogsV2ResourcesStringColumn = "resources_string"
|
||||
LogsV2ScopeStringColumn = "scope_string"
|
||||
|
||||
BodyJSONColumnPrefix = constants.BodyJSONColumnPrefix
|
||||
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
|
||||
ArraySep = jsontypeexporter.ArraySeparator
|
||||
ArrayAnyIndex = "[*]."
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -82,10 +82,13 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
|
||||
case telemetrytypes.FieldDataTypeBool:
|
||||
return logsV2Columns["attributes_bool"], nil
|
||||
}
|
||||
case telemetrytypes.FieldContextBody:
|
||||
// body context fields are stored in the body column
|
||||
return logsV2Columns["body"], nil
|
||||
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
|
||||
col, ok := logsV2Columns[key.Name]
|
||||
if !ok {
|
||||
// check if the key has body JSON search
|
||||
// check if the key has body JSON search (backward compatibility)
|
||||
if strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
|
||||
return logsV2Columns["body"], nil
|
||||
}
|
||||
@@ -103,8 +106,8 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
|
||||
return "", err
|
||||
}
|
||||
|
||||
switch column.Type {
|
||||
case schema.JSONColumnType{}:
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
@@ -121,40 +124,32 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
|
||||
} else {
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
|
||||
}
|
||||
|
||||
case schema.ColumnTypeString,
|
||||
schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
schema.ColumnTypeUInt64,
|
||||
schema.ColumnTypeUInt32,
|
||||
schema.ColumnTypeUInt8:
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
return column.Name, nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
}
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
||||
return column.Name, nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeFloat64,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
}
|
||||
// should not reach here
|
||||
return column.Name, nil
|
||||
|
||||
@@ -21,7 +21,6 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: DefaultFullTextColumn,
|
||||
JsonBodyPrefix: BodyJSONStringSearchPrefix,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
@@ -58,7 +57,6 @@ func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: DefaultFullTextColumn,
|
||||
JsonBodyPrefix: BodyJSONStringSearchPrefix,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
|
||||
@@ -27,8 +27,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
|
||||
FullTextColumn: &telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body",
|
||||
},
|
||||
JsonBodyPrefix: "body",
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
@@ -163,7 +162,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.category, limitString(tc.query, 50)), func(t *testing.T) {
|
||||
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
|
||||
if tc.shouldPass {
|
||||
if err != nil {
|
||||
|
||||
@@ -27,7 +27,6 @@ func TestFilterExprLogs(t *testing.T) {
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: DefaultFullTextColumn,
|
||||
JsonBodyPrefix: BodyJSONStringSearchPrefix,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
@@ -2448,7 +2447,6 @@ func TestFilterExprLogsConflictNegation(t *testing.T) {
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: DefaultFullTextColumn,
|
||||
JsonBodyPrefix: BodyJSONStringSearchPrefix,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ func inferDataType(value any, operator qbtypes.FilterOperator, key *telemetrytyp
|
||||
}
|
||||
|
||||
func getBodyJSONPath(key *telemetrytypes.TelemetryFieldKey) string {
|
||||
parts := strings.Split(key.Name, ".")[1:]
|
||||
parts := strings.Split(key.Name, ".")
|
||||
newParts := []string{}
|
||||
for _, part := range parts {
|
||||
if strings.HasSuffix(part, "[*]") {
|
||||
|
||||
@@ -589,10 +589,9 @@ func (b *logQueryStatementBuilder) addFilterCondition(
|
||||
FieldKeys: keys,
|
||||
SkipResourceFilter: true,
|
||||
FullTextColumn: b.fullTextColumn,
|
||||
JsonBodyPrefix: b.jsonBodyPrefix,
|
||||
JsonKeyToKey: b.jsonKeyToKey,
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
}, start, end)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -8,4 +8,7 @@ const (
|
||||
TagAttributesV2LocalTableName = "tag_attributes_v2"
|
||||
LogAttributeKeysTblName = "distributed_logs_attribute_keys"
|
||||
LogResourceKeysTblName = "distributed_logs_resource_keys"
|
||||
PathTypesTableName = "distributed_json_path_types"
|
||||
PromotedPathsTableName = "distributed_json_promoted_paths"
|
||||
SkipIndexTableName = "system.data_skipping_indices"
|
||||
)
|
||||
|
||||
496
pkg/telemetrymetadata/body_json_metadata.go
Normal file
496
pkg/telemetrymetadata/body_json_metadata.go
Normal file
@@ -0,0 +1,496 @@
|
||||
package telemetrymetadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
|
||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz-otel-collector/utils"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultPathLimit = 100 // Default limit to prevent full table scans
|
||||
|
||||
CodeUnknownJSONDataType = errors.MustNewCode("unknown_json_data_type")
|
||||
CodeFailLoadPromotedPaths = errors.MustNewCode("fail_load_promoted_paths")
|
||||
CodeFailCheckPathPromoted = errors.MustNewCode("fail_check_path_promoted")
|
||||
CodeFailIterateBodyJSONKeys = errors.MustNewCode("fail_iterate_body_json_keys")
|
||||
CodeFailExtractBodyJSONKeys = errors.MustNewCode("fail_extract_body_json_keys")
|
||||
CodeFailLoadLogsJSONIndexes = errors.MustNewCode("fail_load_logs_json_indexes")
|
||||
CodeFailListJSONValues = errors.MustNewCode("fail_list_json_values")
|
||||
CodeFailScanJSONValue = errors.MustNewCode("fail_scan_json_value")
|
||||
CodeFailScanVariant = errors.MustNewCode("fail_scan_variant")
|
||||
CodeFailBuildJSONPathsQuery = errors.MustNewCode("fail_build_json_paths_query")
|
||||
CodeNoPathsToQueryIndexes = errors.MustNewCode("no_paths_to_query_indexes_provided")
|
||||
)
|
||||
|
||||
// GetBodyJSONPaths extracts body JSON paths from the path_types table
|
||||
// This function can be used by both JSONQueryBuilder and metadata extraction
|
||||
// uniquePathLimit: 0 for no limit, >0 for maximum number of unique paths to return
|
||||
// - For startup load: set to 10000 to get top 10k unique paths
|
||||
// - For lookup: set to 0 (no limit needed for single path)
|
||||
// - For metadata API: set to desired pagination limit
|
||||
//
|
||||
// searchOperator: LIKE for pattern matching, EQUAL for exact match
|
||||
// Returns: (paths, error)
|
||||
// TODO(Piyush): Remove this lint skip
|
||||
//
|
||||
// nolint:unused
|
||||
func getBodyJSONPaths(ctx context.Context, telemetryStore telemetrystore.TelemetryStore,
|
||||
fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
|
||||
|
||||
query, args, limit, err := buildGetBodyJSONPathsQuery(fieldKeySelectors)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
rows, err := telemetryStore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to extract body JSON keys")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
fieldKeys := []*telemetrytypes.TelemetryFieldKey{}
|
||||
paths := []string{}
|
||||
rowCount := 0
|
||||
for rows.Next() {
|
||||
var path string
|
||||
var typesArray []string // ClickHouse returns array as []string
|
||||
var lastSeen uint64
|
||||
|
||||
err = rows.Scan(&path, &typesArray, &lastSeen)
|
||||
if err != nil {
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to scan body JSON key row")
|
||||
}
|
||||
|
||||
for _, typ := range typesArray {
|
||||
mapping, found := telemetrytypes.MappingStringToJSONDataType[typ]
|
||||
if !found {
|
||||
return nil, false, errors.NewInternalf(CodeUnknownJSONDataType, "failed to map type string to JSON data type: %s", typ)
|
||||
}
|
||||
fieldKeys = append(fieldKeys, &telemetrytypes.TelemetryFieldKey{
|
||||
Name: path,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldDataType: telemetrytypes.MappingJSONDataTypeToFieldDataType[mapping],
|
||||
JSONDataType: &mapping,
|
||||
})
|
||||
}
|
||||
|
||||
paths = append(paths, path)
|
||||
rowCount++
|
||||
}
|
||||
if rows.Err() != nil {
|
||||
return nil, false, errors.WrapInternalf(rows.Err(), CodeFailIterateBodyJSONKeys, "error iterating body JSON keys")
|
||||
}
|
||||
|
||||
promoted, err := GetPromotedPaths(ctx, telemetryStore.ClickhouseDB(), paths...)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
indexes, err := getJSONPathIndexes(ctx, telemetryStore, paths...)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
for _, fieldKey := range fieldKeys {
|
||||
fieldKey.Materialized = promoted.Contains(fieldKey.Name)
|
||||
fieldKey.Indexes = indexes[fieldKey.Name]
|
||||
}
|
||||
|
||||
return fieldKeys, rowCount <= limit, nil
|
||||
}
|
||||
|
||||
func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySelector) (string, []any, int, error) {
|
||||
if len(fieldKeySelectors) == 0 {
|
||||
return "", nil, defaultPathLimit, errors.NewInternalf(CodeFailBuildJSONPathsQuery, "no field key selectors provided")
|
||||
}
|
||||
from := fmt.Sprintf("%s.%s", DBName, PathTypesTableName)
|
||||
|
||||
// Build a better query using GROUP BY to deduplicate at database level
|
||||
// This aggregates all types per path and gets the max last_seen, then applies LIMIT
|
||||
sb := sqlbuilder.Select(
|
||||
"path",
|
||||
"groupArray(DISTINCT type) AS types",
|
||||
"max(last_seen) AS last_seen",
|
||||
).From(from)
|
||||
|
||||
limit := 0
|
||||
// Add search filter if provided
|
||||
orClauses := []string{}
|
||||
for _, fieldKeySelector := range fieldKeySelectors {
|
||||
// replace [*] with []
|
||||
fieldKeySelector.Name = strings.ReplaceAll(fieldKeySelector.Name, telemetrylogs.ArrayAnyIndex, telemetrylogs.ArraySep)
|
||||
// Extract search text for body JSON keys
|
||||
keyName := CleanPathPrefixes(fieldKeySelector.Name)
|
||||
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
|
||||
orClauses = append(orClauses, sb.Equal("path", keyName))
|
||||
} else {
|
||||
// Pattern matching for metadata API (defaults to LIKE behavior for other operators)
|
||||
orClauses = append(orClauses, sb.Like("path", querybuilder.FormatValueForContains(keyName)))
|
||||
}
|
||||
limit += fieldKeySelector.Limit
|
||||
}
|
||||
sb.Where(sb.Or(orClauses...))
|
||||
|
||||
// Group by path to get unique paths with aggregated types
|
||||
sb.GroupBy("path")
|
||||
|
||||
// Order by max last_seen to get most recent paths first
|
||||
sb.OrderBy("last_seen DESC")
|
||||
if limit == 0 {
|
||||
limit = defaultPathLimit
|
||||
}
|
||||
sb.Limit(limit)
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return query, args, limit, nil
|
||||
}
|
||||
|
||||
// TODO(Piyush): Remove this lint skip
|
||||
//
|
||||
// nolint:unused
|
||||
func getJSONPathIndexes(ctx context.Context, telemetryStore telemetrystore.TelemetryStore, paths ...string) (map[string][]telemetrytypes.JSONDataTypeIndex, error) {
|
||||
filteredPaths := []string{}
|
||||
for _, path := range paths {
|
||||
if strings.Contains(path, telemetrylogs.ArraySep) || strings.Contains(path, telemetrylogs.ArrayAnyIndex) {
|
||||
continue
|
||||
}
|
||||
filteredPaths = append(filteredPaths, path)
|
||||
}
|
||||
if len(filteredPaths) == 0 {
|
||||
return nil, errors.NewInternalf(CodeNoPathsToQueryIndexes, "no paths to query indexes provided")
|
||||
}
|
||||
|
||||
// list indexes for the paths
|
||||
indexesMap, err := ListLogsJSONIndexes(ctx, telemetryStore, filteredPaths...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to list JSON path indexes")
|
||||
}
|
||||
|
||||
// build a set of indexes
|
||||
cleanIndexes := make(map[string][]telemetrytypes.JSONDataTypeIndex)
|
||||
for path, indexes := range indexesMap {
|
||||
for _, index := range indexes {
|
||||
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", index.Expression)
|
||||
}
|
||||
|
||||
jsonDataType, found := telemetrytypes.MappingStringToJSONDataType[columnType]
|
||||
if !found {
|
||||
return nil, errors.NewInternalf(CodeUnknownJSONDataType, "failed to map column type to JSON data type: %s", columnType)
|
||||
}
|
||||
|
||||
if jsonDataType == telemetrytypes.String {
|
||||
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
|
||||
Type: telemetrytypes.String,
|
||||
ColumnExpression: columnExpr,
|
||||
IndexExpression: index.Expression,
|
||||
})
|
||||
} else if strings.HasPrefix(index.Type, "minmax") {
|
||||
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
|
||||
Type: jsonDataType,
|
||||
ColumnExpression: columnExpr,
|
||||
IndexExpression: index.Expression,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return cleanIndexes, nil
|
||||
}
|
||||
|
||||
func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, []any) {
|
||||
// This aggregates all types per path and gets the max last_seen, then applies LIMIT
|
||||
sb := sqlbuilder.Select(
|
||||
"name", "type_full", "expr", "granularity",
|
||||
).From(fmt.Sprintf("clusterAllReplicas('%s', %s)", cluster, SkipIndexTableName))
|
||||
|
||||
sb.Where(sb.Equal("database", telemetrylogs.DBName))
|
||||
sb.Where(sb.Equal("table", telemetrylogs.LogsV2LocalTableName))
|
||||
sb.Where(sb.Or(
|
||||
sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyJSONColumnPrefix))),
|
||||
sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyPromotedColumnPrefix))),
|
||||
))
|
||||
|
||||
filterExprs := []string{}
|
||||
for _, filter := range filters {
|
||||
filterExprs = append(filterExprs, sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(filter))))
|
||||
}
|
||||
sb.Where(sb.Or(filterExprs...))
|
||||
|
||||
return sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
}
|
||||
|
||||
func ListLogsJSONIndexes(ctx context.Context, telemetryStore telemetrystore.TelemetryStore, filters ...string) (map[string][]schemamigrator.Index, error) {
|
||||
query, args := buildListLogsJSONIndexesQuery(telemetryStore.Cluster(), filters...)
|
||||
rows, err := telemetryStore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to load string indexed columns")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
indexesMap := make(map[string][]schemamigrator.Index)
|
||||
for rows.Next() {
|
||||
var name string
|
||||
var typeFull string
|
||||
var expr string
|
||||
var granularity uint64
|
||||
if err := rows.Scan(&name, &typeFull, &expr, &granularity); err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to scan string indexed column")
|
||||
}
|
||||
indexesMap[name] = append(indexesMap[name], schemamigrator.Index{
|
||||
Name: name,
|
||||
Type: typeFull,
|
||||
Expression: expr,
|
||||
Granularity: int(granularity),
|
||||
})
|
||||
}
|
||||
|
||||
return indexesMap, nil
|
||||
}
|
||||
|
||||
func ListPromotedPaths(ctx context.Context, conn clickhouse.Conn) (map[string]struct{}, error) {
|
||||
query := fmt.Sprintf("SELECT path FROM %s.%s", DBName, PromotedPathsTableName)
|
||||
rows, err := conn.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadPromotedPaths, "failed to load promoted paths")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
next := make(map[string]struct{})
|
||||
for rows.Next() {
|
||||
var path string
|
||||
if err := rows.Scan(&path); err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadPromotedPaths, "failed to scan promoted path")
|
||||
}
|
||||
next[path] = struct{}{}
|
||||
}
|
||||
|
||||
return next, nil
|
||||
}
|
||||
|
||||
// TODO(Piyush): Remove this if not used in future
|
||||
func ListJSONValues(ctx context.Context, conn clickhouse.Conn, path string, limit int) (*telemetrytypes.TelemetryFieldValues, bool, error) {
|
||||
path = CleanPathPrefixes(path)
|
||||
|
||||
if strings.Contains(path, telemetrylogs.ArraySep) || strings.Contains(path, telemetrylogs.ArrayAnyIndex) {
|
||||
return nil, false, errors.NewInvalidInputf(errors.CodeInvalidInput, "array paths are not supported")
|
||||
}
|
||||
|
||||
promoted, err := IsPathPromoted(ctx, conn, path)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if promoted {
|
||||
path = telemetrylogs.BodyPromotedColumnPrefix + path
|
||||
} else {
|
||||
path = telemetrylogs.BodyJSONColumnPrefix + path
|
||||
}
|
||||
|
||||
from := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
|
||||
colExpr := func(typ telemetrytypes.JSONDataType) string {
|
||||
return fmt.Sprintf("dynamicElement(%s, '%s')", path, typ.StringValue())
|
||||
}
|
||||
|
||||
sb := sqlbuilder.Select(
|
||||
colExpr(telemetrytypes.String),
|
||||
colExpr(telemetrytypes.Int64),
|
||||
colExpr(telemetrytypes.Float64),
|
||||
colExpr(telemetrytypes.Bool),
|
||||
colExpr(telemetrytypes.ArrayString),
|
||||
colExpr(telemetrytypes.ArrayInt64),
|
||||
colExpr(telemetrytypes.ArrayFloat64),
|
||||
colExpr(telemetrytypes.ArrayBool),
|
||||
colExpr(telemetrytypes.ArrayDynamic),
|
||||
).From(from)
|
||||
sb.Where(fmt.Sprintf("%s IS NOT NULL", path))
|
||||
sb.Limit(limit)
|
||||
|
||||
contextWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
rows, err := conn.Query(contextWithTimeout, query, args...)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return nil, false, errors.WrapTimeoutf(err, errors.CodeTimeout, "query timed out").WithAdditional("failed to list JSON values")
|
||||
}
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailListJSONValues, "failed to list JSON values")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Get column types to determine proper scan types
|
||||
colTypes := rows.ColumnTypes()
|
||||
scanTargets := make([]any, len(colTypes))
|
||||
for i := range colTypes {
|
||||
scanTargets[i] = reflect.New(colTypes[i].ScanType()).Interface()
|
||||
}
|
||||
|
||||
values := &telemetrytypes.TelemetryFieldValues{}
|
||||
for rows.Next() {
|
||||
// Create fresh scan targets for each row
|
||||
scan := make([]any, len(colTypes))
|
||||
for i := range colTypes {
|
||||
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
|
||||
}
|
||||
|
||||
if err := rows.Scan(scan...); err != nil {
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailListJSONValues, "failed to scan JSON value row")
|
||||
}
|
||||
|
||||
// Extract values from scan targets and process them
|
||||
// Column order: String, Int64, Float64, Bool, ArrayString, ArrayInt64, ArrayFloat64, ArrayBool, ArrayDynamic
|
||||
var consume func(scan []any) error
|
||||
consume = func(scan []any) error {
|
||||
for _, value := range scan {
|
||||
value := derefValue(value) // dereference the double pointer if it is a pointer
|
||||
switch value := value.(type) {
|
||||
case string:
|
||||
values.StringValues = append(values.StringValues, value)
|
||||
case int64:
|
||||
values.NumberValues = append(values.NumberValues, float64(value))
|
||||
case float64:
|
||||
values.NumberValues = append(values.NumberValues, value)
|
||||
case bool:
|
||||
values.BoolValues = append(values.BoolValues, value)
|
||||
case []*string:
|
||||
for _, str := range value {
|
||||
if str != nil {
|
||||
values.StringValues = append(values.StringValues, *str)
|
||||
}
|
||||
}
|
||||
case []*int64:
|
||||
for _, num := range value {
|
||||
if num != nil {
|
||||
values.NumberValues = append(values.NumberValues, float64(*num))
|
||||
}
|
||||
}
|
||||
case []*float64:
|
||||
for _, num := range value {
|
||||
if num != nil {
|
||||
values.NumberValues = append(values.NumberValues, float64(*num))
|
||||
}
|
||||
}
|
||||
case []*bool:
|
||||
for _, boolVal := range value {
|
||||
if boolVal != nil {
|
||||
values.BoolValues = append(values.BoolValues, *boolVal)
|
||||
}
|
||||
}
|
||||
case chcol.Variant:
|
||||
if !value.Nil() {
|
||||
if err := consume([]any{value.Any()}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case []chcol.Variant:
|
||||
extractedValues := make([]any, len(value))
|
||||
for idx, variant := range value {
|
||||
if !variant.Nil() && variant.Type() != "JSON" { // skip JSON values cuz they're relevant for nested keys
|
||||
extractedValues[idx] = variant.Any()
|
||||
}
|
||||
}
|
||||
if err := consume(extractedValues); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
return errors.NewInternalf(CodeFailScanJSONValue, "unknown JSON value type: %T", value)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
if err := consume(scan); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailListJSONValues, "error iterating JSON values")
|
||||
}
|
||||
|
||||
return values, true, nil
|
||||
}
|
||||
|
||||
func derefValue(v any) any {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
val := reflect.ValueOf(v)
|
||||
for val.Kind() == reflect.Ptr {
|
||||
if val.IsNil() {
|
||||
return nil
|
||||
}
|
||||
val = val.Elem()
|
||||
}
|
||||
|
||||
return val.Interface()
|
||||
}
|
||||
|
||||
// IsPathPromoted checks if a specific path is promoted
|
||||
func IsPathPromoted(ctx context.Context, conn clickhouse.Conn, path string) (bool, error) {
|
||||
split := strings.Split(path, telemetrylogs.ArraySep)
|
||||
query := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE path = ? LIMIT 1", DBName, PromotedPathsTableName)
|
||||
rows, err := conn.Query(ctx, query, split[0])
|
||||
if err != nil {
|
||||
return false, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to check if path %s is promoted", path)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
return rows.Next(), nil
|
||||
}
|
||||
|
||||
// GetPromotedPaths checks if a specific path is promoted
|
||||
func GetPromotedPaths(ctx context.Context, conn clickhouse.Conn, paths ...string) (*utils.ConcurrentSet[string], error) {
|
||||
sb := sqlbuilder.Select("path").From(fmt.Sprintf("%s.%s", DBName, PromotedPathsTableName))
|
||||
pathConditions := []string{}
|
||||
for _, path := range paths {
|
||||
pathConditions = append(pathConditions, sb.Equal("path", path))
|
||||
}
|
||||
sb.Where(sb.Or(pathConditions...))
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
rows, err := conn.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to get promoted paths")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
promotedPaths := utils.NewConcurrentSet[string]()
|
||||
for rows.Next() {
|
||||
var path string
|
||||
if err := rows.Scan(&path); err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to scan promoted path")
|
||||
}
|
||||
promotedPaths.Insert(path)
|
||||
}
|
||||
|
||||
return promotedPaths, nil
|
||||
}
|
||||
|
||||
// TODO(Piyush): Remove this function
|
||||
func CleanPathPrefixes(path string) string {
|
||||
path = strings.TrimPrefix(path, telemetrytypes.BodyJSONStringSearchPrefix)
|
||||
path = strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
|
||||
path = strings.TrimPrefix(path, telemetrylogs.BodyPromotedColumnPrefix)
|
||||
return path
|
||||
}
|
||||
151
pkg/telemetrymetadata/body_json_metadata_test.go
Normal file
151
pkg/telemetrymetadata/body_json_metadata_test.go
Normal file
@@ -0,0 +1,151 @@
|
||||
package telemetrymetadata
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBuildGetBodyJSONPathsQuery(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
fieldKeySelectors []*telemetrytypes.FieldKeySelector
|
||||
expectedSQL string
|
||||
expectedArgs []any
|
||||
expectedLimit int
|
||||
}{
|
||||
|
||||
{
|
||||
name: "Single search text with EQUAL operator",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "user.name",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeExact,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path = ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"user.name", defaultPathLimit},
|
||||
expectedLimit: defaultPathLimit,
|
||||
},
|
||||
{
|
||||
name: "Single search text with LIKE operator",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "user",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path LIKE ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"user", 100},
|
||||
expectedLimit: 100,
|
||||
},
|
||||
{
|
||||
name: "Multiple search texts with EQUAL operator",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "user.name",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeExact,
|
||||
},
|
||||
{
|
||||
Name: "user.age",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeExact,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path = ? OR path = ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"user.name", "user.age", defaultPathLimit},
|
||||
expectedLimit: defaultPathLimit,
|
||||
},
|
||||
{
|
||||
name: "Multiple search texts with LIKE operator",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "user",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
|
||||
},
|
||||
{
|
||||
Name: "admin",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path LIKE ? OR path LIKE ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"user", "admin", defaultPathLimit},
|
||||
expectedLimit: defaultPathLimit,
|
||||
},
|
||||
{
|
||||
name: "Search with Contains operator (should default to LIKE)",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "test",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path LIKE ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"test", defaultPathLimit},
|
||||
expectedLimit: defaultPathLimit,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
query, args, limit, err := buildGetBodyJSONPathsQuery(tc.fieldKeySelectors)
|
||||
require.NoError(t, err, "Error building query: %v", err)
|
||||
|
||||
require.Equal(t, tc.expectedSQL, query)
|
||||
require.Equal(t, tc.expectedArgs, args)
|
||||
require.Equal(t, tc.expectedLimit, limit)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildListLogsJSONIndexesQuery(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
cluster string
|
||||
filters []string
|
||||
expectedSQL string
|
||||
expectedArgs []any
|
||||
}{
|
||||
{
|
||||
name: "No filters",
|
||||
cluster: "test-cluster",
|
||||
filters: nil,
|
||||
expectedSQL: "SELECT name, type_full, expr, granularity FROM clusterAllReplicas('test-cluster', system.data_skipping_indices) " +
|
||||
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?))",
|
||||
expectedArgs: []any{
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2LocalTableName,
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyJSONColumnPrefix)),
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyPromotedColumnPrefix)),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "With filters",
|
||||
cluster: "test-cluster",
|
||||
filters: []string{"foo", "bar"},
|
||||
expectedSQL: "SELECT name, type_full, expr, granularity FROM clusterAllReplicas('test-cluster', system.data_skipping_indices) " +
|
||||
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?)) AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?))",
|
||||
expectedArgs: []any{
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2LocalTableName,
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyJSONColumnPrefix)),
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyPromotedColumnPrefix)),
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains("foo")),
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains("bar")),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
query, args := buildListLogsJSONIndexesQuery(tc.cluster, tc.filters...)
|
||||
|
||||
require.Equal(t, tc.expectedSQL, query)
|
||||
require.Equal(t, tc.expectedArgs, args)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,12 @@
|
||||
package telemetrymetadata
|
||||
|
||||
import otelcollectorconst "github.com/SigNoz/signoz-otel-collector/constants"
|
||||
|
||||
const (
|
||||
DBName = "signoz_metadata"
|
||||
AttributesMetadataTableName = "distributed_attributes_metadata"
|
||||
AttributesMetadataLocalTableName = "attributes_metadata"
|
||||
PathTypesTableName = otelcollectorconst.DistributedPathTypesTable
|
||||
PromotedPathsTableName = otelcollectorconst.DistributedPromotedPathsTable
|
||||
SkipIndexTableName = "system.data_skipping_indices"
|
||||
)
|
||||
|
||||
@@ -165,53 +165,65 @@ func (c *conditionBuilder) conditionFor(
|
||||
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
|
||||
|
||||
var value any
|
||||
switch column.Type {
|
||||
case schema.JSONColumnType{}:
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(tblFieldName), nil
|
||||
} else {
|
||||
return sb.IsNull(tblFieldName), nil
|
||||
}
|
||||
case schema.ColumnTypeString,
|
||||
schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
schema.FixedStringColumnType{Length: 32},
|
||||
schema.DateTime64ColumnType{Precision: 9, Timezone: "UTC"}:
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumFixedString,
|
||||
schema.ColumnTypeEnumDateTime64:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
} else {
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.ColumnTypeUInt64,
|
||||
schema.ColumnTypeUInt32,
|
||||
schema.ColumnTypeUInt8,
|
||||
schema.ColumnTypeInt8,
|
||||
schema.ColumnTypeInt16,
|
||||
schema.ColumnTypeBool:
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
}
|
||||
return sb.E(tblFieldName, value), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
}
|
||||
|
||||
case schema.ColumnTypeEnumUInt64,
|
||||
schema.ColumnTypeEnumUInt32,
|
||||
schema.ColumnTypeEnumUInt8,
|
||||
schema.ColumnTypeEnumInt8,
|
||||
schema.ColumnTypeEnumInt16,
|
||||
schema.ColumnTypeEnumBool:
|
||||
value = 0
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
} else {
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}, schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}, schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeFloat64,
|
||||
}:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.E(leftOperand, true), nil
|
||||
} else {
|
||||
return sb.NE(leftOperand, true), nil
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
}
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.E(leftOperand, true), nil
|
||||
} else {
|
||||
return sb.NE(leftOperand, true), nil
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
|
||||
|
||||
@@ -239,8 +239,8 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
return "", err
|
||||
}
|
||||
|
||||
switch column.Type {
|
||||
case schema.JSONColumnType{}:
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
@@ -256,44 +256,38 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
} else {
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
|
||||
}
|
||||
|
||||
case schema.ColumnTypeString,
|
||||
schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
schema.ColumnTypeUInt64,
|
||||
schema.ColumnTypeUInt32,
|
||||
schema.ColumnTypeInt8,
|
||||
schema.ColumnTypeInt16,
|
||||
schema.ColumnTypeBool,
|
||||
schema.DateTime64ColumnType{Precision: 9, Timezone: "UTC"},
|
||||
schema.FixedStringColumnType{Length: 32}:
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64,
|
||||
schema.ColumnTypeEnumUInt32,
|
||||
schema.ColumnTypeEnumInt8,
|
||||
schema.ColumnTypeEnumInt16,
|
||||
schema.ColumnTypeEnumBool,
|
||||
schema.ColumnTypeEnumDateTime64,
|
||||
schema.ColumnTypeEnumFixedString:
|
||||
return column.Name, nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
return column.Name, nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeFloat64,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
}
|
||||
// should not reach here
|
||||
return column.Name, nil
|
||||
|
||||
@@ -17,6 +17,10 @@ var (
|
||||
FieldSelectorMatchTypeFuzzy = FieldSelectorMatchType{valuer.NewString("fuzzy")}
|
||||
)
|
||||
|
||||
// BodyJSONStringSearchPrefix is the prefix used for body JSON search queries
|
||||
// e.g., "body.status" where "body." is the prefix
|
||||
const BodyJSONStringSearchPrefix = `body.`
|
||||
|
||||
type TelemetryFieldKey struct {
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description,omitempty"`
|
||||
@@ -24,7 +28,10 @@ type TelemetryFieldKey struct {
|
||||
Signal Signal `json:"signal,omitempty"`
|
||||
FieldContext FieldContext `json:"fieldContext,omitempty"`
|
||||
FieldDataType FieldDataType `json:"fieldDataType,omitempty"`
|
||||
Materialized bool `json:"-"`
|
||||
|
||||
JSONDataType *JSONDataType `json:"-,omitempty"`
|
||||
Indexes []JSONDataTypeIndex `json:"-"`
|
||||
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
|
||||
}
|
||||
|
||||
func (f TelemetryFieldKey) String() string {
|
||||
|
||||
@@ -36,6 +36,9 @@ import (
|
||||
//
|
||||
// - Use `log.` for explicit log context
|
||||
// - `log.severity_text` will always resolve to `severity_text` of log record
|
||||
//
|
||||
// - Use `body.` to indicate and enforce body context
|
||||
// - `body.key` will look for `key` in the body field
|
||||
type FieldContext struct {
|
||||
valuer.String
|
||||
}
|
||||
@@ -49,6 +52,7 @@ var (
|
||||
FieldContextScope = FieldContext{valuer.NewString("scope")}
|
||||
FieldContextAttribute = FieldContext{valuer.NewString("attribute")}
|
||||
FieldContextEvent = FieldContext{valuer.NewString("event")}
|
||||
FieldContextBody = FieldContext{valuer.NewString("body")}
|
||||
FieldContextUnspecified = FieldContext{valuer.NewString("")}
|
||||
|
||||
// Map string representations to FieldContext values
|
||||
@@ -65,6 +69,7 @@ var (
|
||||
"point": FieldContextAttribute,
|
||||
"attribute": FieldContextAttribute,
|
||||
"event": FieldContextEvent,
|
||||
"body": FieldContextBody,
|
||||
"spanfield": FieldContextSpan,
|
||||
"span": FieldContextSpan,
|
||||
"logfield": FieldContextLog,
|
||||
@@ -144,6 +149,8 @@ func (f FieldContext) TagType() string {
|
||||
return "metricfield"
|
||||
case FieldContextEvent:
|
||||
return "eventfield"
|
||||
case FieldContextBody:
|
||||
return "body"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -31,6 +31,9 @@ var (
|
||||
FieldDataTypeArrayInt64 = FieldDataType{valuer.NewString("[]int64")}
|
||||
FieldDataTypeArrayNumber = FieldDataType{valuer.NewString("[]number")}
|
||||
|
||||
FieldDataTypeArrayObject = FieldDataType{valuer.NewString("[]object")}
|
||||
FieldDataTypeArrayDynamic = FieldDataType{valuer.NewString("[]dynamic")}
|
||||
|
||||
// Map string representations to FieldDataType values
|
||||
// We want to handle all the possible string representations of the data types.
|
||||
// Even if the user uses some non-standard representation, we want to be able to
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package telemetrytypes
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -86,7 +87,7 @@ func TestGetFieldKeyFromKeyText(t *testing.T) {
|
||||
|
||||
for _, testCase := range testCases {
|
||||
result := GetFieldKeyFromKeyText(testCase.keyText)
|
||||
if result != testCase.expected {
|
||||
if !reflect.DeepEqual(result, testCase.expected) {
|
||||
t.Errorf("expected %v, got %v", testCase.expected, result)
|
||||
}
|
||||
}
|
||||
|
||||
80
pkg/types/telemetrytypes/json_datatype.go
Normal file
80
pkg/types/telemetrytypes/json_datatype.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package telemetrytypes
|
||||
|
||||
type JSONDataTypeIndex struct {
|
||||
Type JSONDataType
|
||||
ColumnExpression string
|
||||
IndexExpression string
|
||||
}
|
||||
|
||||
type JSONDataType struct {
|
||||
str string // Store the correct case for ClickHouse
|
||||
IsArray bool
|
||||
ScalerType string
|
||||
IndexSupported bool
|
||||
}
|
||||
|
||||
// Override StringValue to return the correct case
|
||||
func (jdt JSONDataType) StringValue() string {
|
||||
return jdt.str
|
||||
}
|
||||
|
||||
var (
|
||||
String = JSONDataType{"String", false, "", true}
|
||||
Int64 = JSONDataType{"Int64", false, "", true}
|
||||
Float64 = JSONDataType{"Float64", false, "", true}
|
||||
Bool = JSONDataType{"Bool", false, "", false}
|
||||
Dynamic = JSONDataType{"Dynamic", false, "", false}
|
||||
ArrayString = JSONDataType{"Array(Nullable(String))", true, "String", false}
|
||||
ArrayInt64 = JSONDataType{"Array(Nullable(Int64))", true, "Int64", false}
|
||||
ArrayFloat64 = JSONDataType{"Array(Nullable(Float64))", true, "Float64", false}
|
||||
ArrayBool = JSONDataType{"Array(Nullable(Bool))", true, "Bool", false}
|
||||
ArrayDynamic = JSONDataType{"Array(Dynamic)", true, "Dynamic", false}
|
||||
ArrayJSON = JSONDataType{"Array(JSON)", true, "JSON", false}
|
||||
)
|
||||
|
||||
var MappingStringToJSONDataType = map[string]JSONDataType{
|
||||
"String": String,
|
||||
"Int64": Int64,
|
||||
"Float64": Float64,
|
||||
"Bool": Bool,
|
||||
"Dynamic": Dynamic,
|
||||
"Array(Nullable(String))": ArrayString,
|
||||
"Array(Nullable(Int64))": ArrayInt64,
|
||||
"Array(Nullable(Float64))": ArrayFloat64,
|
||||
"Array(Nullable(Bool))": ArrayBool,
|
||||
"Array(Dynamic)": ArrayDynamic,
|
||||
"Array(JSON)": ArrayJSON,
|
||||
}
|
||||
|
||||
var ScalerTypeToArrayType = map[JSONDataType]JSONDataType{
|
||||
String: ArrayString,
|
||||
Int64: ArrayInt64,
|
||||
Float64: ArrayFloat64,
|
||||
Bool: ArrayBool,
|
||||
Dynamic: ArrayDynamic,
|
||||
}
|
||||
|
||||
var MappingFieldDataTypeToJSONDataType = map[FieldDataType]JSONDataType{
|
||||
FieldDataTypeString: String,
|
||||
FieldDataTypeInt64: Int64,
|
||||
FieldDataTypeFloat64: Float64,
|
||||
FieldDataTypeNumber: Float64,
|
||||
FieldDataTypeBool: Bool,
|
||||
FieldDataTypeArrayString: ArrayString,
|
||||
FieldDataTypeArrayInt64: ArrayInt64,
|
||||
FieldDataTypeArrayFloat64: ArrayFloat64,
|
||||
FieldDataTypeArrayBool: ArrayBool,
|
||||
}
|
||||
|
||||
var MappingJSONDataTypeToFieldDataType = map[JSONDataType]FieldDataType{
|
||||
String: FieldDataTypeString,
|
||||
Int64: FieldDataTypeInt64,
|
||||
Float64: FieldDataTypeFloat64,
|
||||
Bool: FieldDataTypeBool,
|
||||
ArrayString: FieldDataTypeArrayString,
|
||||
ArrayInt64: FieldDataTypeArrayInt64,
|
||||
ArrayFloat64: FieldDataTypeArrayFloat64,
|
||||
ArrayBool: FieldDataTypeArrayBool,
|
||||
ArrayDynamic: FieldDataTypeArrayDynamic,
|
||||
ArrayJSON: FieldDataTypeArrayObject,
|
||||
}
|
||||
Reference in New Issue
Block a user