mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-18 10:42:14 +00:00
Compare commits
2 Commits
feat/gaps-
...
poc-mock-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1ecde40a22 | ||
|
|
7f33145f68 |
8
.vscode/settings.json
vendored
8
.vscode/settings.json
vendored
@@ -12,6 +12,14 @@
|
||||
"editor.formatOnSave": true,
|
||||
"editor.defaultFormatter": "golang.go"
|
||||
},
|
||||
"go.buildTags": "chdb",
|
||||
"go.testFlags": ["-tags=chdb"],
|
||||
"go.toolsEnvVars": {
|
||||
"GOFLAGS": "-tags=chdb"
|
||||
},
|
||||
"gopls": {
|
||||
"build.buildFlags": ["-tags=chdb"]
|
||||
},
|
||||
"[sql]": {
|
||||
"editor.defaultFormatter": "adpyke.vscode-sql-formatter"
|
||||
},
|
||||
|
||||
7
go.mod
7
go.mod
@@ -107,6 +107,8 @@ require (
|
||||
github.com/aws/smithy-go v1.24.0 // indirect
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/c-bata/go-prompt v0.2.6 // indirect
|
||||
github.com/chdb-io/chdb-go v1.11.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
|
||||
github.com/go-openapi/swag/cmdutils v0.25.4 // indirect
|
||||
@@ -126,11 +128,16 @@ require (
|
||||
github.com/goccy/go-yaml v1.19.2 // indirect
|
||||
github.com/hashicorp/go-metrics v0.5.4 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.15 // indirect
|
||||
github.com/mattn/go-tty v0.0.5 // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/pkg/term v1.2.0-beta.2 // indirect
|
||||
github.com/prometheus/client_golang/exp v0.0.0-20260108101519-fb0838f53562 // indirect
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.15.1 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/rivo/uniseg v0.4.7 // indirect
|
||||
github.com/swaggest/refl v1.4.0 // indirect
|
||||
github.com/swaggest/usecase v1.3.1 // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
|
||||
21
go.sum
21
go.sum
@@ -206,6 +206,8 @@ github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7
|
||||
github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc=
|
||||
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
|
||||
github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
|
||||
github.com/c-bata/go-prompt v0.2.6 h1:POP+nrHE+DfLYx370bedwNhsqmpCUynWPxuHi0C5vZI=
|
||||
github.com/c-bata/go-prompt v0.2.6/go.mod h1:/LMAke8wD2FsNu9EXNdHxNLbd9MedkPnCdfpU9wwHfY=
|
||||
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
@@ -216,6 +218,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/chdb-io/chdb-go v1.11.0 h1:G6+Oy1onzNL3bSxncGfIdiB6beTpxwKztjfai7qLckE=
|
||||
github.com/chdb-io/chdb-go v1.11.0/go.mod h1:RkT+xLXhdBKtUtJJPwhQQR4p6qiXHisJNS712QldDg8=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
@@ -760,6 +764,7 @@ github.com/mattermost/xml-roundtrip-validator v0.1.0/go.mod h1:qccnGMcpgwcNaBnxq
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
|
||||
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
|
||||
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
|
||||
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
|
||||
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
|
||||
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
|
||||
@@ -773,6 +778,14 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
|
||||
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
||||
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
||||
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
||||
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
|
||||
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
|
||||
github.com/mattn/go-tty v0.0.3/go.mod h1:ihxohKRERHTVzN+aSVRwACLCeqIoZAWpoICkkvrWyR0=
|
||||
github.com/mattn/go-tty v0.0.5 h1:s09uXI7yDbXzzTTfw3zonKFzwGkyYlgU3OMjqA0ddz4=
|
||||
github.com/mattn/go-tty v0.0.5/go.mod h1:u5GGXBtZU6RQoKV8gY5W6UhMudbR5vXnUe7j3pxse28=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
|
||||
@@ -900,6 +913,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
|
||||
github.com/pkg/term v1.2.0-beta.2 h1:L3y/h2jkuBVFdWiJvNfYfKmzcCnILw7mJWm2JQuMppw=
|
||||
github.com/pkg/term v1.2.0-beta.2/go.mod h1:E25nymQcrSllhX42Ok8MRm1+hyBdHY0dCeiKZ9jpNGw=
|
||||
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
|
||||
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
@@ -961,6 +976,9 @@ github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
|
||||
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||
@@ -1533,6 +1551,7 @@ golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@@ -1554,6 +1573,8 @@ golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200918174421-af09f7315aff/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
||||
@@ -2,17 +2,15 @@ package telemetrylogs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/chdbtelemetrystore"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -809,44 +807,30 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func buildTestTelemetryMetadataStore(t *testing.T, promotedPaths ...string) *telemetrytypestest.MockMetadataStore {
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
func enableBodyJSONQuery(_ *testing.T) {
|
||||
querybuilder.BodyJSONQueryEnabled = true
|
||||
}
|
||||
|
||||
types, _ := telemetrytypes.TestJSONTypeSet()
|
||||
for path, jsonTypes := range types {
|
||||
promoted := false
|
||||
|
||||
split := strings.Split(path, telemetrytypes.ArraySep)
|
||||
if path == "message" {
|
||||
promoted = true
|
||||
} else if slices.Contains(promotedPaths, split[0]) {
|
||||
promoted = true
|
||||
}
|
||||
// Create a TelemetryFieldKey for each JSONDataType for this path
|
||||
// Since a path can have multiple types, we create one key per type
|
||||
for _, jsonType := range jsonTypes {
|
||||
key := &telemetrytypes.TelemetryFieldKey{
|
||||
Name: path,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldDataType: telemetrytypes.MappingJSONDataTypeToFieldDataType[jsonType],
|
||||
JSONDataType: &jsonType,
|
||||
Materialized: promoted,
|
||||
}
|
||||
err := key.SetJSONAccessPlan(telemetrytypes.JSONColumnMetadata{
|
||||
BaseColumn: LogsV2BodyJSONColumn,
|
||||
PromotedColumn: LogsV2BodyPromotedColumn,
|
||||
}, types)
|
||||
require.NoError(t, err)
|
||||
mockMetadataStore.SetKey(key)
|
||||
}
|
||||
}
|
||||
|
||||
return mockMetadataStore
|
||||
func disableBodyJSONQuery(_ *testing.T) {
|
||||
querybuilder.BodyJSONQueryEnabled = false
|
||||
}
|
||||
|
||||
func buildJSONTestStatementBuilder(t *testing.T, promotedPaths ...string) *logQueryStatementBuilder {
|
||||
mockMetadataStore := buildTestTelemetryMetadataStore(t, promotedPaths...)
|
||||
t.Helper()
|
||||
provider, cleanup, err := chdbtelemetrystore.New()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(cleanup)
|
||||
|
||||
ctx := context.Background()
|
||||
types, _ := telemetrytypes.TestJSONTypeSet()
|
||||
require.NoError(t, provider.SeedBodyJSONPaths(ctx, types))
|
||||
|
||||
// "message" is always promoted in these tests.
|
||||
allPromoted := append([]string{"message"}, promotedPaths...)
|
||||
require.NoError(t, provider.SeedPromotedPaths(ctx, allPromoted...))
|
||||
|
||||
metadataStore := chdbtelemetrystore.NewChdbMetadataStore(provider)
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
@@ -855,14 +839,14 @@ func buildJSONTestStatementBuilder(t *testing.T, promotedPaths ...string) *logQu
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
fm,
|
||||
cb,
|
||||
mockMetadataStore,
|
||||
metadataStore,
|
||||
DefaultFullTextColumn,
|
||||
GetBodyJSONKey,
|
||||
)
|
||||
|
||||
statementBuilder := NewLogQueryStatementBuilder(
|
||||
return NewLogQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
metadataStore,
|
||||
fm,
|
||||
cb,
|
||||
resourceFilterStmtBuilder,
|
||||
@@ -870,27 +854,4 @@ func buildJSONTestStatementBuilder(t *testing.T, promotedPaths ...string) *logQu
|
||||
DefaultFullTextColumn,
|
||||
GetBodyJSONKey,
|
||||
)
|
||||
|
||||
return statementBuilder
|
||||
}
|
||||
|
||||
func testAddIndexedPaths(t *testing.T, statementBuilder *logQueryStatementBuilder, telemetryFieldKeys ...*telemetrytypes.TelemetryFieldKey) {
|
||||
mockMetadataStore := statementBuilder.metadataStore.(*telemetrytypestest.MockMetadataStore)
|
||||
for _, key := range telemetryFieldKeys {
|
||||
if strings.Contains(key.Name, telemetrytypes.ArraySep) || strings.Contains(key.Name, telemetrytypes.ArrayAnyIndex) {
|
||||
t.Fatalf("array paths are not supported: %s", key.Name)
|
||||
}
|
||||
|
||||
for _, storedKey := range mockMetadataStore.KeysMap[key.Name] {
|
||||
storedKey.Indexes = append(storedKey.Indexes, key.Indexes...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func enableBodyJSONQuery(_ *testing.T) {
|
||||
querybuilder.BodyJSONQueryEnabled = true
|
||||
}
|
||||
|
||||
func disableBodyJSONQuery(_ *testing.T) {
|
||||
querybuilder.BodyJSONQueryEnabled = false
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
@@ -252,13 +251,13 @@ func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...st
|
||||
return cleanIndexes, nil
|
||||
}
|
||||
|
||||
func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, []any) {
|
||||
func buildListLogsJSONIndexesQuery(cluster, logsDBName, logsV2LocalTblName string, filters ...string) (string, []any) {
|
||||
sb := sqlbuilder.Select(
|
||||
"name", "type_full", "expr", "granularity",
|
||||
).From(fmt.Sprintf("clusterAllReplicas('%s', %s)", cluster, SkipIndexTableName))
|
||||
|
||||
sb.Where(sb.Equal("database", telemetrylogs.DBName))
|
||||
sb.Where(sb.Equal("table", telemetrylogs.LogsV2LocalTableName))
|
||||
sb.Where(sb.Equal("database", logsDBName))
|
||||
sb.Where(sb.Equal("table", logsV2LocalTblName))
|
||||
sb.Where(sb.Or(
|
||||
sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyV2ColumnPrefix))),
|
||||
sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyPromotedColumnPrefix))),
|
||||
@@ -275,7 +274,7 @@ func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, [
|
||||
|
||||
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
|
||||
ctx = withTelemetryContext(ctx, "ListLogsJSONIndexes")
|
||||
query, args := buildListLogsJSONIndexesQuery(t.telemetrystore.Cluster(), filters...)
|
||||
query, args := buildListLogsJSONIndexesQuery(t.telemetrystore.Cluster(), t.logsDBName, logsV2LocalTableName, filters...)
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to load string indexed columns")
|
||||
@@ -317,12 +316,12 @@ func (t *telemetryMetaStore) ListJSONValues(ctx context.Context, path string, li
|
||||
}
|
||||
|
||||
if promoted {
|
||||
path = telemetrylogs.BodyPromotedColumnPrefix + path
|
||||
path = constants.BodyPromotedColumnPrefix + path
|
||||
} else {
|
||||
path = telemetrylogs.BodyJSONColumnPrefix + path
|
||||
path = constants.BodyV2ColumnPrefix + path
|
||||
}
|
||||
|
||||
from := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
|
||||
from := fmt.Sprintf("%s.%s", t.logsDBName, t.logsV2TblName)
|
||||
colExpr := func(typ telemetrytypes.JSONDataType) string {
|
||||
return fmt.Sprintf("dynamicElement(%s, '%s')", path, typ.StringValue())
|
||||
}
|
||||
@@ -471,7 +470,7 @@ func (t *telemetryMetaStore) IsPathPromoted(ctx context.Context, path string) (b
|
||||
split := strings.Split(path, telemetrytypes.ArraySep)
|
||||
pathSegment := split[0]
|
||||
query := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE signal = ? AND column_name = ? AND field_context = ? AND field_name = ? LIMIT 1", DBName, PromotedPathsTableName)
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, telemetrytypes.SignalLogs, telemetrylogs.LogsV2BodyPromotedColumn, telemetrytypes.FieldContextBody, pathSegment)
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, telemetrytypes.SignalLogs, constants.BodyPromotedColumn, telemetrytypes.FieldContextBody, pathSegment)
|
||||
if err != nil {
|
||||
return false, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to check if path %s is promoted", path)
|
||||
}
|
||||
@@ -486,7 +485,7 @@ func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...stri
|
||||
sb := sqlbuilder.Select("field_name").From(fmt.Sprintf("%s.%s", DBName, PromotedPathsTableName))
|
||||
conditions := []string{
|
||||
sb.Equal("signal", telemetrytypes.SignalLogs),
|
||||
sb.Equal("column_name", telemetrylogs.LogsV2BodyPromotedColumn),
|
||||
sb.Equal("column_name", constants.BodyPromotedColumn),
|
||||
sb.Equal("field_context", telemetrytypes.FieldContextBody),
|
||||
sb.NotEqual("field_name", "__all__"),
|
||||
}
|
||||
@@ -522,8 +521,8 @@ func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...stri
|
||||
// TODO(Piyush): Remove this function
|
||||
func CleanPathPrefixes(path string) string {
|
||||
path = strings.TrimPrefix(path, telemetrytypes.BodyJSONStringSearchPrefix)
|
||||
path = strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
|
||||
path = strings.TrimPrefix(path, telemetrylogs.BodyPromotedColumnPrefix)
|
||||
path = strings.TrimPrefix(path, constants.BodyV2ColumnPrefix)
|
||||
path = strings.TrimPrefix(path, constants.BodyPromotedColumnPrefix)
|
||||
return path
|
||||
}
|
||||
|
||||
@@ -543,7 +542,7 @@ func (t *telemetryMetaStore) PromotePaths(ctx context.Context, paths ...string)
|
||||
if trimmed == "" {
|
||||
continue
|
||||
}
|
||||
if err := batch.Append(telemetrytypes.SignalLogs, telemetrylogs.LogsV2BodyPromotedColumn, "JSON()", telemetrytypes.FieldContextBody, trimmed, 0, releaseTime); err != nil {
|
||||
if err := batch.Append(telemetrytypes.SignalLogs, constants.BodyPromotedColumn, "JSON()", telemetrytypes.FieldContextBody, trimmed, 0, releaseTime); err != nil {
|
||||
_ = batch.Abort()
|
||||
return errors.WrapInternalf(err, CodeFailedToAppendPath, "failed to append path")
|
||||
}
|
||||
|
||||
@@ -140,7 +140,7 @@ func TestBuildListLogsJSONIndexesQuery(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
query, args := buildListLogsJSONIndexesQuery(tc.cluster, tc.filters...)
|
||||
query, args := buildListLogsJSONIndexesQuery(tc.cluster, telemetrylogs.DBName, telemetrylogs.LogsV2LocalTableName, tc.filters...)
|
||||
|
||||
require.Equal(t, tc.expectedSQL, query)
|
||||
require.Equal(t, tc.expectedArgs, args)
|
||||
|
||||
6
pkg/telemetrymetadata/logs_constants.go
Normal file
6
pkg/telemetrymetadata/logs_constants.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package telemetrymetadata
|
||||
|
||||
// logsV2LocalTableName is the local (non-distributed) ClickHouse table for logs v2.
|
||||
// Defined here instead of importing telemetrylogs to avoid an import cycle:
|
||||
// telemetrylogs tests → chdbtelemetrystoretest → telemetrymetadata → telemetrylogs.
|
||||
const logsV2LocalTableName = "logs_v2"
|
||||
@@ -0,0 +1,120 @@
|
||||
//go:build chdb
|
||||
|
||||
// Package chdbtelemetrystoretest provides central test builder functions backed by
|
||||
// an in-process chdb session. These builders are used across multiple signal packages
|
||||
// to avoid import cycles: telemetrymetadata previously imported telemetrylogs, which
|
||||
// would create a cycle if telemetrylogs tests tried to use telemetrymetadata.
|
||||
// With that dependency removed, this package can safely import both.
|
||||
package chdbtelemetrystoretest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
otelcollectorconstants "github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/chdbtelemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Logs table name constants mirroring telemetrylogs — kept here to avoid importing
|
||||
// that package (which would create a cycle when telemetrylogs tests import this package).
|
||||
const (
|
||||
logsDBName = "signoz_logs"
|
||||
logsV2TblName = "distributed_logs_v2"
|
||||
logsTagAttrTblName = "distributed_tag_attributes_v2"
|
||||
logAttrKeysTblName = "distributed_logs_attribute_keys"
|
||||
logResKeysTblName = "distributed_logs_resource_keys"
|
||||
)
|
||||
|
||||
// NewLogsMetadataStore creates a chdb-backed MetadataStore seeded from the provided
|
||||
// TelemetryFieldKeys. Body-context keys are inserted into distributed_json_path_types;
|
||||
// keys with Materialized=true have their root path inserted into the column-evolution
|
||||
// metadata table so the store treats them as promoted.
|
||||
// The returned cleanup function must be called (typically via t.Cleanup).
|
||||
func NewLogsMetadataStore(t *testing.T, keys ...*telemetrytypes.TelemetryFieldKey) (telemetrytypes.MetadataStore, func()) {
|
||||
t.Helper()
|
||||
|
||||
provider, cleanup, err := chdbtelemetrystore.New()
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx := context.Background()
|
||||
require.NoError(t, seedFromFieldKeys(ctx, provider, keys))
|
||||
|
||||
store := telemetrymetadata.NewTelemetryMetaStore(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
provider,
|
||||
"", "", "", "", // traces (unused in logs tests)
|
||||
"", "", // metrics (unused in logs tests)
|
||||
"", "", // meter (unused in logs tests)
|
||||
logsDBName,
|
||||
logsV2TblName,
|
||||
logsTagAttrTblName,
|
||||
logAttrKeysTblName,
|
||||
logResKeysTblName,
|
||||
telemetrymetadata.DBName,
|
||||
telemetrymetadata.AttributesMetadataLocalTableName,
|
||||
)
|
||||
|
||||
return store, cleanup
|
||||
}
|
||||
|
||||
// seedFromFieldKeys inserts body-JSON path/type rows and promoted-path rows derived
|
||||
// from the given keys into the chdb session backing provider.
|
||||
func seedFromFieldKeys(ctx context.Context, provider *chdbtelemetrystore.Provider, keys []*telemetrytypes.TelemetryFieldKey) error {
|
||||
lastSeen := uint64(time.Now().UnixNano())
|
||||
releaseTime := time.Now().UnixNano()
|
||||
conn := provider.ClickhouseDB()
|
||||
|
||||
promotedPaths := map[string]bool{}
|
||||
|
||||
for _, key := range keys {
|
||||
if key.FieldContext != telemetrytypes.FieldContextBody || key.JSONDataType == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Insert into distributed_json_path_types
|
||||
query := fmt.Sprintf(
|
||||
"INSERT INTO %s.%s (%s, %s, %s) VALUES (?, ?, ?)",
|
||||
otelcollectorconstants.SignozMetadataDB,
|
||||
otelcollectorconstants.DistributedPathTypesTable,
|
||||
otelcollectorconstants.PathTypesTablePathColumn,
|
||||
otelcollectorconstants.PathTypesTableTypeColumn,
|
||||
otelcollectorconstants.PathTypesTableLastSeenColumn,
|
||||
)
|
||||
if err := conn.Exec(ctx, query, key.Name, key.JSONDataType.StringValue(), lastSeen); err != nil {
|
||||
return fmt.Errorf("seedFromFieldKeys: insert path %s/%s: %w", key.Name, key.JSONDataType.StringValue(), err)
|
||||
}
|
||||
|
||||
if key.Materialized {
|
||||
rootPath := strings.Split(key.Name, telemetrytypes.ArraySep)[0]
|
||||
promotedPaths[rootPath] = true
|
||||
}
|
||||
}
|
||||
|
||||
for path := range promotedPaths {
|
||||
query := fmt.Sprintf(
|
||||
"INSERT INTO %s.%s (signal, column_name, column_type, field_context, field_name, version, release_time) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
telemetrymetadata.DBName,
|
||||
telemetrymetadata.PromotedPathsTableName,
|
||||
)
|
||||
if err := conn.Exec(ctx, query,
|
||||
telemetrytypes.SignalLogs,
|
||||
otelcollectorconstants.BodyPromotedColumn,
|
||||
"JSON()",
|
||||
telemetrytypes.FieldContextBody,
|
||||
path,
|
||||
0,
|
||||
releaseTime,
|
||||
); err != nil {
|
||||
return fmt.Errorf("seedFromFieldKeys: insert promoted path %s: %w", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
//go:build !chdb
|
||||
|
||||
package chdbtelemetrystoretest
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
otelcollectorconstants "github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
|
||||
)
|
||||
|
||||
// NewLogsMetadataStore returns a MockMetadataStore populated from the provided
|
||||
// TelemetryFieldKeys. A type-cache is built from the keys so that SetJSONAccessPlan
|
||||
// can be resolved for each body-context key before it is stored.
|
||||
// The returned cleanup function is a no-op (nothing to tear down for an in-memory store).
|
||||
func NewLogsMetadataStore(t *testing.T, keys ...*telemetrytypes.TelemetryFieldKey) (telemetrytypes.MetadataStore, func()) {
|
||||
t.Helper()
|
||||
|
||||
mockStore := telemetrytypestest.NewMockMetadataStore()
|
||||
|
||||
// Build type-cache from the incoming keys so SetJSONAccessPlan can resolve
|
||||
// parent-path array types (used by nested / array paths).
|
||||
typeCache := make(map[string][]telemetrytypes.JSONDataType)
|
||||
for _, key := range keys {
|
||||
if key.JSONDataType != nil {
|
||||
typeCache[key.Name] = append(typeCache[key.Name], *key.JSONDataType)
|
||||
}
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody && key.JSONDataType != nil {
|
||||
if err := key.SetJSONAccessPlan(telemetrytypes.JSONColumnMetadata{
|
||||
BaseColumn: otelcollectorconstants.BodyV2Column,
|
||||
PromotedColumn: otelcollectorconstants.BodyPromotedColumn,
|
||||
}, typeCache); err != nil {
|
||||
t.Fatalf("NewLogsMetadataStore: SetJSONAccessPlan for %q: %v", key.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
if key.Materialized {
|
||||
rootPath := strings.Split(key.Name, telemetrytypes.ArraySep)[0]
|
||||
mockStore.PromotedPathsMap[rootPath] = true
|
||||
}
|
||||
|
||||
mockStore.SetKey(key)
|
||||
}
|
||||
|
||||
return mockStore, func() {}
|
||||
}
|
||||
140
pkg/telemetrystore/chdbtelemetrystore/conn.go
Normal file
140
pkg/telemetrystore/chdbtelemetrystore/conn.go
Normal file
@@ -0,0 +1,140 @@
|
||||
//go:build chdb
|
||||
|
||||
package chdbtelemetrystore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
chdb "github.com/chdb-io/chdb-go/chdb"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
// clusterAllReplicasRe matches clusterAllReplicas('<cluster>', <table>) and captures
|
||||
// the table expression so we can rewrite it for chdb's single-node context.
|
||||
var clusterAllReplicasRe = regexp.MustCompile(`(?i)clusterAllReplicas\('[^']*',\s*([^)]+)\)`)
|
||||
|
||||
// rewriteClusterAllReplicas strips the clusterAllReplicas wrapper from a query,
|
||||
// replacing it with a direct table reference. This lets single-node chdb sessions
|
||||
// execute queries originally written for a multi-node ClickHouse cluster.
|
||||
func rewriteClusterAllReplicas(query string) string {
|
||||
return clusterAllReplicasRe.ReplaceAllStringFunc(query, func(match string) string {
|
||||
sub := clusterAllReplicasRe.FindStringSubmatch(match)
|
||||
if len(sub) < 2 {
|
||||
return match
|
||||
}
|
||||
return strings.TrimSpace(sub[1])
|
||||
})
|
||||
}
|
||||
|
||||
// interpolateArgs substitutes ? placeholders in query using the ClickHouse SQL flavor
|
||||
// from go-sqlbuilder — the same mechanism used by chdb's own database/sql driver.
|
||||
func interpolateArgs(query string, args []any) (string, error) {
|
||||
if len(args) == 0 {
|
||||
return query, nil
|
||||
}
|
||||
return sqlbuilder.ClickHouse.Interpolate(query, args)
|
||||
}
|
||||
|
||||
// chdbConn wraps a chdb Session and exposes it as a clickhouse.Conn.
|
||||
// Exec, Select, Query, and QueryRow execute queries for real via chdb.
|
||||
// The remaining interface methods are lightweight stubs sufficient for testing.
|
||||
type chdbConn struct {
|
||||
session *chdb.Session
|
||||
}
|
||||
|
||||
var _ clickhouse.Conn = (*chdbConn)(nil)
|
||||
|
||||
func (c *chdbConn) Contributors() []string { return nil }
|
||||
|
||||
func (c *chdbConn) ServerVersion() (*driver.ServerVersion, error) {
|
||||
return &driver.ServerVersion{DisplayName: "chdb"}, nil
|
||||
}
|
||||
|
||||
func (c *chdbConn) Ping(_ context.Context) error { return nil }
|
||||
|
||||
func (c *chdbConn) Stats() driver.Stats { return driver.Stats{} }
|
||||
|
||||
func (c *chdbConn) Close() error {
|
||||
c.session.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *chdbConn) AsyncInsert(ctx context.Context, query string, _ bool, args ...any) error {
|
||||
return c.Exec(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (c *chdbConn) PrepareBatch(_ context.Context, _ string, _ ...driver.PrepareBatchOption) (driver.Batch, error) {
|
||||
return nil, fmt.Errorf("chdbConn: PrepareBatch not implemented")
|
||||
}
|
||||
|
||||
// Exec executes a DDL or DML statement (CREATE TABLE, INSERT, DROP, …) via chdb.
|
||||
// Any result set is discarded; only errors are surfaced.
|
||||
func (c *chdbConn) Exec(_ context.Context, query string, args ...any) error {
|
||||
query = rewriteClusterAllReplicas(query)
|
||||
compiled, err := interpolateArgs(query, args)
|
||||
if err != nil {
|
||||
return fmt.Errorf("chdbConn: Exec: interpolate args: %w", err)
|
||||
}
|
||||
result, err := c.session.Query(compiled, "CSV")
|
||||
if err != nil {
|
||||
return fmt.Errorf("chdbConn: Exec: %w", err)
|
||||
}
|
||||
defer result.Free()
|
||||
return result.Error()
|
||||
}
|
||||
|
||||
// Select executes query and scans all result rows into dest.
|
||||
// dest must be a pointer to a slice of structs or maps.
|
||||
//
|
||||
// Struct fields are matched to ClickHouse columns using the following priority:
|
||||
// 1. `ch:"<column>"` struct tag
|
||||
// 2. `json:"<column>"` struct tag
|
||||
// 3. Lowercased field name
|
||||
func (c *chdbConn) Select(_ context.Context, dest any, query string, args ...any) error {
|
||||
query = rewriteClusterAllReplicas(query)
|
||||
compiled, err := interpolateArgs(query, args)
|
||||
if err != nil {
|
||||
return fmt.Errorf("chdbConn: Select: interpolate args: %w", err)
|
||||
}
|
||||
result, err := c.session.Query(compiled, "JSONCompact")
|
||||
if err != nil {
|
||||
return fmt.Errorf("chdbConn: Select: %w", err)
|
||||
}
|
||||
defer result.Free()
|
||||
if err := result.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
return scanJSONCompactIntoSlice(result.String(), dest)
|
||||
}
|
||||
|
||||
// Query executes query and returns a Rows iterator.
|
||||
func (c *chdbConn) Query(_ context.Context, query string, args ...any) (driver.Rows, error) {
|
||||
query = rewriteClusterAllReplicas(query)
|
||||
compiled, err := interpolateArgs(query, args)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("chdbConn: Query: interpolate args: %w", err)
|
||||
}
|
||||
result, err := c.session.Query(compiled, "JSONCompact")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("chdbConn: Query: %w", err)
|
||||
}
|
||||
if err := result.Error(); err != nil {
|
||||
result.Free()
|
||||
return nil, err
|
||||
}
|
||||
return newChdbRows(result)
|
||||
}
|
||||
|
||||
// QueryRow executes query and returns a single Row.
|
||||
func (c *chdbConn) QueryRow(ctx context.Context, query string, args ...any) driver.Row {
|
||||
rows, err := c.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return &chdbRow{err: err}
|
||||
}
|
||||
return &chdbRow{rows: rows.(*chdbRows)}
|
||||
}
|
||||
177
pkg/telemetrystore/chdbtelemetrystore/json.go
Normal file
177
pkg/telemetrystore/chdbtelemetrystore/json.go
Normal file
@@ -0,0 +1,177 @@
|
||||
//go:build chdb
|
||||
|
||||
package chdbtelemetrystore
|
||||
|
||||
import (
	"encoding/json"
	"fmt"
	"reflect"
	"strconv"
	"strings"
)
|
||||
|
||||
// jsonCompactResult is the top-level structure of ClickHouse's JSONCompact output format.
type jsonCompactResult struct {
	// Meta describes the result columns, in order.
	Meta []jsonMeta `json:"meta"`
	// Data holds row-major cell values, kept raw so each cell can be decoded
	// against the destination field's type later.
	Data [][]json.RawMessage `json:"data"`
}

// jsonMeta is one entry of the JSONCompact "meta" section: a column name and
// its ClickHouse type string (e.g. "UInt64", "Nullable(String)").
type jsonMeta struct {
	Name string `json:"name"`
	Type string `json:"type"`
}
|
||||
|
||||
// scanJSONCompactIntoSlice parses a JSONCompact response and appends rows into dest
|
||||
// (must be a pointer to a slice of structs or maps).
|
||||
func scanJSONCompactIntoSlice(jsonStr string, dest any) error {
|
||||
if strings.TrimSpace(jsonStr) == "" {
|
||||
return nil
|
||||
}
|
||||
var jr jsonCompactResult
|
||||
if err := json.Unmarshal([]byte(jsonStr), &jr); err != nil {
|
||||
return fmt.Errorf("chdbConn: Select: parse response: %w", err)
|
||||
}
|
||||
|
||||
destVal := reflect.ValueOf(dest)
|
||||
if destVal.Kind() != reflect.Ptr || destVal.Elem().Kind() != reflect.Slice {
|
||||
return fmt.Errorf("chdbConn: Select: dest must be a pointer to a slice, got %T", dest)
|
||||
}
|
||||
sliceVal := destVal.Elem()
|
||||
elemType := sliceVal.Type().Elem()
|
||||
|
||||
for _, row := range jr.Data {
|
||||
elem := reflect.New(elemType).Elem()
|
||||
if err := scanRowIntoValue(jr.Meta, row, elem); err != nil {
|
||||
return err
|
||||
}
|
||||
sliceVal.Set(reflect.Append(sliceVal, elem))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// scanRowIntoValue fills a struct or map Value from a single JSONCompact data row.
|
||||
func scanRowIntoValue(meta []jsonMeta, row []json.RawMessage, elem reflect.Value) error {
|
||||
switch elem.Kind() {
|
||||
case reflect.Struct:
|
||||
for i, m := range meta {
|
||||
if i >= len(row) {
|
||||
break
|
||||
}
|
||||
field := findStructField(elem, m.Name)
|
||||
if !field.IsValid() {
|
||||
continue
|
||||
}
|
||||
if err := unmarshalIntoField(row[i], field); err != nil {
|
||||
return fmt.Errorf("column %q: %w", m.Name, err)
|
||||
}
|
||||
}
|
||||
case reflect.Map:
|
||||
if elem.IsNil() {
|
||||
elem.Set(reflect.MakeMap(elem.Type()))
|
||||
}
|
||||
for i, m := range meta {
|
||||
if i >= len(row) {
|
||||
break
|
||||
}
|
||||
var v any
|
||||
if err := json.Unmarshal(row[i], &v); err != nil {
|
||||
return err
|
||||
}
|
||||
elem.SetMapIndex(reflect.ValueOf(m.Name), reflect.ValueOf(v))
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("chdbConn: Select: unsupported element kind %s", elem.Kind())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// findStructField returns the reflect.Value of the struct field corresponding to colName.
|
||||
// Priority: `ch` tag → `json` tag → lowercased field name.
|
||||
func findStructField(structVal reflect.Value, colName string) reflect.Value {
|
||||
t := structVal.Type()
|
||||
colLower := strings.ToLower(colName)
|
||||
for i := range t.NumField() {
|
||||
f := t.Field(i)
|
||||
if tag, _, _ := strings.Cut(f.Tag.Get("ch"), ","); tag == colName {
|
||||
return structVal.Field(i)
|
||||
}
|
||||
if tag, _, _ := strings.Cut(f.Tag.Get("json"), ","); tag == colName {
|
||||
return structVal.Field(i)
|
||||
}
|
||||
if strings.ToLower(f.Name) == colLower {
|
||||
return structVal.Field(i)
|
||||
}
|
||||
}
|
||||
return reflect.Value{}
|
||||
}
|
||||
|
||||
// unmarshalIntoField deserializes raw JSON into field, performing numeric conversions
|
||||
// needed for ClickHouse integer types (UInt64, Int64, …).
|
||||
func unmarshalIntoField(raw json.RawMessage, field reflect.Value) error {
|
||||
dec := json.NewDecoder(strings.NewReader(string(raw)))
|
||||
dec.UseNumber()
|
||||
var v any
|
||||
if err := dec.Decode(&v); err != nil {
|
||||
return err
|
||||
}
|
||||
return assignToField(field, v)
|
||||
}
|
||||
|
||||
// assignToField converts src (from json.Decoder with UseNumber) and assigns it to field.
|
||||
func assignToField(field reflect.Value, src any) error {
|
||||
if src == nil {
|
||||
field.Set(reflect.Zero(field.Type()))
|
||||
return nil
|
||||
}
|
||||
|
||||
if num, ok := src.(json.Number); ok {
|
||||
switch field.Kind() {
|
||||
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
||||
n, err := num.Int64()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
field.SetUint(uint64(n))
|
||||
return nil
|
||||
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
||||
n, err := num.Int64()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
field.SetInt(n)
|
||||
return nil
|
||||
case reflect.Float32, reflect.Float64:
|
||||
n, err := num.Float64()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
field.SetFloat(n)
|
||||
return nil
|
||||
case reflect.String:
|
||||
field.SetString(num.String())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Handle []interface{} → []T conversions (ClickHouse arrays decoded from JSON).
|
||||
if srcSlice, ok := src.([]interface{}); ok && field.Kind() == reflect.Slice {
|
||||
result := reflect.MakeSlice(field.Type(), len(srcSlice), len(srcSlice))
|
||||
for i, item := range srcSlice {
|
||||
if err := assignToField(result.Index(i), item); err != nil {
|
||||
return fmt.Errorf("slice element %d: %w", i, err)
|
||||
}
|
||||
}
|
||||
field.Set(result)
|
||||
return nil
|
||||
}
|
||||
|
||||
srcVal := reflect.ValueOf(src)
|
||||
if srcVal.Type().AssignableTo(field.Type()) {
|
||||
field.Set(srcVal)
|
||||
return nil
|
||||
}
|
||||
if srcVal.Type().ConvertibleTo(field.Type()) {
|
||||
field.Set(srcVal.Convert(field.Type()))
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("cannot assign %T to %s", src, field.Type())
|
||||
}
|
||||
109
pkg/telemetrystore/chdbtelemetrystore/migrations.go
Normal file
109
pkg/telemetrystore/chdbtelemetrystore/migrations.go
Normal file
@@ -0,0 +1,109 @@
|
||||
//go:build chdb
|
||||
|
||||
package chdbtelemetrystore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
chdb "github.com/chdb-io/chdb-go/chdb"
|
||||
)
|
||||
|
||||
// runMigrations applies the full signoz-otel-collector logs schema against the given
|
||||
// chdb session. It mirrors the same migration set that the collector runs on a real
|
||||
// ClickHouse cluster (CustomRetentionLogsMigrations + LogsMigrationsV2), with the
|
||||
// following chdb-specific adaptations:
|
||||
//
|
||||
// - CREATE DATABASE statements are prepended so the tables have a home.
|
||||
// - Distributed engine tables are replaced with MergeTree ORDER BY tuple() so
|
||||
// every "distributed_*" table is a real, writable table in single-node chdb.
|
||||
// - Operations that don't make sense without a cluster (TTL materialisation,
|
||||
// MATERIALIZE COLUMN, MODIFY SETTINGS with serialisation keys) are skipped.
|
||||
func runMigrations(session *chdb.Session) error {
|
||||
// Ensure databases exist before any table DDL.
|
||||
for _, db := range []string{"signoz_logs", "signoz_metadata"} {
|
||||
if err := execSQL(session, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", db)); err != nil {
|
||||
return fmt.Errorf("create database %s: %w", db, err)
|
||||
}
|
||||
}
|
||||
|
||||
migrationSets := [][]schemamigrator.SchemaMigrationRecord{
|
||||
schemamigrator.CustomRetentionLogsMigrations,
|
||||
schemamigrator.MetadataMigrations,
|
||||
schemamigrator.LogsMigrationsV2,
|
||||
}
|
||||
|
||||
for _, set := range migrationSets {
|
||||
for _, record := range set {
|
||||
for _, op := range record.UpItems {
|
||||
sql, skip := toChdbSQL(op)
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
if err := execSQL(session, sql); err != nil {
|
||||
return fmt.Errorf("migration %d: %w", record.MigrationID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// toChdbSQL converts a schemamigrator.Operation to a chdb-compatible SQL string.
// Returns (sql, skip=true) for operations that should be omitted in a single-node
// chdb context.
func toChdbSQL(op schemamigrator.Operation) (sql string, skip bool) {
	switch o := op.(type) {
	case schemamigrator.CreateTableOperation:
		// Distributed engines are rewritten to MergeTree; see adaptCreateTable.
		return adaptCreateTable(o), false

	case schemamigrator.DropTableOperation:
		// Idempotent; safe to run even if the table never existed.
		return o.ToSQL(), false

	case schemamigrator.AlterTableAddColumn,
		schemamigrator.AlterTableAddIndex,
		schemamigrator.AlterTableDropColumn,
		schemamigrator.AlterTableDropIndex:
		// Plain column/index DDL works unchanged on a single node.
		return o.ToSQL(), false

	// TTL is a production data-retention concern; irrelevant for test sessions.
	case schemamigrator.AlterTableModifyTTL,
		schemamigrator.AlterTableDropTTL,
		// Background mutation; not needed in ephemeral test tables.
		schemamigrator.AlterTableMaterializeColumn,
		// Includes serialisation settings (object_serialization_version, …) that
		// may not be recognised by the embedded chdb build.
		schemamigrator.AlterTableModifySettings,
		// Materialized views are not required for query-generation tests.
		schemamigrator.CreateMaterializedViewOperation:
		return "", true

	default:
		// Unknown operation type — skip conservatively rather than risk
		// failing the whole migration run on an unsupported statement.
		return "", true
	}
}
|
||||
|
||||
// adaptCreateTable rewrites a CreateTableOperation for chdb:
|
||||
// - If the engine is Distributed, it is replaced with a plain MergeTree so the
|
||||
// "distributed_*" table is a real, directly-writable table on the single chdb
|
||||
// node. This preserves the exact column list while dropping distribution.
|
||||
// - All other engines (MergeTree, ReplacingMergeTree, …) are used as-is.
|
||||
func adaptCreateTable(op schemamigrator.CreateTableOperation) string {
|
||||
if op.Engine.EngineType() == "Distributed" {
|
||||
op.Engine = schemamigrator.MergeTree{OrderBy: "tuple()"}
|
||||
}
|
||||
return op.ToSQL()
|
||||
}
|
||||
|
||||
// execSQL runs a single SQL statement against the session and returns any error.
|
||||
func execSQL(session *chdb.Session, sql string) error {
|
||||
result, err := session.Query(sql, "CSV")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer result.Free()
|
||||
return result.Error()
|
||||
}
|
||||
63
pkg/telemetrystore/chdbtelemetrystore/provider.go
Normal file
63
pkg/telemetrystore/chdbtelemetrystore/provider.go
Normal file
@@ -0,0 +1,63 @@
|
||||
//go:build chdb
|
||||
|
||||
package chdbtelemetrystore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
chdb "github.com/chdb-io/chdb-go/chdb"
|
||||
)
|
||||
|
||||
// Provider implements TelemetryStore using chdb-go for in-process ClickHouse execution.
//
// Unlike the mock-based provider (which uses go-sqlmock and requires pre-registered
// expectations), this provider actually executes SQL against an embedded ClickHouse engine.
// This makes it suitable for integration-style tests that need real query execution
// without an external ClickHouse server.
//
// # Session lifecycle
//
// chdb-go maintains a package-level singleton session. Creating multiple Provider
// instances in the same process shares the same underlying session, meaning DDL
// (CREATE TABLE, DROP TABLE, INSERT) issued by one consumer is visible to others.
// To maintain test isolation, use unique database or table names and call the cleanup
// function returned by New via t.Cleanup.
type Provider struct {
	// conn adapts the chdb session to the clickhouse.Conn interface.
	conn *chdbConn
	// cluster is the name reported by Cluster(); set to "local" by New.
	cluster string
}

// Compile-time check that Provider satisfies telemetrystore.TelemetryStore.
var _ telemetrystore.TelemetryStore = (*Provider)(nil)
|
||||
|
||||
// New creates a Provider backed by an in-process chdb session and runs the full
|
||||
// signoz-otel-collector logs schema migrations so the tables are ready for use.
|
||||
// The returned cleanup function closes the session and should be wired in via t.Cleanup.
|
||||
func New() (*Provider, func(), error) {
|
||||
session, err := chdb.NewSession()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("chdbtelemetrystore: failed to create session: %w", err)
|
||||
}
|
||||
|
||||
if err := runMigrations(session); err != nil {
|
||||
session.Close()
|
||||
return nil, nil, fmt.Errorf("chdbtelemetrystore: schema migration failed: %w", err)
|
||||
}
|
||||
|
||||
cleanup := func() { session.Close() }
|
||||
return &Provider{
|
||||
conn: &chdbConn{session: session},
|
||||
cluster: "local",
|
||||
}, cleanup, nil
|
||||
}
|
||||
|
||||
// ClickhouseDB returns the chdb-backed clickhouse.Conn.
func (p *Provider) ClickhouseDB() clickhouse.Conn {
	return p.conn
}

// Cluster returns the cluster name for this provider (always "local"; see New).
func (p *Provider) Cluster() string {
	return p.cluster
}
|
||||
144
pkg/telemetrystore/chdbtelemetrystore/rows.go
Normal file
144
pkg/telemetrystore/chdbtelemetrystore/rows.go
Normal file
@@ -0,0 +1,144 @@
|
||||
//go:build chdb
|
||||
|
||||
package chdbtelemetrystore
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
chdbpurego "github.com/chdb-io/chdb-go/chdb-purego"
|
||||
)
|
||||
|
||||
// chdbRows implements clickhouse/v2/lib/driver.Rows over a parsed JSONCompact response.
type chdbRows struct {
	// meta lists the result columns (name + ClickHouse type), in order.
	meta []jsonMeta
	// data holds the row-major raw cell values; decoded per column on Scan.
	data [][]json.RawMessage
	// cursor is the index of the current row; -1 before the first Next.
	cursor int
	result chdbpurego.ChdbResult // held so we can Free() on Close
}
|
||||
|
||||
func newChdbRows(result chdbpurego.ChdbResult) (*chdbRows, error) {
|
||||
str := result.String()
|
||||
if strings.TrimSpace(str) == "" {
|
||||
return &chdbRows{result: result, cursor: -1}, nil
|
||||
}
|
||||
var jr jsonCompactResult
|
||||
if err := json.Unmarshal([]byte(str), &jr); err != nil {
|
||||
return nil, fmt.Errorf("chdbRows: parse response: %w", err)
|
||||
}
|
||||
return &chdbRows{
|
||||
meta: jr.Meta,
|
||||
data: jr.Data,
|
||||
cursor: -1,
|
||||
result: result,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Next advances the cursor to the next row and reports whether one exists.
func (r *chdbRows) Next() bool {
	r.cursor++
	return r.cursor < len(r.data)
}
|
||||
|
||||
// Scan copies the current row's columns into dest (positional pointer arguments).
|
||||
func (r *chdbRows) Scan(dest ...any) error {
|
||||
if r.cursor < 0 || r.cursor >= len(r.data) {
|
||||
return fmt.Errorf("chdbRows: Scan called outside a valid row")
|
||||
}
|
||||
row := r.data[r.cursor]
|
||||
for i, d := range dest {
|
||||
if i >= len(row) {
|
||||
break
|
||||
}
|
||||
dv := reflect.ValueOf(d)
|
||||
if dv.Kind() != reflect.Ptr {
|
||||
return fmt.Errorf("chdbRows: Scan dest[%d] must be a pointer", i)
|
||||
}
|
||||
if err := unmarshalIntoField(row[i], dv.Elem()); err != nil {
|
||||
return fmt.Errorf("chdbRows: Scan col %d: %w", i, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ScanStruct fills a struct from the current row using the same tag-based field
|
||||
// matching as Select.
|
||||
func (r *chdbRows) ScanStruct(dest any) error {
|
||||
if r.cursor < 0 || r.cursor >= len(r.data) {
|
||||
return fmt.Errorf("chdbRows: ScanStruct called outside a valid row")
|
||||
}
|
||||
elem := reflect.ValueOf(dest)
|
||||
if elem.Kind() == reflect.Ptr {
|
||||
elem = elem.Elem()
|
||||
}
|
||||
return scanRowIntoValue(r.meta, r.data[r.cursor], elem)
|
||||
}
|
||||
|
||||
func (r *chdbRows) ColumnTypes() []driver.ColumnType {
|
||||
types := make([]driver.ColumnType, len(r.meta))
|
||||
for i, m := range r.meta {
|
||||
types[i] = &chdbColumnType{name: m.Name, dbType: m.Type}
|
||||
}
|
||||
return types
|
||||
}
|
||||
|
||||
func (r *chdbRows) Totals(_ ...any) error { return nil }
|
||||
|
||||
func (r *chdbRows) Columns() []string {
|
||||
cols := make([]string, len(r.meta))
|
||||
for i, m := range r.meta {
|
||||
cols[i] = m.Name
|
||||
}
|
||||
return cols
|
||||
}
|
||||
|
||||
// Close frees the native chdb result buffer. It is safe to call multiple
// times; subsequent calls are no-ops.
func (r *chdbRows) Close() error {
	if r.result != nil {
		r.result.Free()
		r.result = nil
	}
	return nil
}

// Err always reports nil; parse errors are surfaced by newChdbRows instead.
func (r *chdbRows) Err() error { return nil }
|
||||
|
||||
// chdbRow wraps chdbRows and exposes the first row as clickhouse/v2/lib/driver.Row.
|
||||
type chdbRow struct {
|
||||
err error
|
||||
rows *chdbRows
|
||||
}
|
||||
|
||||
func (r *chdbRow) Err() error { return r.err }
|
||||
|
||||
func (r *chdbRow) Scan(dest ...any) error {
|
||||
if r.err != nil {
|
||||
return r.err
|
||||
}
|
||||
if !r.rows.Next() {
|
||||
return fmt.Errorf("chdb: no rows in result set")
|
||||
}
|
||||
return r.rows.Scan(dest...)
|
||||
}
|
||||
|
||||
func (r *chdbRow) ScanStruct(dest any) error {
|
||||
if r.err != nil {
|
||||
return r.err
|
||||
}
|
||||
if !r.rows.Next() {
|
||||
return fmt.Errorf("chdb: no rows in result set")
|
||||
}
|
||||
return r.rows.ScanStruct(dest)
|
||||
}
|
||||
|
||||
// chdbColumnType implements driver.ColumnType for chdb result metadata.
type chdbColumnType struct {
	name   string // column name from the JSONCompact meta section
	dbType string // raw ClickHouse type string, e.g. "Nullable(String)"
}

// Name returns the column name.
func (c *chdbColumnType) Name() string { return c.name }

// Nullable reports whether the ClickHouse type string starts with "Nullable".
func (c *chdbColumnType) Nullable() bool { return strings.HasPrefix(c.dbType, "Nullable") }

// ScanType always reports string regardless of the actual column type.
// NOTE(review): callers needing precise Go scan types cannot rely on this stub.
func (c *chdbColumnType) ScanType() reflect.Type { return reflect.TypeOf("") }

// DatabaseTypeName returns the raw ClickHouse type string.
func (c *chdbColumnType) DatabaseTypeName() string { return c.dbType }
|
||||
Reference in New Issue
Block a user