Compare commits

..

72 Commits

Author SHA1 Message Date
Piyush Singariya
19f4cde52e fix: package test fixed 2026-03-17 13:46:58 +05:30
srikanthccv
fcce8b4008 chore: text search tests 2026-03-17 09:08:02 +05:30
srikanthccv
c2c2678125 chore: keep message contained to field mapper 2026-03-17 08:55:37 +05:30
srikanthccv
73dc095b79 Merge branch 'main' into keep-message-contained 2026-03-17 05:11:24 +05:30
Piyush Singariya
b2adbb510b fix: remove the exception checking 2026-03-16 19:46:13 +05:30
Piyush Singariya
4f7ec4c057 revert: remove unused JSON Field datatype 2026-03-16 16:24:38 +05:30
Piyush Singariya
9c5a5488e2 fix: fallback expr switch case 2026-03-16 16:22:14 +05:30
Piyush Singariya
9ee23eae06 Merge branch 'main' into merge-json-col-fields 2026-03-13 23:43:27 +05:30
Piyush Singariya
0f36479787 Merge branch 'main' into merge-json-col-fields 2026-03-13 19:55:05 +05:30
Piyush Singariya
95a9a24875 fix: message field key search in JSON Logs (#10577)
* feat: work in progress

* fix: test run success

* fix: in progress

* fix: excluding message from metadata fetch

* test: cleared

* fix: key name in metadata

* fix: uncomment tests

* chore: change to method for staticfields

* fix: remove confusing comments; remove usage of logical keyword

* chore: shift method above business logic

* chore: changes based on review

* fix: comments in metadata_store.go
2026-03-13 19:16:05 +05:30
Piyush Singariya
7d1e39037c chore: minor changes based on review 2026-03-13 13:06:56 +05:30
Piyush Singariya
54f104db5f fix: cursor comments 2026-03-13 12:15:12 +05:30
Piyush Singariya
ab0852bbfb feat: update message as typehint in JSON Column (#10545) 2026-03-11 14:50:59 +05:30
Piyush Singariya
4c7aba680e fix: lint error 2026-03-10 14:00:12 +05:30
Piyush Singariya
23c247a1ba Merge branch 'main' into merge-json-col-fields 2026-03-10 13:28:30 +05:30
Piyush Singariya
4777b13ddf fix: shift warning attachment to getKeySelectors 2026-03-03 13:27:23 +05:30
Piyush Singariya
2d3060bac4 chore: remove unnecessary change 2026-02-25 12:09:30 +05:30
Piyush Singariya
9101d51920 Merge branch 'main' into merge-json-col-fields 2026-02-23 12:50:11 +05:30
Piyush Singariya
82b82b0208 chore: addressing comments from Nitya 2026-02-23 12:49:48 +05:30
Nityananda Gohain
51bd760d9a Merge branch 'main' into merge-json-col-fields 2026-02-20 15:53:31 +05:30
Piyush Singariya
2a492cc783 fix: change warning to a const to fix tests 2026-02-20 14:54:16 +05:30
Piyush Singariya
24afdad36c fix: append warnings from fieldkeys 2026-02-20 14:51:13 +05:30
Piyush Singariya
5d20019207 fix: body.message not being mapped correctly 2026-02-20 14:26:12 +05:30
Piyush Singariya
1963d5811d Merge branch 'main' into merge-json-col-fields 2026-02-20 10:08:25 +05:30
Piyush Singariya
15cfccad74 revert: change ReadMultiple is needed 2026-02-20 10:08:11 +05:30
Piyush Singariya
a0399560e3 revert: remvoing unused function 2026-02-19 16:33:12 +05:30
Piyush Singariya
265e337d5c Merge branch 'main' into merge-json-col-fields 2026-02-19 16:29:55 +05:30
Piyush Singariya
bb8c874755 fix: go lint 2026-02-17 17:19:24 +05:30
Piyush Singariya
13cbe03d64 fix: tests 2026-02-17 16:58:00 +05:30
Piyush Singariya
93621c29b7 fix: go mod changes 2026-02-17 16:29:00 +05:30
Piyush Singariya
2c691b5a75 fix: test fixed 2026-02-17 16:28:54 +05:30
Piyush Singariya
cd7e1bb114 Merge branch 'main' into merge-json-col-fields 2026-02-17 16:22:58 +05:30
Piyush Singariya
a1d2ec8b8a fix: remove unused function 2026-02-17 16:18:38 +05:30
Piyush Singariya
8bbafb52d5 fix: go.mod required changes 2026-02-17 16:16:17 +05:30
Piyush Singariya
075cfab463 feat: mapping body_v2.message:string map to body 2026-02-17 13:26:34 +05:30
Piyush Singariya
86bccaac0c test: blocked on pr #10153 2026-02-16 15:24:31 +05:30
Piyush Singariya
de1aac63c0 revert: more unrelated change 2026-02-16 13:19:49 +05:30
Piyush Singariya
14fe8745b5 Merge branch 'main' into merge-json-col-fields 2026-02-16 13:14:39 +05:30
Piyush Singariya
4013c7ee03 revert: few unrelated changes 2026-02-16 13:13:07 +05:30
Piyush Singariya
0d34360e0b fix: handle datatype collision 2026-01-30 12:17:28 +05:30
srikanthccv
d204c89dec Merge branch 'main' into merge-json-col-fields 2026-01-30 02:12:14 +05:30
Piyush Singariya
8dd33c1ab7 Merge branch 'main' into merge-json-col-fields 2026-01-29 19:57:22 +05:30
Piyush Singariya
8e5c3d5ae1 chore: merge json fields 2026-01-29 16:46:00 +05:30
Piyush Singariya
d45bb52f33 Merge branch 'has-jsonqb' into merge-json-col-fields 2026-01-29 13:21:13 +05:30
Piyush Singariya
e71818292d fix: go test flakiness 2026-01-29 10:17:53 +05:30
Piyush Singariya
37557f7f24 Merge branch 'main' into has-jsonqb 2026-01-29 09:12:22 +05:30
Piyush Singariya
27ff102660 Merge branch 'main' into has-jsonqb 2026-01-28 17:44:48 +05:30
Piyush Singariya
cb2aa4cffd fix: tests 2026-01-28 17:42:17 +05:30
Piyush Singariya
58d1d84ec7 test: fix 2026-01-28 15:34:22 +05:30
Piyush Singariya
d8e116a7bc fix: merge conflict 2026-01-28 15:15:50 +05:30
Piyush Singariya
6a48bdc37e Merge branch 'main' into has-jsonqb 2026-01-28 15:15:17 +05:30
Piyush Singariya
ffb62432f8 chore: var renamed 2026-01-28 14:42:51 +05:30
Piyush Singariya
57c51f070c fix: merge json body columns together 2026-01-28 14:36:15 +05:30
Piyush Singariya
36becfc7a2 fix: removed comment 2026-01-27 13:20:52 +05:30
Piyush Singariya
8e71de09f3 fix: remove unnecessary bool checking 2026-01-27 13:16:30 +05:30
Piyush Singariya
56de92de73 fix: changes based on review from Srikanth 2026-01-27 13:12:32 +05:30
Piyush Singariya
62b10f8e77 Merge branch 'main' into has-jsonqb 2026-01-27 10:04:32 +05:30
Piyush Singariya
20b53d7856 fix: review based on tushar 2026-01-27 10:04:15 +05:30
Piyush Singariya
8f2c506304 fix: json qb test fix 2026-01-22 22:00:06 +05:30
Srikanth Chekuri
7b5b9027dd Merge branch 'main' into has-jsonqb 2026-01-22 20:19:39 +05:30
Piyush Singariya
b77f97fcb7 fix: tests 2026-01-22 17:24:26 +05:30
Piyush Singariya
62942a4162 fix: tests 2026-01-22 15:47:20 +05:30
Piyush Singariya
349bbbbf1d Merge branch 'main' into has-jsonqb 2026-01-22 12:36:45 +05:30
Piyush Singariya
1966a7a5f6 fix: empty filteredArrays condition 2026-01-22 12:36:29 +05:30
Piyush Singariya
a4eed9ff13 fix: build json plans in metadata 2026-01-22 12:33:51 +05:30
Piyush Singariya
24d1ee33b5 revert: gitignore change 2026-01-22 10:39:02 +05:30
Srikanth Chekuri
3402203021 Merge branch 'main' into has-jsonqb 2026-01-21 13:47:39 +05:30
Piyush Singariya
e8e4897cc8 fix: tests GroupBy 2026-01-20 12:10:44 +05:30
Piyush Singariya
96fb88aaee fix: ignored .vscode in gitignore 2026-01-20 11:47:34 +05:30
Piyush Singariya
5a00e6c2cd Merge branch 'main' into has-jsonqb 2026-01-20 11:44:32 +05:30
Piyush Singariya
e2500cff7d fix: tests expected queries and values 2026-01-20 11:40:56 +05:30
Piyush Singariya
4864c3bc37 feat: has JSON QB 2026-01-20 11:23:50 +05:30
29 changed files with 569 additions and 1071 deletions

10
.github/CODEOWNERS vendored
View File

@@ -1,6 +1,8 @@
# CODEOWNERS info: https://help.github.com/en/articles/about-code-owners
# Owners are automatically requested for review for PRs that changes code that they own.
# Owners are automatically requested for review for PRs that changes code
# that they own.
/frontend/ @SigNoz/frontend-maintainers
@@ -9,10 +11,8 @@
/frontend/src/container/OnboardingV2Container/onboarding-configs/onboarding-config-with-links.json @makeavish
/frontend/src/container/OnboardingV2Container/AddDataSource/AddDataSource.tsx @makeavish
# CI
/deploy/ @therealpandey
.github @therealpandey
go.mod @therealpandey
/deploy/ @SigNoz/devops
.github @SigNoz/devops
# Scaffold Owners

View File

@@ -8,7 +8,6 @@ on:
jobs:
notify:
runs-on: ubuntu-latest
if: github.event.pull_request.merged == false
steps:
- name: alert
uses: slackapi/slack-github-action@v2.1.1

2
go.mod
View File

@@ -11,7 +11,6 @@ require (
github.com/SigNoz/signoz-otel-collector v0.144.2
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/bytedance/sonic v1.14.1
github.com/cespare/xxhash/v2 v2.3.0
github.com/coreos/go-oidc/v3 v3.17.0
github.com/dgraph-io/ristretto/v2 v2.3.0
@@ -106,6 +105,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect
github.com/aws/smithy-go v1.24.0 // indirect
github.com/bytedance/gopkg v0.1.3 // indirect
github.com/bytedance/sonic v1.14.1 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect

View File

@@ -1,65 +0,0 @@
package cloudintegration
import (
"context"
"net/http"
citypes "github.com/SigNoz/signoz/pkg/types/cloudintegrationtypes"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
CreateAccount(ctx context.Context, account *citypes.Account) error
// GetAccount returns cloud integration account
GetAccount(ctx context.Context, orgID, accountID valuer.UUID) (*citypes.Account, error)
// GetAccounts lists accounts where agent is connected
GetAccounts(ctx context.Context, orgID valuer.UUID) ([]*citypes.Account, error)
// UpdateAccount updates the cloud integration account for a specific organization.
UpdateAccount(ctx context.Context, account *citypes.Account) error
// DisconnectAccount soft deletes/removes a cloud integration account.
DisconnectAccount(ctx context.Context, orgID, accountID valuer.UUID) error
// GetConnectionArtifact returns cloud provider specific connection information,
// client side handles how this information is shown
GetConnectionArtifact(ctx context.Context, account *citypes.Account, req *citypes.ConnectionArtifactRequest) (*citypes.ConnectionArtifact, error)
// GetServicesMetadata returns list of services metadata for a cloud provider attached with the integrationID.
// This just returns a summary of the service and not the whole service definition
GetServicesMetadata(ctx context.Context, orgID valuer.UUID, integrationID *valuer.UUID) ([]*citypes.ServiceMetadata, error)
// GetService returns service definition details for a serviceID. This returns config and
// other details required to show in service details page on web client.
GetService(ctx context.Context, orgID valuer.UUID, integrationID *valuer.UUID, serviceID string) (*citypes.Service, error)
// UpdateService updates cloud integration service
UpdateService(ctx context.Context, orgID valuer.UUID, service *citypes.CloudIntegrationService) error
// AgentCheckIn is called by agent to heartbeat and get latest config in response.
AgentCheckIn(ctx context.Context, orgID valuer.UUID, req *citypes.AgentCheckInRequest) (*citypes.AgentCheckInResponse, error)
// GetDashboardByID returns dashboard JSON for a given dashboard id.
// this only returns the dashboard when the service (embedded in dashboard id) is enabled
// in the org for any cloud integration account
GetDashboardByID(ctx context.Context, orgID valuer.UUID, id string) (*dashboardtypes.Dashboard, error)
// GetAllDashboards returns list of dashboards across all connected cloud integration accounts
// for enabled services in the org. This list gets added to dashboard list page
GetAllDashboards(ctx context.Context, orgID valuer.UUID) ([]*dashboardtypes.Dashboard, error)
}
type Handler interface {
GetConnectionArtifact(http.ResponseWriter, *http.Request)
GetAccounts(http.ResponseWriter, *http.Request)
GetAccount(http.ResponseWriter, *http.Request)
UpdateAccount(http.ResponseWriter, *http.Request)
DisconnectAccount(http.ResponseWriter, *http.Request)
GetServicesMetadata(http.ResponseWriter, *http.Request)
GetService(http.ResponseWriter, *http.Request)
UpdateService(http.ResponseWriter, *http.Request)
AgentCheckIn(http.ResponseWriter, *http.Request)
}

View File

@@ -78,7 +78,7 @@ func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetype
// add the paths that are not promoted but have indexes
for path, indexes := range aggr {
path := strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
path := strings.TrimPrefix(path, telemetrylogs.BodyV2ColumnPrefix)
path = telemetrytypes.BodyJSONStringSearchPrefix + path
response = append(response, promotetypes.PromotePath{
Path: path,
@@ -163,7 +163,7 @@ func (m *module) PromoteAndIndexPaths(
}
}
if len(it.Indexes) > 0 {
parentColumn := telemetrylogs.LogsV2BodyJSONColumn
parentColumn := telemetrylogs.LogsV2BodyV2Column
// if the path is already promoted or is being promoted, add it to the promoted column
if _, promoted := existingPromotedPaths[it.Path]; promoted || it.Promote {
parentColumn = telemetrylogs.LogsV2BodyPromotedColumn

View File

@@ -10,13 +10,11 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/bytedance/sonic"
)
type builderQuery[T any] struct {
@@ -262,40 +260,6 @@ func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string,
return nil, err
}
// merge body_json and promoted into body
if q.spec.Signal == telemetrytypes.SignalLogs {
switch typedPayload := payload.(type) {
case *qbtypes.RawData:
for _, rr := range typedPayload.Rows {
seeder := func() error {
body, ok := rr.Data[telemetrylogs.LogsV2BodyJSONColumn].(map[string]any)
if !ok {
return nil
}
promoted, ok := rr.Data[telemetrylogs.LogsV2BodyPromotedColumn].(map[string]any)
if !ok {
return nil
}
seed(promoted, body)
str, err := sonic.MarshalString(body)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal body")
}
rr.Data["body"] = str
return nil
}
err := seeder()
if err != nil {
return nil, err
}
delete(rr.Data, telemetrylogs.LogsV2BodyJSONColumn)
delete(rr.Data, telemetrylogs.LogsV2BodyPromotedColumn)
}
payload = typedPayload
}
}
return &qbtypes.Result{
Type: q.kind,
Value: payload,
@@ -423,18 +387,3 @@ func decodeCursor(cur string) (int64, error) {
}
return strconv.ParseInt(string(b), 10, 64)
}
func seed(promoted map[string]any, body map[string]any) {
for key, fromValue := range promoted {
if toValue, ok := body[key]; !ok {
body[key] = fromValue
} else {
if fromValue, ok := fromValue.(map[string]any); ok {
if toValue, ok := toValue.(map[string]any); ok {
seed(fromValue, toValue)
body[key] = toValue
}
}
}
}
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/bytedance/sonic"
)
var (
@@ -394,17 +393,11 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
// de-reference the typed pointer to any
val := reflect.ValueOf(cellPtr).Elem().Interface()
// Post-process JSON columns: normalize into structured values
// Post-process JSON columns: normalize into String value
if strings.HasPrefix(strings.ToUpper(colTypes[i].DatabaseTypeName()), "JSON") {
switch x := val.(type) {
case []byte:
if len(x) > 0 {
var v any
if err := sonic.Unmarshal(x, &v); err == nil {
val = v
}
}
val = string(x)
default:
// already a structured type (map[string]any, []any, etc.)
}

View File

@@ -219,7 +219,6 @@ func DataTypeCollisionHandledFieldName(key *telemetrytypes.TelemetryFieldKey, va
// we don't have a toBoolOrNull in ClickHouse, so we need to convert the bool to a string
value = fmt.Sprintf("%t", v)
}
case telemetrytypes.FieldDataTypeInt64,
telemetrytypes.FieldDataTypeArrayInt64,
telemetrytypes.FieldDataTypeNumber,

View File

@@ -313,37 +313,30 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
return ""
}
child := ctx.GetChild(0)
var searchText string
if keyCtx, ok := child.(*grammar.KeyContext); ok {
// create a full text search condition on the body field
keyText := keyCtx.GetText()
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(keyText), v.builder, v.startNs, v.endNs)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
searchText = keyCtx.GetText()
} else if valCtx, ok := child.(*grammar.ValueContext); ok {
var text string
if valCtx.QUOTED_TEXT() != nil {
text = trimQuotes(valCtx.QUOTED_TEXT().GetText())
searchText = trimQuotes(valCtx.QUOTED_TEXT().GetText())
} else if valCtx.NUMBER() != nil {
text = valCtx.NUMBER().GetText()
searchText = valCtx.NUMBER().GetText()
} else if valCtx.BOOL() != nil {
text = valCtx.BOOL().GetText()
searchText = valCtx.BOOL().GetText()
} else if valCtx.KEY() != nil {
text = valCtx.KEY().GetText()
searchText = valCtx.KEY().GetText()
} else {
v.errors = append(v.errors, fmt.Sprintf("unsupported value type: %s", valCtx.GetText()))
return ""
}
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(text), v.builder, v.startNs, v.endNs)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
}
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(searchText), v.builder, v.startNs, v.endNs)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
}
return "" // Should not happen with valid input
@@ -383,6 +376,7 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
for _, key := range keys {
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, nil, v.builder, v.startNs, v.endNs)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to build condition: %s", err.Error()))
return ""
}
conds = append(conds, condition)
@@ -648,7 +642,6 @@ func (v *filterExpressionVisitor) VisitValueList(ctx *grammar.ValueListContext)
// VisitFullText handles standalone quoted strings for full-text search
func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) any {
if v.skipFullTextFilter {
return ""
}
@@ -670,6 +663,7 @@ func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) an
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
}

View File

@@ -3,14 +3,12 @@ package telemetrylogs
import (
"context"
"fmt"
"slices"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"golang.org/x/exp/maps"
"github.com/huandu/go-sqlbuilder"
)
@@ -35,7 +33,7 @@ func (c *conditionBuilder) conditionFor(
return "", err
}
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled {
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled && key.Name != messageSubField {
valueType, value := InferDataType(value, operator, key)
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
if err != nil {
@@ -54,14 +52,14 @@ func (c *conditionBuilder) conditionFor(
}
// Check if this is a body JSON search - either by FieldContext
if key.FieldContext == telemetrytypes.FieldContextBody {
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.BodyJSONQueryEnabled {
tblFieldName, value = GetBodyJSONKey(ctx, key, operator, value)
}
tblFieldName, value = querybuilder.DataTypeCollisionHandledFieldName(key, value, tblFieldName, operator)
// make use of case insensitive index for body
if tblFieldName == "body" {
if tblFieldName == "body" || tblFieldName == messageSubColumn {
switch operator {
case qbtypes.FilterOperatorLike:
return sb.ILike(tblFieldName, value), nil
@@ -108,7 +106,6 @@ func (c *conditionBuilder) conditionFor(
return sb.ILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorNotContains:
return sb.NotILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorRegexp:
// Note: Escape $$ to $$$$ to avoid sqlbuilder interpreting materialized $ signs
// Only needed because we are using sprintf instead of sb.Match (not implemented in sqlbuilder)
@@ -178,9 +175,8 @@ func (c *conditionBuilder) conditionFor(
case schema.ColumnTypeEnumJSON:
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(tblFieldName), nil
} else {
return sb.IsNull(tblFieldName), nil
}
return sb.IsNull(tblFieldName), nil
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
@@ -247,19 +243,30 @@ func (c *conditionBuilder) ConditionFor(
return "", err
}
if !(key.FieldContext == telemetrytypes.FieldContextBody && querybuilder.BodyJSONQueryEnabled) && operator.AddDefaultExistsFilter() {
// skip adding exists filter for intrinsic fields
// with an exception for body json search
field, _ := c.fm.FieldFor(ctx, key)
if slices.Contains(maps.Keys(IntrinsicFields), field) && key.FieldContext != telemetrytypes.FieldContextBody {
// Skip adding exists filter for intrinsic fields i.e. Table level log context fields
buildExistCondition := operator.AddDefaultExistsFilter()
switch key.FieldContext {
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextScope:
// pass; No need to build exist condition for top level columns
// immidiately return
return condition, nil
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextAttribute:
// build exist condition for resource and attribute fields based on filter operator
case telemetrytypes.FieldContextBody:
// Querying JSON fields already account for Nullability of fields
// so additional exists checks are not needed
if querybuilder.BodyJSONQueryEnabled {
return condition, nil
}
}
if buildExistCondition {
existsCondition, err := c.conditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb)
if err != nil {
return "", err
}
return sb.And(condition, existsCondition), nil
}
return condition, nil
}

View File

@@ -127,7 +127,8 @@ func TestConditionFor(t *testing.T) {
{
name: "Contains operator - body",
key: telemetrytypes.TelemetryFieldKey{
Name: "body",
Name: "body",
FieldContext: telemetrytypes.FieldContextLog,
},
operator: qbtypes.FilterOperatorContains,
value: 521509198310,

View File

@@ -1,7 +1,10 @@
package telemetrylogs
import (
"fmt"
"github.com/SigNoz/signoz-otel-collector/constants"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -17,7 +20,7 @@ const (
LogsV2TimestampColumn = "timestamp"
LogsV2ObservedTimestampColumn = "observed_timestamp"
LogsV2BodyColumn = "body"
LogsV2BodyJSONColumn = constants.BodyV2Column
LogsV2BodyV2Column = constants.BodyV2Column
LogsV2BodyPromotedColumn = constants.BodyPromotedColumn
LogsV2TraceIDColumn = "trace_id"
LogsV2SpanIDColumn = "span_id"
@@ -34,8 +37,14 @@ const (
LogsV2ResourcesStringColumn = "resources_string"
LogsV2ScopeStringColumn = "scope_string"
BodyJSONColumnPrefix = constants.BodyV2ColumnPrefix
BodyV2ColumnPrefix = constants.BodyV2ColumnPrefix
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
// messageSubColumn is the ClickHouse sub-column that body searches map to
// when BodyJSONQueryEnabled is true.
messageSubField = "message"
messageSubColumn = "body_v2.message"
bodySearchDefaultWarning = "body searches default to `body.message:string`. Use `body.<key>` to search a different field inside body"
)
var (
@@ -118,3 +127,11 @@ var (
},
}
)
func bodyAliasExpression() string {
if !querybuilder.BodyJSONQueryEnabled {
return LogsV2BodyColumn
}
return fmt.Sprintf("%s as body", LogsV2BodyV2Column)
}

View File

@@ -30,7 +30,8 @@ var (
"severity_text": {Name: "severity_text", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"severity_number": {Name: "severity_number", Type: schema.ColumnTypeUInt8},
"body": {Name: "body", Type: schema.ColumnTypeString},
LogsV2BodyJSONColumn: {Name: LogsV2BodyJSONColumn, Type: schema.JSONColumnType{
messageSubColumn: {Name: messageSubColumn, Type: schema.ColumnTypeString},
LogsV2BodyV2Column: {Name: LogsV2BodyV2Column, Type: schema.JSONColumnType{
MaxDynamicTypes: utils.ToPointer(uint(32)),
MaxDynamicPaths: utils.ToPointer(uint(0)),
}},
@@ -88,21 +89,26 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
return logsV2Columns["attributes_bool"], nil
}
case telemetrytypes.FieldContextBody:
// Body context is for JSON body fields
// Use body_json if feature flag is enabled
// Body context is for JSON body fields. Use body_v2 if feature flag is enabled.
if querybuilder.BodyJSONQueryEnabled {
return logsV2Columns[LogsV2BodyJSONColumn], nil
if key.Name == messageSubField {
return logsV2Columns[messageSubColumn], nil
}
return logsV2Columns[LogsV2BodyV2Column], nil
}
// Fall back to legacy body column
return logsV2Columns["body"], nil
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
if key.Name == LogsV2BodyColumn && querybuilder.BodyJSONQueryEnabled {
return logsV2Columns[messageSubColumn], nil
}
col, ok := logsV2Columns[key.Name]
if !ok {
// check if the key has body JSON search
if strings.HasPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
// Use body_json if feature flag is enabled and we have a body condition builder
// Use body_v2 if feature flag is enabled and we have a body condition builder
if querybuilder.BodyJSONQueryEnabled {
return logsV2Columns[LogsV2BodyJSONColumn], nil
return logsV2Columns[LogsV2BodyV2Column], nil
}
// Fall back to legacy body column
return logsV2Columns["body"], nil
@@ -138,6 +144,10 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
}
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
case telemetrytypes.FieldContextBody:
if key.Name == messageSubField {
return messageSubColumn, nil
}
if key.JSONDataType == nil {
return "", qbtypes.ErrColumnNotFound
}
@@ -246,34 +256,37 @@ func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (
node := plan[0]
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
if key.Materialized {
if len(plan) < 2 {
return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
"plan length is less than 2 for promoted path: %s", key.Name)
}
// TODO(Piyush): Promoted path logic commented out. Materialized now means type hint
// promotion will be extracted from key field evolution
// (direct sub-column access), not a promoted body_promoted.* column.
// if key.Materialized {
// if len(plan) < 2 {
// return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
// "plan length is less than 2 for promoted path: %s", key.Name)
// }
node := plan[1]
promotedExpr := fmt.Sprintf(
"dynamicElement(%s, '%s')",
node.FieldPath(),
node.TerminalConfig.ElemType.StringValue(),
)
// node := plan[1]
// promotedExpr := fmt.Sprintf(
// "dynamicElement(%s, '%s')",
// node.FieldPath(),
// node.TerminalConfig.ElemType.StringValue(),
// )
// dynamicElement returns NULL for scalar types or an empty array for array types.
if node.TerminalConfig.ElemType.IsArray {
expr = fmt.Sprintf(
"if(length(%s) > 0, %s, %s)",
promotedExpr,
promotedExpr,
expr,
)
} else {
// promoted column first then body_json column
// TODO(Piyush): Change this in future for better performance
expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
}
// // dynamicElement returns NULL for scalar types or an empty array for array types.
// if node.TerminalConfig.ElemType.IsArray {
// expr = fmt.Sprintf(
// "if(length(%s) > 0, %s, %s)",
// promotedExpr,
// promotedExpr,
// expr,
// )
// } else {
// // promoted column first then body_json column
// // TODO(Piyush): Change this in future for better performance
// expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
// }
}
// }
return expr, nil
}

View File

@@ -30,7 +30,7 @@ func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType te
return &jsonConditionBuilder{key: key, valueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType]}
}
// BuildCondition builds the full WHERE condition for body_json JSON paths
// BuildCondition builds the full WHERE condition for body_v2 JSON paths
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
conditions := []string{}
for _, node := range c.key.JSONPlan {
@@ -40,6 +40,7 @@ func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperato
}
conditions = append(conditions, condition)
}
return sb.Or(conditions...), nil
}
@@ -288,9 +289,9 @@ func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, field
}
return sb.NotIn(fieldExpr, values...), nil
case qbtypes.FilterOperatorExists:
return fmt.Sprintf("%s IS NOT NULL", fieldExpr), nil
return sb.IsNotNull(fieldExpr), nil
case qbtypes.FilterOperatorNotExists:
return fmt.Sprintf("%s IS NULL", fieldExpr), nil
return sb.IsNull(fieldExpr), nil
// between and not between
case qbtypes.FilterOperatorBetween, qbtypes.FilterOperatorNotBetween:
values, ok := value.([]any)

File diff suppressed because one or more lines are too long

View File

@@ -65,7 +65,7 @@ func (b *logQueryStatementBuilder) Build(
start = querybuilder.ToNanoSecs(start)
end = querybuilder.ToNanoSecs(end)
keySelectors := getKeySelectors(query)
keySelectors, warnings := getKeySelectors(query)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
@@ -76,20 +76,29 @@ func (b *logQueryStatementBuilder) Build(
// Create SQL builder
q := sqlbuilder.NewSelectBuilder()
var stmt *qbtypes.Statement
switch requestType {
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeRawStream:
return b.buildListQuery(ctx, q, query, start, end, keys, variables)
stmt, err = b.buildListQuery(ctx, q, query, start, end, keys, variables)
case qbtypes.RequestTypeTimeSeries:
return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
stmt, err = b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
case qbtypes.RequestTypeScalar:
return b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
stmt, err = b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
}
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
if err != nil {
return nil, err
}
stmt.Warnings = append(stmt.Warnings, warnings...)
return stmt, nil
}
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([]*telemetrytypes.FieldKeySelector, []string) {
var keySelectors []*telemetrytypes.FieldKeySelector
var warnings []string
for idx := range query.Aggregations {
aggExpr := query.Aggregations[idx]
@@ -136,7 +145,19 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
}
return keySelectors
// When the new JSON body experience is enabled, warn the user if they use the bare
// "body" key in the filter — queries on plain "body" default to body.message:string.
// TODO(Piyush): Setup better for coming FTS support.
if querybuilder.BodyJSONQueryEnabled {
for _, sel := range keySelectors {
if sel.Name == LogsV2BodyColumn {
warnings = append(warnings, bodySearchDefaultWarning)
break
}
}
}
return keySelectors, warnings
}
func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
@@ -203,7 +224,6 @@ func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[stri
}
func (b *logQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
// First check if it matches with any intrinsic fields
var intrinsicOrCalculatedField telemetrytypes.TelemetryFieldKey
if _, ok := IntrinsicFields[key.Name]; ok {
@@ -212,7 +232,6 @@ func (b *logQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldK
}
return querybuilder.AdjustKey(key, keys, nil)
}
// buildListQuery builds a query for list panel type
@@ -249,11 +268,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
sb.SelectMore(LogsV2SeverityNumberColumn)
sb.SelectMore(LogsV2ScopeNameColumn)
sb.SelectMore(LogsV2ScopeVersionColumn)
sb.SelectMore(LogsV2BodyColumn)
if querybuilder.BodyJSONQueryEnabled {
sb.SelectMore(LogsV2BodyJSONColumn)
sb.SelectMore(LogsV2BodyPromotedColumn)
}
sb.SelectMore(bodyAliasExpression())
sb.SelectMore(LogsV2AttributesStringColumn)
sb.SelectMore(LogsV2AttributesNumberColumn)
sb.SelectMore(LogsV2AttributesBoolColumn)

View File

@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
@@ -886,3 +887,246 @@ func TestAdjustKey(t *testing.T) {
})
}
}
func TestStmtBuilderBodyField(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
enableBodyJSONQuery bool
expected qbtypes.Statement
expectedErr error
}{
{
name: "body_exists",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body Exists"},
Limit: 10,
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message <> ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{bodySearchDefaultWarning},
},
expectedErr: nil,
},
{
name: "body_exists_disabled",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body Exists"},
Limit: 10,
},
enableBodyJSONQuery: false,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body <> ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "body_empty",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body == ''"},
Limit: 10,
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{bodySearchDefaultWarning},
},
expectedErr: nil,
},
{
name: "body_empty_disabled",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body == ''"},
Limit: 10,
},
enableBodyJSONQuery: false,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "body_contains",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body CONTAINS 'error'"},
Limit: 10,
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND LOWER(body_v2.message) LIKE LOWER(?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "%error%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{bodySearchDefaultWarning},
},
expectedErr: nil,
},
{
name: "body_contains_disabled",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "body CONTAINS 'error'"},
Limit: 10,
},
enableBodyJSONQuery: false,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND LOWER(body) LIKE LOWER(?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "%error%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
enable, disable := jsonQueryTestUtil(t)
defer disable()
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if c.enableBodyJSONQuery {
enable()
} else {
disable()
}
// build the key map after enabling/disabling body JSON query
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
for _, field := range IntrinsicFields {
f := field
mockMetadataStore.KeysMap[field.Name] = append(mockMetadataStore.KeysMap[field.Name], &f)
}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
)
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
if err != nil {
_, _, _, _, _, add := errors.Unwrapb(err)
t.Logf("error additionals: %v", add)
}
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}
func TestStmtBuilderBodyFullTextSearch(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
enableBodyJSONQuery bool
expected qbtypes.Statement
expectedErr error
}{
{
name: "body_contains",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "'error'"},
Limit: 10,
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND match(LOWER(body_v2.message), LOWER(?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "error", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "body_contains_disabled",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "'error'"},
Limit: 10,
},
enableBodyJSONQuery: false,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND match(LOWER(body), LOWER(?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "error", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
enable, disable := jsonQueryTestUtil(t)
defer disable()
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if c.enableBodyJSONQuery {
enable()
} else {
disable()
}
// build the key map after enabling/disabling body JSON query
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
for _, field := range IntrinsicFields {
f := field
mockMetadataStore.KeysMap[field.Name] = append(mockMetadataStore.KeysMap[field.Name], &f)
}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
)
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
if err != nil {
_, _, _, _, _, add := errors.Unwrapb(err)
t.Logf("error additionals: %v", add)
}
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}

View File

@@ -27,13 +27,6 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"body": {
{
Name: "body",
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"http.status_code": {
{
Name: "http.status_code",
@@ -938,6 +931,13 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
Materialized: true,
},
},
"body": {
{
Name: "body",
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
}
for _, keys := range keysMap {
@@ -945,6 +945,7 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
key.Signal = telemetrytypes.SignalLogs
}
}
return keysMap
}

View File

@@ -54,6 +54,7 @@ func (t *telemetryMetaStore) fetchBodyJSONPaths(ctx context.Context,
instrumentationtypes.CodeNamespace: "metadata",
instrumentationtypes.CodeFunctionName: "fetchBodyJSONPaths",
})
query, args, limit := buildGetBodyJSONPathsQuery(fieldKeySelectors)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
@@ -184,7 +185,6 @@ func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySele
limit += fieldKeySelector.Limit
}
sb.Where(sb.Or(orClauses...))
// Group by path to get unique paths with aggregated types
sb.GroupBy("path")
@@ -319,7 +319,7 @@ func (t *telemetryMetaStore) ListJSONValues(ctx context.Context, path string, li
if promoted {
path = telemetrylogs.BodyPromotedColumnPrefix + path
} else {
path = telemetrylogs.BodyJSONColumnPrefix + path
path = telemetrylogs.BodyV2ColumnPrefix + path
}
from := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
@@ -522,7 +522,7 @@ func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...stri
// TODO(Piyush): Remove this function
func CleanPathPrefixes(path string) string {
path = strings.TrimPrefix(path, telemetrytypes.BodyJSONStringSearchPrefix)
path = strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
path = strings.TrimPrefix(path, telemetrylogs.BodyV2ColumnPrefix)
path = strings.TrimPrefix(path, telemetrylogs.BodyPromotedColumnPrefix)
return path
}

View File

@@ -102,7 +102,7 @@ func NewTelemetryMetaStore(
jsonColumnMetadata: map[telemetrytypes.Signal]map[telemetrytypes.FieldContext]telemetrytypes.JSONColumnMetadata{
telemetrytypes.SignalLogs: {
telemetrytypes.FieldContextBody: telemetrytypes.JSONColumnMetadata{
BaseColumn: telemetrylogs.LogsV2BodyJSONColumn,
BaseColumn: telemetrylogs.LogsV2BodyV2Column,
PromotedColumn: telemetrylogs.LogsV2BodyPromotedColumn,
},
},

View File

@@ -1,45 +0,0 @@
package cloudintegrationtypes
import (
"time"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
)
type (
Account struct {
types.Identifiable
types.TimeAuditable
ProviderAccountId *string `json:"providerAccountID,omitempty"`
Provider CloudProviderType `json:"provider"`
RemovedAt *time.Time `json:"removedAt,omitempty"`
AgentReport *AgentReport `json:"agentReport,omitempty"`
OrgID valuer.UUID `json:"orgID"`
Config *AccountConfig `json:"config,omitempty"`
}
GettableConnectedAccounts struct {
Accounts []*Account `json:"accounts"`
}
GettableAccount = Account
UpdatableAccount struct {
Config *AccountConfig `json:"config"`
}
)
// AgentReport represents heartbeats sent by the agent.
type AgentReport struct {
TimestampMillis int64 `json:"timestampMillis"`
Data map[string]any `json:"data"`
}
type AccountConfig struct {
AWS *AWSAccountConfig `json:"aws,omitempty"`
}
type AWSAccountConfig struct {
Regions []string `json:"regions"`
}

View File

@@ -1,83 +0,0 @@
package cloudintegrationtypes
import (
"database/sql/driver"
"encoding/json"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/uptrace/bun"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
ErrCodeCloudIntegrationNotFound = errors.MustNewCode("cloud_integration_not_found")
)
// StorableCloudIntegration represents a cloud integration stored in the database.
// This is also referred as "Account" in the context of cloud integrations.
type StorableCloudIntegration struct {
bun.BaseModel `bun:"table:cloud_integration"`
types.Identifiable
types.TimeAuditable
Provider CloudProviderType `json:"provider" bun:"provider,type:text"`
// Config is provider specific data in JSON string format
Config string `json:"config" bun:"config,type:text"`
AccountID *string `json:"accountID" bun:"account_id,type:text"`
LastAgentReport *StorableAgentReport `json:"lastAgentReport" bun:"last_agent_report,type:text"`
RemovedAt *time.Time `json:"removedAt" bun:"removed_at,type:timestamp,nullzero"`
OrgID valuer.UUID `json:"orgID" bun:"org_id,type:text"`
}
// StorableAgentReport represents the last heartbeat and arbitrary data sent by the agent
// as of now there is no use case for Data field, but keeping it for backwards compatibility with older structure.
type StorableAgentReport struct {
TimestampMillis int64 `json:"timestamp_millis"`
Data map[string]any `json:"data"`
}
// StorableCloudIntegrationService is to store service config for a cloud integration, which is a cloud provider specific configuration.
type StorableCloudIntegrationService struct {
bun.BaseModel `bun:"table:cloud_integration_service,alias:cis"`
types.Identifiable
types.TimeAuditable
// Keeping Type field name as is, but it is a service id
Type ServiceID `bun:"type,type:text,notnull,unique:cloud_integration_id_type"`
// Config is cloud provider's service specific data in JSON string format
Config string `bun:"config,type:text"`
CloudIntegrationID valuer.UUID `bun:"cloud_integration_id,type:text,notnull,unique:cloud_integration_id_type,on_delete:cascade"`
}
// Scan scans value from DB.
func (r *StorableAgentReport) Scan(src any) error {
var data []byte
switch v := src.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
return json.Unmarshal(data, r)
}
// Value creates value to be stored in DB.
func (r *StorableAgentReport) Value() (driver.Value, error) {
if r == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "agent report is nil")
}
serialized, err := json.Marshal(r)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize agent report to JSON",
)
}
// Return as string instead of []byte to ensure PostgreSQL stores as text, not bytes
return string(serialized), nil
}

View File

@@ -1,41 +0,0 @@
package cloudintegrationtypes
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
)
// CloudProviderType type alias.
type CloudProviderType struct{ valuer.String }
var (
// cloud providers.
CloudProviderTypeAWS = CloudProviderType{valuer.NewString("aws")}
CloudProviderTypeAzure = CloudProviderType{valuer.NewString("azure")}
// errors.
ErrCodeCloudProviderInvalidInput = errors.MustNewCode("invalid_cloud_provider")
AWSIntegrationUserEmail = valuer.MustNewEmail("aws-integration@signoz.io")
AzureIntegrationUserEmail = valuer.MustNewEmail("azure-integration@signoz.io")
)
// CloudIntegrationUserEmails is the list of valid emails for Cloud One Click integrations.
// This is used for validation and restrictions in different contexts, across codebase.
var CloudIntegrationUserEmails = []valuer.Email{
AWSIntegrationUserEmail,
AzureIntegrationUserEmail,
}
// NewCloudProvider returns a new CloudProviderType from a string.
// It validates the input and returns an error if the input is not valid cloud provider.
func NewCloudProvider(provider string) (CloudProviderType, error) {
switch provider {
case CloudProviderTypeAWS.StringValue():
return CloudProviderTypeAWS, nil
case CloudProviderTypeAzure.StringValue():
return CloudProviderTypeAzure, nil
default:
return CloudProviderType{}, errors.NewInvalidInputf(ErrCodeCloudProviderInvalidInput, "invalid cloud provider: %s", provider)
}
}

View File

@@ -1,97 +0,0 @@
package cloudintegrationtypes
import "github.com/SigNoz/signoz/pkg/types/integrationtypes"
// request for creating connection artifact.
type (
PostableConnectionArtifact = ConnectionArtifactRequest
ConnectionArtifactRequest struct {
Aws *AWSConnectionArtifactRequest `json:"aws"`
}
AWSConnectionArtifactRequest struct {
DeploymentRegion string `json:"deploymentRegion"`
Regions []string `json:"regions"`
}
)
type (
ConnectionArtifact struct {
Aws *AWSConnectionArtifact `json:"aws"`
}
AWSConnectionArtifact struct {
ConnectionUrl string `json:"connectionURL"`
}
GettableConnectionArtifact = ConnectionArtifact
)
type (
AccountStatus struct {
Id string `json:"id"`
ProviderAccountId *string `json:"providerAccountID,omitempty"`
Status integrationtypes.AccountStatus `json:"status"`
}
GettableAccountStatus = AccountStatus
)
type (
AgentCheckInRequest struct {
// older backward compatible fields are mapped to new fields
// CloudIntegrationId string `json:"cloudIntegrationId"`
// AccountId string `json:"accountId"`
// New fields
ProviderAccountId string `json:"providerAccountId"`
CloudAccountId string `json:"cloudAccountId"`
Data map[string]any `json:"data,omitempty"`
}
PostableAgentCheckInRequest struct {
AgentCheckInRequest
// following are backward compatible fields for older running agents
// which gets mapped to new fields in AgentCheckInRequest
CloudIntegrationId string `json:"cloud_integration_id"`
CloudAccountId string `json:"cloud_account_id"`
}
GettableAgentCheckInResponse struct {
AgentCheckInResponse
// For backward compatibility
CloudIntegrationId string `json:"cloud_integration_id"`
AccountId string `json:"account_id"`
}
AgentCheckInResponse struct {
// Older fields for backward compatibility are mapped to new fields below
// CloudIntegrationId string `json:"cloud_integration_id"`
// AccountId string `json:"account_id"`
// New fields
ProviderAccountId string `json:"providerAccountId"`
CloudAccountId string `json:"cloudAccountId"`
// IntegrationConfig populates data related to integration that is required for an agent
// to start collecting telemetry data
// keeping JSON key snake_case for backward compatibility
IntegrationConfig *IntegrationConfig `json:"integration_config,omitempty"`
}
IntegrationConfig struct {
EnabledRegions []string `json:"enabledRegions"` // backward compatible
Telemetry *AWSCollectionStrategy `json:"telemetry,omitempty"` // backward compatible
// new fields
AWS *AWSIntegrationConfig `json:"aws,omitempty"`
}
AWSIntegrationConfig struct {
EnabledRegions []string `json:"enabledRegions"`
Telemetry *AWSCollectionStrategy `json:"telemetry,omitempty"`
}
)

View File

@@ -1,103 +0,0 @@
package cloudintegrationtypes
import (
"github.com/SigNoz/signoz/pkg/errors"
)
var (
ErrCodeInvalidCloudRegion = errors.MustNewCode("invalid_cloud_region")
ErrCodeMismatchCloudProvider = errors.MustNewCode("cloud_provider_mismatch")
)
// List of all valid cloud regions on Amazon Web Services.
var ValidAWSRegions = map[string]struct{}{
"af-south-1": {}, // Africa (Cape Town).
"ap-east-1": {}, // Asia Pacific (Hong Kong).
"ap-northeast-1": {}, // Asia Pacific (Tokyo).
"ap-northeast-2": {}, // Asia Pacific (Seoul).
"ap-northeast-3": {}, // Asia Pacific (Osaka).
"ap-south-1": {}, // Asia Pacific (Mumbai).
"ap-south-2": {}, // Asia Pacific (Hyderabad).
"ap-southeast-1": {}, // Asia Pacific (Singapore).
"ap-southeast-2": {}, // Asia Pacific (Sydney).
"ap-southeast-3": {}, // Asia Pacific (Jakarta).
"ap-southeast-4": {}, // Asia Pacific (Melbourne).
"ca-central-1": {}, // Canada (Central).
"ca-west-1": {}, // Canada West (Calgary).
"eu-central-1": {}, // Europe (Frankfurt).
"eu-central-2": {}, // Europe (Zurich).
"eu-north-1": {}, // Europe (Stockholm).
"eu-south-1": {}, // Europe (Milan).
"eu-south-2": {}, // Europe (Spain).
"eu-west-1": {}, // Europe (Ireland).
"eu-west-2": {}, // Europe (London).
"eu-west-3": {}, // Europe (Paris).
"il-central-1": {}, // Israel (Tel Aviv).
"me-central-1": {}, // Middle East (UAE).
"me-south-1": {}, // Middle East (Bahrain).
"sa-east-1": {}, // South America (Sao Paulo).
"us-east-1": {}, // US East (N. Virginia).
"us-east-2": {}, // US East (Ohio).
"us-west-1": {}, // US West (N. California).
"us-west-2": {}, // US West (Oregon).
}
// List of all valid cloud regions for Microsoft Azure.
var ValidAzureRegions = map[string]struct{}{
"australiacentral": {}, // Australia Central
"australiacentral2": {}, // Australia Central 2
"australiaeast": {}, // Australia East
"australiasoutheast": {}, // Australia Southeast
"austriaeast": {}, // Austria East
"belgiumcentral": {}, // Belgium Central
"brazilsouth": {}, // Brazil South
"brazilsoutheast": {}, // Brazil Southeast
"canadacentral": {}, // Canada Central
"canadaeast": {}, // Canada East
"centralindia": {}, // Central India
"centralus": {}, // Central US
"chilecentral": {}, // Chile Central
"denmarkeast": {}, // Denmark East
"eastasia": {}, // East Asia
"eastus": {}, // East US
"eastus2": {}, // East US 2
"francecentral": {}, // France Central
"francesouth": {}, // France South
"germanynorth": {}, // Germany North
"germanywestcentral": {}, // Germany West Central
"indonesiacentral": {}, // Indonesia Central
"israelcentral": {}, // Israel Central
"italynorth": {}, // Italy North
"japaneast": {}, // Japan East
"japanwest": {}, // Japan West
"koreacentral": {}, // Korea Central
"koreasouth": {}, // Korea South
"malaysiawest": {}, // Malaysia West
"mexicocentral": {}, // Mexico Central
"newzealandnorth": {}, // New Zealand North
"northcentralus": {}, // North Central US
"northeurope": {}, // North Europe
"norwayeast": {}, // Norway East
"norwaywest": {}, // Norway West
"polandcentral": {}, // Poland Central
"qatarcentral": {}, // Qatar Central
"southafricanorth": {}, // South Africa North
"southafricawest": {}, // South Africa West
"southcentralus": {}, // South Central US
"southindia": {}, // South India
"southeastasia": {}, // Southeast Asia
"spaincentral": {}, // Spain Central
"swedencentral": {}, // Sweden Central
"switzerlandnorth": {}, // Switzerland North
"switzerlandwest": {}, // Switzerland West
"uaecentral": {}, // UAE Central
"uaenorth": {}, // UAE North
"uksouth": {}, // UK South
"ukwest": {}, // UK West
"westcentralus": {}, // West Central US
"westeurope": {}, // West Europe
"westindia": {}, // West India
"westus": {}, // West US
"westus2": {}, // West US 2
"westus3": {}, // West US 3
}

View File

@@ -1,250 +0,0 @@
package cloudintegrationtypes
import (
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
S3Sync = valuer.NewString("s3sync")
// ErrCodeInvalidServiceID is the error code for invalid service id.
ErrCodeInvalidServiceID = errors.MustNewCode("invalid_service_id")
)
type (
ServiceID struct{ valuer.String }
CloudIntegrationService struct {
types.Identifiable
types.TimeAuditable
Type ServiceID `json:"type"`
Config *ServiceConfig `json:"config"`
CloudIntegrationID valuer.UUID `json:"cloudIntegrationID"`
}
// ServiceMetadata helps to quickly list available services and whether it is enabled or not.
// As getting complete service definition is a heavy operation and the response is also large,
// initial integration page load can be very slow.
ServiceMetadata struct {
ServiceDefinitionMetadata
// if the service is enabled for the account
Enabled bool `json:"enabled"`
}
GettableServicesMetadata struct {
Services []*ServiceMetadata `json:"services"`
}
Service struct {
ServiceDefinition
ServiceConfig *ServiceConfig `json:"serviceConfig"`
}
GettableService = Service
UpdatableService struct {
Config *ServiceConfig `json:"config"`
}
)
type ServiceConfig struct {
AWS *AWSServiceConfig `json:"aws,omitempty"`
}
type AWSServiceConfig struct {
Logs *AWSServiceLogsConfig `json:"logs"`
Metrics *AWSServiceMetricsConfig `json:"metrics"`
}
// AWSServiceLogsConfig is AWS specific logs config for a service
// NOTE: the JSON keys are snake case for backward compatibility with existing agents.
type AWSServiceLogsConfig struct {
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type AWSServiceMetricsConfig struct {
Enabled bool `json:"enabled"`
}
// ServiceDefinitionMetadata represents service definition metadata. This is useful for showing service tab in frontend.
type ServiceDefinitionMetadata struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
}
type ServiceDefinition struct {
ServiceDefinitionMetadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"dataCollected"`
Strategy *CollectionStrategy `json:"telemetryCollectionStrategy"`
}
// CollectionStrategy is cloud provider specific configuration for signal collection,
// this is used by agent to understand the nitty-gritty for collecting telemetry for the cloud provider.
type CollectionStrategy struct {
AWS *AWSCollectionStrategy `json:"aws,omitempty"`
}
// Assets represents the collection of dashboards.
type Assets struct {
Dashboards []Dashboard `json:"dashboards"`
}
// SupportedSignals for cloud provider's service.
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
// DataCollected is curated static list of metrics and logs, this is shown as part of service overview.
type DataCollected struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
// CollectedLogAttribute represents a log attribute that is present in all log entries for a service,
// this is shown as part of service overview.
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
// CollectedMetric represents a metric that is collected for a service, this is shown as part of service overview.
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
// AWSCollectionStrategy represents signal collection strategy for AWS services.
// this is AWS specific.
// NOTE: this structure is still using snake case, for backward compatibility,
// with existing agents.
type AWSCollectionStrategy struct {
Metrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
Logs *AWSLogsStrategy `json:"aws_logs,omitempty"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type in AWS
}
// AWSMetricsStrategy represents metrics collection strategy for AWS services.
// this is AWS specific.
// NOTE: this structure is still using snake case, for backward compatibility,
// with existing agents.
type AWSMetricsStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
StreamFilters []struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
} `json:"cloudwatch_metric_stream_filters"`
}
// AWSLogsStrategy represents logs collection strategy for AWS services.
// this is AWS specific.
// NOTE: this structure is still using snake case, for backward compatibility,
// with existing agents.
type AWSLogsStrategy struct {
Subscriptions []struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
} `json:"cloudwatch_logs_subscriptions"`
}
// Dashboard represents a dashboard definition for cloud integration.
// This is used to show available pre-made dashboards for a service,
// hence has additional fields like id, title and description
type Dashboard struct {
Id string `json:"id"`
Title string `json:"title"`
Description string `json:"description"`
Definition dashboardtypes.StorableDashboardData `json:"definition,omitempty"`
}
// SupportedServices is the map of supported services for each cloud provider.
var SupportedServices = map[CloudProviderType][]ServiceID{
CloudProviderTypeAWS: {
{valuer.NewString("alb")},
{valuer.NewString("api-gateway")},
{valuer.NewString("dynamodb")},
{valuer.NewString("ec2")},
{valuer.NewString("ecs")},
{valuer.NewString("eks")},
{valuer.NewString("elasticache")},
{valuer.NewString("lambda")},
{valuer.NewString("msk")},
{valuer.NewString("rds")},
{valuer.NewString("s3sync")},
{valuer.NewString("sns")},
{valuer.NewString("sqs")},
},
}
// NewServiceID returns a new ServiceID from a string, validated against the supported services for the given cloud provider.
func NewServiceID(provider CloudProviderType, service string) (ServiceID, error) {
services, ok := SupportedServices[provider]
if !ok {
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "no services defined for cloud provider: %s", provider)
}
for _, s := range services {
if s.StringValue() == service {
return s, nil
}
}
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "invalid service id %q for cloud provider %s", service, provider)
}
// UTILS
// GetCloudIntegrationDashboardID returns the dashboard id for a cloud integration, given the cloud provider, service id, and dashboard id.
// This is used to generate unique dashboard ids for cloud integration, and also to parse the dashboard id to get the cloud provider and service id when needed.
func GetCloudIntegrationDashboardID(cloudProvider CloudProviderType, svcId, dashboardId string) string {
return fmt.Sprintf("cloud-integration--%s--%s--%s", cloudProvider, svcId, dashboardId)
}
// GetDashboardsFromAssets returns the list of dashboards for the cloud provider service from definition.
func GetDashboardsFromAssets(
svcId string,
orgID valuer.UUID,
cloudProvider CloudProviderType,
createdAt time.Time,
assets Assets,
) []*dashboardtypes.Dashboard {
dashboards := make([]*dashboardtypes.Dashboard, 0)
for _, d := range assets.Dashboards {
author := fmt.Sprintf("%s-integration", cloudProvider)
dashboards = append(dashboards, &dashboardtypes.Dashboard{
ID: GetCloudIntegrationDashboardID(cloudProvider, svcId, d.Id),
Locked: true,
OrgID: orgID,
Data: d.Definition,
TimeAuditable: types.TimeAuditable{
CreatedAt: createdAt,
UpdatedAt: createdAt,
},
UserAuditable: types.UserAuditable{
CreatedBy: author,
UpdatedBy: author,
},
})
}
return dashboards
}

View File

@@ -1,41 +0,0 @@
package cloudintegrationtypes
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Store interface {
// GetAccountByID returns a cloud integration account by id
GetAccountByID(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) (*StorableCloudIntegration, error)
// CreateAccount creates a new cloud integration account
CreateAccount(ctx context.Context, account *StorableCloudIntegration) (*StorableCloudIntegration, error)
// UpdateAccount updates an existing cloud integration account
UpdateAccount(ctx context.Context, account *StorableCloudIntegration) error
// RemoveAccount marks a cloud integration account as removed by setting the RemovedAt field
RemoveAccount(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) error
// GetConnectedAccounts returns all the cloud integration accounts for the org and cloud provider
GetConnectedAccounts(ctx context.Context, orgID valuer.UUID, provider CloudProviderType) ([]*StorableCloudIntegration, error)
// GetConnectedAccount for a given provider
GetConnectedAccount(ctx context.Context, orgID valuer.UUID, provider CloudProviderType, providerAccountID string) (*StorableCloudIntegration, error)
// cloud_integration_service related methods
// GetServiceByServiceID returns the cloud integration service for the given cloud integration id and service id
GetServiceByServiceID(ctx context.Context, cloudIntegrationID valuer.UUID, serviceID ServiceID) (*StorableCloudIntegrationService, error)
// CreateService creates a new cloud integration service
CreateService(ctx context.Context, service *StorableCloudIntegrationService) (*StorableCloudIntegrationService, error)
// UpdateService updates an existing cloud integration service
UpdateService(ctx context.Context, service *StorableCloudIntegrationService) error
// GetServices returns all the cloud integration services for the given cloud integration id
GetServices(ctx context.Context, cloudIntegrationID valuer.UUID) ([]*StorableCloudIntegrationService, error)
}

View File

@@ -40,7 +40,7 @@ type JSONAccessNode struct {
// Node information
Name string
IsTerminal bool
isRoot bool // marked true for only body_json and body_json_promoted
isRoot bool // marked true for only body_v2 and body_promoted
// Precomputed type information (single source of truth)
AvailableTypes []JSONDataType

View File

@@ -1,12 +1,19 @@
package telemetrytypes
import (
"fmt"
"testing"
otelconstants "github.com/SigNoz/signoz-otel-collector/constants"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
const (
bodyV2Column = otelconstants.BodyV2Column
bodyPromotedColumn = otelconstants.BodyPromotedColumn
)
// ============================================================================
// Helper Functions for Test Data Creation
// ============================================================================
@@ -109,8 +116,8 @@ func TestNode_Alias(t *testing.T) {
}{
{
name: "Root node returns name as-is",
node: NewRootJSONAccessNode("body_json", 32, 0),
expected: "body_json",
node: NewRootJSONAccessNode(bodyV2Column, 32, 0),
expected: bodyV2Column,
},
{
name: "Node without parent returns backticked name",
@@ -124,9 +131,9 @@ func TestNode_Alias(t *testing.T) {
name: "Node with root parent uses dot separator",
node: &JSONAccessNode{
Name: "age",
Parent: NewRootJSONAccessNode("body_json", 32, 0),
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
},
expected: "`" + "body_json" + ".age`",
expected: "`" + bodyV2Column + ".age`",
},
{
name: "Node with non-root parent uses array separator",
@@ -134,10 +141,10 @@ func TestNode_Alias(t *testing.T) {
Name: "name",
Parent: &JSONAccessNode{
Name: "education",
Parent: NewRootJSONAccessNode("body_json", 32, 0),
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
},
},
expected: "`" + "body_json" + ".education[].name`",
expected: "`" + bodyV2Column + ".education[].name`",
},
{
name: "Nested array path with multiple levels",
@@ -147,11 +154,11 @@ func TestNode_Alias(t *testing.T) {
Name: "awards",
Parent: &JSONAccessNode{
Name: "education",
Parent: NewRootJSONAccessNode("body_json", 32, 0),
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
},
},
},
expected: "`" + "body_json" + ".education[].awards[].type`",
expected: "`" + bodyV2Column + ".education[].awards[].type`",
},
}
@@ -173,18 +180,18 @@ func TestNode_FieldPath(t *testing.T) {
name: "Simple field path from root",
node: &JSONAccessNode{
Name: "user",
Parent: NewRootJSONAccessNode("body_json", 32, 0),
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
},
// FieldPath() always wraps the field name in backticks
expected: "body_json" + ".`user`",
expected: bodyV2Column + ".`user`",
},
{
name: "Field path with backtick-required key",
node: &JSONAccessNode{
Name: "user-name", // requires backtick
Parent: NewRootJSONAccessNode("body_json", 32, 0),
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
},
expected: "body_json" + ".`user-name`",
expected: bodyV2Column + ".`user-name`",
},
{
name: "Nested field path",
@@ -192,11 +199,11 @@ func TestNode_FieldPath(t *testing.T) {
Name: "age",
Parent: &JSONAccessNode{
Name: "user",
Parent: NewRootJSONAccessNode("body_json", 32, 0),
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
},
},
// FieldPath() always wraps the field name in backticks
expected: "`" + "body_json" + ".user`.`age`",
expected: "`" + bodyV2Column + ".user`.`age`",
},
{
name: "Array element field path",
@@ -204,11 +211,11 @@ func TestNode_FieldPath(t *testing.T) {
Name: "name",
Parent: &JSONAccessNode{
Name: "education",
Parent: NewRootJSONAccessNode("body_json", 32, 0),
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
},
},
// FieldPath() always wraps the field name in backticks
expected: "`" + "body_json" + ".education`.`name`",
expected: "`" + bodyV2Column + ".education`.`name`",
},
}
@@ -236,36 +243,36 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
{
name: "Simple path not promoted",
key: makeKey("user.name", String, false),
expectedYAML: `
expectedYAML: fmt.Sprintf(`
- name: user.name
column: body_json
column: %s
availableTypes:
- String
maxDynamicTypes: 16
isTerminal: true
elemType: String
`,
`, bodyV2Column),
},
{
name: "Simple path promoted",
key: makeKey("user.name", String, true),
expectedYAML: `
expectedYAML: fmt.Sprintf(`
- name: user.name
column: body_json
column: %s
availableTypes:
- String
maxDynamicTypes: 16
isTerminal: true
elemType: String
- name: user.name
column: body_json_promoted
column: %s
availableTypes:
- String
maxDynamicTypes: 16
maxDynamicPaths: 256
isTerminal: true
elemType: String
`,
`, bodyV2Column, bodyPromotedColumn),
},
{
name: "Empty path returns error",
@@ -278,8 +285,8 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
if tt.expectErr {
require.Error(t, err)
@@ -304,9 +311,9 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
{
name: "Single array level - JSON branch only",
path: "education[].name",
expectedYAML: `
expectedYAML: fmt.Sprintf(`
- name: education
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -318,14 +325,14 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicTypes: 8
isTerminal: true
elemType: String
`,
`, bodyV2Column),
},
{
name: "Single array level - both JSON and Dynamic branches",
path: "education[].awards[].type",
expectedYAML: `
expectedYAML: fmt.Sprintf(`
- name: education
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -352,14 +359,14 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
`,
`, bodyV2Column),
},
{
name: "Deeply nested array path",
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
expectedYAML: `
expectedYAML: fmt.Sprintf(`
- name: interests
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -399,14 +406,14 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
- String
isTerminal: true
elemType: String
`,
`, bodyV2Column),
},
{
name: "ArrayAnyIndex replacement [*] to []",
path: "education[*].name",
expectedYAML: `
expectedYAML: fmt.Sprintf(`
- name: education
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -418,7 +425,7 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicTypes: 8
isTerminal: true
elemType: String
`,
`, bodyV2Column),
},
}
@@ -426,8 +433,8 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
key := makeKey(tt.path, String, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
require.NoError(t, err)
require.NotNil(t, key.JSONPlan)
@@ -445,15 +452,15 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
t.Run("Non-promoted plan", func(t *testing.T) {
key := makeKey(path, String, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
require.NoError(t, err)
require.Len(t, key.JSONPlan, 1)
expectedYAML := `
expectedYAML := fmt.Sprintf(`
- name: education
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -480,7 +487,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
`
`, bodyV2Column)
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, expectedYAML, got)
})
@@ -488,15 +495,15 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
t.Run("Promoted plan", func(t *testing.T) {
key := makeKey(path, String, true)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
require.NoError(t, err)
require.Len(t, key.JSONPlan, 2)
expectedYAML := `
expectedYAML := fmt.Sprintf(`
- name: education
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -524,7 +531,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
isTerminal: true
elemType: String
- name: education
column: body_json_promoted
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -554,7 +561,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
`
`, bodyV2Column, bodyPromotedColumn)
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, expectedYAML, got)
})
@@ -575,11 +582,11 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
expectErr: true,
},
{
name: "Very deep nesting - validates progression doesn't go negative",
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
expectedYAML: `
name: "Very deep nesting - validates progression doesn't go negative",
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
expectedYAML: fmt.Sprintf(`
- name: interests
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -619,14 +626,14 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
- String
isTerminal: true
elemType: String
`,
`, bodyV2Column),
},
{
name: "Path with mixed scalar and array types",
path: "education[].type",
expectedYAML: `
name: "Path with mixed scalar and array types",
path: "education[].type",
expectedYAML: fmt.Sprintf(`
- name: education
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -639,20 +646,20 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
maxDynamicTypes: 8
isTerminal: true
elemType: String
`,
`, bodyV2Column),
},
{
name: "Exists with only array types available",
path: "education",
expectedYAML: `
name: "Exists with only array types available",
path: "education",
expectedYAML: fmt.Sprintf(`
- name: education
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
isTerminal: true
elemType: Array(JSON)
`,
`, bodyV2Column),
},
}
@@ -668,8 +675,8 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
}
key := makeKey(tt.path, keyType, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
if tt.expectErr {
require.Error(t, err)
@@ -687,15 +694,15 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
path := "education[].awards[].participated[].team[].branch"
key := makeKey(path, String, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
require.NoError(t, err)
require.Len(t, key.JSONPlan, 1)
expectedYAML := `
expectedYAML := fmt.Sprintf(`
- name: education
column: body_json
column: %s
availableTypes:
- Array(JSON)
maxDynamicTypes: 16
@@ -780,7 +787,7 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
maxDynamicPaths: 64
isTerminal: true
elemType: String
`
`, bodyV2Column)
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, expectedYAML, got)