mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-19 03:02:16 +00:00
Compare commits
4 Commits
fix/order-
...
v0.116.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
155f287462 | ||
|
|
c8fcc48022 | ||
|
|
44b6885639 | ||
|
|
0e5a128325 |
@@ -190,7 +190,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:v0.116.0
|
||||
image: signoz/signoz:v0.116.1
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
# - "6060:6060" # pprof port
|
||||
|
||||
@@ -117,7 +117,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:v0.116.0
|
||||
image: signoz/signoz:v0.116.1
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
volumes:
|
||||
|
||||
@@ -181,7 +181,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:${VERSION:-v0.116.0}
|
||||
image: signoz/signoz:${VERSION:-v0.116.1}
|
||||
container_name: signoz
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
|
||||
@@ -109,7 +109,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:${VERSION:-v0.116.0}
|
||||
image: signoz/signoz:${VERSION:-v0.116.1}
|
||||
container_name: signoz
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
|
||||
2
go.mod
2
go.mod
@@ -11,7 +11,6 @@ require (
|
||||
github.com/SigNoz/signoz-otel-collector v0.144.2
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1
|
||||
github.com/antonmedv/expr v1.15.3
|
||||
github.com/bytedance/sonic v1.14.1
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/coreos/go-oidc/v3 v3.17.0
|
||||
github.com/dgraph-io/ristretto/v2 v2.3.0
|
||||
@@ -106,6 +105,7 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect
|
||||
github.com/aws/smithy-go v1.24.0 // indirect
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/sonic v1.14.1 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
@@ -41,9 +40,7 @@ func (middleware *AuthZ) ViewAccess(next http.HandlerFunc) http.HandlerFunc {
|
||||
return
|
||||
}
|
||||
|
||||
commentCtx := ctxtypes.CommentFromContext(ctx)
|
||||
authtype, ok := commentCtx.Map()["auth_type"]
|
||||
if ok && (authtype == authtypes.IdentNProviderAPIkey.StringValue()) {
|
||||
if claims.IdentNProvider == authtypes.IdentNProviderAPIkey.StringValue() {
|
||||
if err := claims.IsViewer(); err != nil {
|
||||
middleware.logger.WarnContext(ctx, authzDeniedMessage, "claims", claims)
|
||||
render.Error(rw, err)
|
||||
@@ -93,9 +90,7 @@ func (middleware *AuthZ) EditAccess(next http.HandlerFunc) http.HandlerFunc {
|
||||
return
|
||||
}
|
||||
|
||||
commentCtx := ctxtypes.CommentFromContext(ctx)
|
||||
authtype, ok := commentCtx.Map()["auth_type"]
|
||||
if ok && (authtype == authtypes.IdentNProviderAPIkey.StringValue()) {
|
||||
if claims.IdentNProvider == authtypes.IdentNProviderAPIkey.StringValue() {
|
||||
if err := claims.IsEditor(); err != nil {
|
||||
middleware.logger.WarnContext(ctx, authzDeniedMessage, "claims", claims)
|
||||
render.Error(rw, err)
|
||||
@@ -144,9 +139,7 @@ func (middleware *AuthZ) AdminAccess(next http.HandlerFunc) http.HandlerFunc {
|
||||
return
|
||||
}
|
||||
|
||||
commentCtx := ctxtypes.CommentFromContext(ctx)
|
||||
authtype, ok := commentCtx.Map()["auth_type"]
|
||||
if ok && (authtype == authtypes.IdentNProviderAPIkey.StringValue()) {
|
||||
if claims.IdentNProvider == authtypes.IdentNProviderAPIkey.StringValue() {
|
||||
if err := claims.IsAdmin(); err != nil {
|
||||
middleware.logger.WarnContext(ctx, authzDeniedMessage, "claims", claims)
|
||||
render.Error(rw, err)
|
||||
|
||||
@@ -101,13 +101,8 @@ func (provider *provider) GetIdentity(req *http.Request) (*authtypes.Identity, e
|
||||
return nil, err
|
||||
}
|
||||
|
||||
identity := authtypes.Identity{
|
||||
UserID: user.ID,
|
||||
Role: apiKey.Role,
|
||||
Email: user.Email,
|
||||
OrgID: user.OrgID,
|
||||
}
|
||||
return &identity, nil
|
||||
identity := authtypes.NewIdentity(user.ID, user.OrgID, user.Email, apiKey.Role, provider.Name())
|
||||
return identity, nil
|
||||
}
|
||||
|
||||
func (provider *provider) Post(ctx context.Context, _ *http.Request, _ authtypes.Claims) {
|
||||
|
||||
@@ -78,7 +78,7 @@ func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetype
|
||||
|
||||
// add the paths that are not promoted but have indexes
|
||||
for path, indexes := range aggr {
|
||||
path := strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
|
||||
path := strings.TrimPrefix(path, telemetrylogs.BodyV2ColumnPrefix)
|
||||
path = telemetrytypes.BodyJSONStringSearchPrefix + path
|
||||
response = append(response, promotetypes.PromotePath{
|
||||
Path: path,
|
||||
@@ -163,7 +163,7 @@ func (m *module) PromoteAndIndexPaths(
|
||||
}
|
||||
}
|
||||
if len(it.Indexes) > 0 {
|
||||
parentColumn := telemetrylogs.LogsV2BodyJSONColumn
|
||||
parentColumn := telemetrylogs.LogsV2BodyV2Column
|
||||
// if the path is already promoted or is being promoted, add it to the promoted column
|
||||
if _, promoted := existingPromotedPaths[it.Path]; promoted || it.Promote {
|
||||
parentColumn = telemetrylogs.LogsV2BodyPromotedColumn
|
||||
|
||||
@@ -10,13 +10,11 @@ import (
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/bytedance/sonic"
|
||||
)
|
||||
|
||||
type builderQuery[T any] struct {
|
||||
@@ -262,40 +260,6 @@ func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// merge body_json and promoted into body
|
||||
if q.spec.Signal == telemetrytypes.SignalLogs {
|
||||
switch typedPayload := payload.(type) {
|
||||
case *qbtypes.RawData:
|
||||
for _, rr := range typedPayload.Rows {
|
||||
seeder := func() error {
|
||||
body, ok := rr.Data[telemetrylogs.LogsV2BodyJSONColumn].(map[string]any)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
promoted, ok := rr.Data[telemetrylogs.LogsV2BodyPromotedColumn].(map[string]any)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
seed(promoted, body)
|
||||
str, err := sonic.MarshalString(body)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal body")
|
||||
}
|
||||
rr.Data["body"] = str
|
||||
return nil
|
||||
}
|
||||
err := seeder()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
delete(rr.Data, telemetrylogs.LogsV2BodyJSONColumn)
|
||||
delete(rr.Data, telemetrylogs.LogsV2BodyPromotedColumn)
|
||||
}
|
||||
payload = typedPayload
|
||||
}
|
||||
}
|
||||
|
||||
return &qbtypes.Result{
|
||||
Type: q.kind,
|
||||
Value: payload,
|
||||
@@ -423,18 +387,3 @@ func decodeCursor(cur string) (int64, error) {
|
||||
}
|
||||
return strconv.ParseInt(string(b), 10, 64)
|
||||
}
|
||||
|
||||
func seed(promoted map[string]any, body map[string]any) {
|
||||
for key, fromValue := range promoted {
|
||||
if toValue, ok := body[key]; !ok {
|
||||
body[key] = fromValue
|
||||
} else {
|
||||
if fromValue, ok := fromValue.(map[string]any); ok {
|
||||
if toValue, ok := toValue.(map[string]any); ok {
|
||||
seed(fromValue, toValue)
|
||||
body[key] = toValue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/bytedance/sonic"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -394,17 +393,11 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
|
||||
|
||||
// de-reference the typed pointer to any
|
||||
val := reflect.ValueOf(cellPtr).Elem().Interface()
|
||||
|
||||
// Post-process JSON columns: normalize into structured values
|
||||
// Post-process JSON columns: normalize into String value
|
||||
if strings.HasPrefix(strings.ToUpper(colTypes[i].DatabaseTypeName()), "JSON") {
|
||||
switch x := val.(type) {
|
||||
case []byte:
|
||||
if len(x) > 0 {
|
||||
var v any
|
||||
if err := sonic.Unmarshal(x, &v); err == nil {
|
||||
val = v
|
||||
}
|
||||
}
|
||||
val = string(x)
|
||||
default:
|
||||
// already a structured type (map[string]any, []any, etc.)
|
||||
}
|
||||
|
||||
@@ -115,6 +115,7 @@ func (r *Repo) GetLatestVersion(
|
||||
func (r *Repo) insertConfig(
|
||||
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
|
||||
) error {
|
||||
|
||||
if c.ElementType.StringValue() == "" {
|
||||
return errors.NewInvalidInputf(CodeElementTypeRequired, "element type is required for creating agent config version")
|
||||
}
|
||||
@@ -228,25 +229,6 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (stored with orgId prefix). Returns DeployStatusUnknown when no matching row exists.
|
||||
func (r *Repo) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
var version opamptypes.AgentConfigVersion
|
||||
err := r.store.BunDB().NewSelect().
|
||||
Model(&version).
|
||||
ColumnExpr("deploy_status").
|
||||
Where("hash = ?", configHash).
|
||||
Where("org_id = ?", orgId).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
return opamptypes.DeployStatusUnknown, errors.WrapInternalf(err, errors.CodeInternal, "failed to query deploy status by hash")
|
||||
}
|
||||
return version.DeployStatus, nil
|
||||
}
|
||||
|
||||
func (r *Repo) updateDeployStatusByHash(
|
||||
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
|
||||
) error {
|
||||
|
||||
@@ -180,12 +180,6 @@ func (m *Manager) ReportConfigDeploymentStatus(
|
||||
}
|
||||
}
|
||||
|
||||
// Implements model.AgentConfigProvider
|
||||
func (m *Manager) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
return m.Repo.GetDeployStatusByHash(ctx, orgId, configHash)
|
||||
}
|
||||
|
||||
|
||||
func GetLatestVersion(
|
||||
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
|
||||
) (*opamptypes.AgentConfigVersion, error) {
|
||||
|
||||
@@ -131,32 +131,38 @@ func (ic *LogParsingPipelineController) ValidatePipelines(ctx context.Context,
|
||||
return err
|
||||
}
|
||||
|
||||
func (ic *LogParsingPipelineController) getNormalizePipeline() pipelinetypes.GettablePipeline {
|
||||
return pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: pipelinetypes.StoreablePipeline{
|
||||
Name: "Default Pipeline - PreProcessing Body",
|
||||
Alias: "NormalizeBodyDefault",
|
||||
Enabled: true,
|
||||
},
|
||||
Filter: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "body",
|
||||
func (ic *LogParsingPipelineController) getDefaultPipelines() ([]pipelinetypes.GettablePipeline, error) {
|
||||
defaultPipelines := []pipelinetypes.GettablePipeline{}
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
preprocessingPipeline := pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: pipelinetypes.StoreablePipeline{
|
||||
Name: "Default Pipeline - PreProcessing Body",
|
||||
Alias: "NormalizeBodyDefault",
|
||||
Enabled: true,
|
||||
},
|
||||
Filter: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "body",
|
||||
},
|
||||
Operator: v3.FilterOperatorExists,
|
||||
},
|
||||
Operator: v3.FilterOperatorExists,
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []pipelinetypes.PipelineOperator{
|
||||
{
|
||||
ID: uuid.NewString(),
|
||||
Type: "normalize",
|
||||
Enabled: true,
|
||||
If: "body != nil",
|
||||
Config: []pipelinetypes.PipelineOperator{
|
||||
{
|
||||
ID: uuid.NewString(),
|
||||
Type: "normalize",
|
||||
Enabled: true,
|
||||
If: "body != nil",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
defaultPipelines = append(defaultPipelines, preprocessingPipeline)
|
||||
}
|
||||
return defaultPipelines, nil
|
||||
}
|
||||
|
||||
// Returns effective list of pipelines including user created
|
||||
@@ -289,10 +295,12 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
// add default normalize pipeline at the beginning
|
||||
pipelinesResp.Pipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, pipelinesResp.Pipelines...)
|
||||
// recommend default pipelines along with user created pipelines
|
||||
defaultPipelines, err := pc.getDefaultPipelines()
|
||||
if err != nil {
|
||||
return nil, "", model.InternalError(fmt.Errorf("failed to get default pipelines: %w", err))
|
||||
}
|
||||
pipelinesResp.Pipelines = append(pipelinesResp.Pipelines, defaultPipelines...)
|
||||
|
||||
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, pipelinesResp.Pipelines)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package opamp
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
)
|
||||
import "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
|
||||
// Interface for a source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/google/uuid"
|
||||
"github.com/knadh/koanf"
|
||||
@@ -128,11 +127,6 @@ func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID
|
||||
return exists
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) GetDeployStatusByHash(_ context.Context, _ valuer.UUID, _ string) (opamptypes.DeployStatus, error) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
|
||||
subscriberId := uuid.NewString()
|
||||
|
||||
@@ -111,99 +111,90 @@ func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// agentDescriptionChanged returns true when the agent sends updated properties
|
||||
// (e.g. capability flag, version) mid-connection, signalling the server to
|
||||
// recompute and push a new RemoteConfig.
|
||||
//
|
||||
// On reconnect this always returns false: handleFirstStatus pre-copies
|
||||
// AgentDescription into agent.Status so no diff is detected, avoiding a
|
||||
// redundant config recompute.
|
||||
func (agent *Agent) agentDescriptionChanged(newStatus *protobufs.AgentToServer) bool {
|
||||
// nil AgentDescription means no change per OpAMP protocol.
|
||||
if newStatus.AgentDescription == nil {
|
||||
return false
|
||||
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
prevStatus := agent.Status
|
||||
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
} else {
|
||||
// Not a new Agent. Update the Status.
|
||||
agent.Status.SequenceNum = newStatus.SequenceNum
|
||||
|
||||
// Check what's changed in the AgentDescription.
|
||||
if newStatus.AgentDescription != nil {
|
||||
// If the AgentDescription field is set it means the Agent tells us
|
||||
// something is changed in the field since the last status report
|
||||
// (or this is the first report).
|
||||
// Make full comparison of previous and new descriptions to see if it
|
||||
// really is different.
|
||||
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
|
||||
// Agent description didn't change.
|
||||
agentDescrChanged = false
|
||||
} else {
|
||||
// Yes, the description is different, update it.
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
agentDescrChanged = true
|
||||
}
|
||||
} else {
|
||||
// AgentDescription field is not set, which means description didn't change.
|
||||
agentDescrChanged = false
|
||||
}
|
||||
|
||||
// Update remote config status if it is included and is different from what we have.
|
||||
if newStatus.RemoteConfigStatus != nil &&
|
||||
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
|
||||
}
|
||||
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
|
||||
}
|
||||
}
|
||||
}
|
||||
if proto.Equal(agent.Status.AgentDescription, newStatus.AgentDescription) {
|
||||
return false
|
||||
|
||||
if agentDescrChanged {
|
||||
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
|
||||
}
|
||||
|
||||
return agentDescrChanged
|
||||
}
|
||||
|
||||
func (agent *Agent) updateHealth(newStatus *protobufs.AgentToServer) {
|
||||
if newStatus.Health == nil {
|
||||
return
|
||||
}
|
||||
|
||||
agent.Status.Health = newStatus.Health
|
||||
|
||||
if agent.Status != nil && agent.Status.Health != nil && agent.Status.Health.Healthy {
|
||||
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(agent.Status.Health.StartTimeUnixNano)).UTC()
|
||||
}
|
||||
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
|
||||
return true
|
||||
}
|
||||
|
||||
// updateRemoteConfigStatus updates the stored RemoteConfigStatus and notifies
|
||||
// subscribers if the status has changed relative to what we have stored.
|
||||
func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer) {
|
||||
if newStatus.RemoteConfigStatus == nil ||
|
||||
proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
return
|
||||
}
|
||||
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
hash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
switch newStatus.RemoteConfigStatus.Status {
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED:
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, hash)
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, hash, newStatus.RemoteConfigStatus.ErrorMessage)
|
||||
// Update remote config status if it is included and is different from what we have.
|
||||
if newStatus.RemoteConfigStatus != nil {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
}
|
||||
}
|
||||
|
||||
// handleFirstStatus initializes agent.Status on the first message received from
|
||||
// this agent instance. It is a no-op for all subsequent messages.
|
||||
func (agent *Agent) handleFirstStatus(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) {
|
||||
if agent.Status != nil {
|
||||
return
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
}
|
||||
|
||||
// Initialize with a clean slate.
|
||||
agent.Status = &protobufs.AgentToServer{
|
||||
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
|
||||
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
},
|
||||
}
|
||||
|
||||
if newStatus.RemoteConfigStatus == nil ||
|
||||
newStatus.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET {
|
||||
// Agent just started fresh — no prior deployment to reconcile with the DB.
|
||||
return
|
||||
}
|
||||
|
||||
// Since the server's connection is restarted;
|
||||
// copy the agent description; so no change is detected by agentDescriptionChanged
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
|
||||
// Server reconnected while the agent was already running.
|
||||
// Reconcile deployment status with DB; DB is the source of truth.
|
||||
// If DB says in_progress but agent now reports APPLIED/FAILED,
|
||||
// updateRemoteConfigStatus will detect the transition and notify subscribers.
|
||||
rawHash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
deployStatus, err := configProvider.GetDeployStatusByHash(context.Background(), agent.OrgID, agent.OrgID.String()+rawHash)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
agent.Status.RemoteConfigStatus.Status = opamptypes.DeployStatusToProtoStatus[deployStatus]
|
||||
|
||||
// If the deployment is still in-flight, rehydrate the subscriber so that
|
||||
// updateRemoteConfigStatus can fire onConfigSuccess/onConfigFailure when
|
||||
// the agent next reports a terminal status.
|
||||
if deployStatus != opamptypes.Deployed && deployStatus != opamptypes.DeployFailed {
|
||||
ListenToConfigUpdate(agent.OrgID, agent.AgentID, rawHash, configProvider.ReportConfigDeploymentStatus)
|
||||
}
|
||||
}
|
||||
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) bool {
|
||||
agent.handleFirstStatus(newStatus, configProvider)
|
||||
agentDescrChanged := agent.agentDescriptionChanged(newStatus)
|
||||
// record healthy timestamp
|
||||
if newStatus.Health != nil && newStatus.Health.Healthy {
|
||||
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(newStatus.Health.StartTimeUnixNano)).UTC()
|
||||
}
|
||||
// notify subscribers first; this will update the status in the DB
|
||||
agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged
|
||||
agent.updateRemoteConfigStatus(newStatus)
|
||||
// update local reference in last.
|
||||
agent.Status = newStatus
|
||||
agent.updateHealth(newStatus)
|
||||
return agentDescrChanged
|
||||
}
|
||||
|
||||
@@ -246,7 +237,7 @@ func (agent *Agent) processStatusUpdate(
|
||||
// current status is not up-to-date.
|
||||
lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum)
|
||||
|
||||
agentDescrChanged := agent.updateStatusField(newStatus, configProvider)
|
||||
agentDescrChanged := agent.updateStatusField(newStatus)
|
||||
|
||||
// Check if any fields were omitted in the status report.
|
||||
effectiveConfigOmitted := newStatus.EffectiveConfig == nil &&
|
||||
|
||||
@@ -1,11 +1,6 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
import "github.com/SigNoz/signoz/pkg/valuer"
|
||||
|
||||
// Interface for source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
@@ -25,10 +20,4 @@ type AgentConfigProvider interface {
|
||||
configId string,
|
||||
err error,
|
||||
)
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (with orgId prefix as stored in the DB). Returns DeployStatusUnknown when
|
||||
// no matching row exists. Used by the agent's first-connect handler to
|
||||
// determine whether the reported RemoteConfigStatus resolves a pending deployment.
|
||||
GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error)
|
||||
}
|
||||
|
||||
@@ -66,7 +66,6 @@ func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnC
|
||||
defer coordinator.mutex.Unlock()
|
||||
|
||||
key := getSubscriberKey(orgId, hash)
|
||||
|
||||
if subs, ok := coordinator.subscribers[key]; ok {
|
||||
subs = append(subs, ss)
|
||||
coordinator.subscribers[key] = subs
|
||||
|
||||
@@ -7,12 +7,14 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"log/slog"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
plabels "github.com/prometheus/prometheus/model/labels"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
// PromRuleTask is a promql rule executor
|
||||
@@ -371,7 +373,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
|
||||
|
||||
comment := ctxtypes.CommentFromContext(ctx)
|
||||
comment.Set("rule_id", rule.ID())
|
||||
comment.Set("auth_type", "internal")
|
||||
comment.Set("identn_provider", authtypes.IdentNProviderInternal.StringValue())
|
||||
ctx = ctxtypes.NewContextWithComment(ctx, comment)
|
||||
|
||||
_, err := rule.Eval(ctx, ts)
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"log/slog"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
@@ -358,7 +359,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
|
||||
|
||||
comment := ctxtypes.CommentFromContext(ctx)
|
||||
comment.Set("rule_id", rule.ID())
|
||||
comment.Set("auth_type", "internal")
|
||||
comment.Set("identn_provider", authtypes.IdentNProviderInternal.StringValue())
|
||||
ctx = ctxtypes.NewContextWithComment(ctx, comment)
|
||||
|
||||
_, err := rule.Eval(ctx, ts)
|
||||
|
||||
@@ -219,7 +219,6 @@ func DataTypeCollisionHandledFieldName(key *telemetrytypes.TelemetryFieldKey, va
|
||||
// we don't have a toBoolOrNull in ClickHouse, so we need to convert the bool to a string
|
||||
value = fmt.Sprintf("%t", v)
|
||||
}
|
||||
|
||||
case telemetrytypes.FieldDataTypeInt64,
|
||||
telemetrytypes.FieldDataTypeArrayInt64,
|
||||
telemetrytypes.FieldDataTypeNumber,
|
||||
|
||||
@@ -313,37 +313,30 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
|
||||
return ""
|
||||
}
|
||||
child := ctx.GetChild(0)
|
||||
var searchText string
|
||||
if keyCtx, ok := child.(*grammar.KeyContext); ok {
|
||||
// create a full text search condition on the body field
|
||||
|
||||
keyText := keyCtx.GetText()
|
||||
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(keyText), v.builder, v.startNs, v.endNs)
|
||||
if err != nil {
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
|
||||
return ""
|
||||
}
|
||||
return cond
|
||||
searchText = keyCtx.GetText()
|
||||
} else if valCtx, ok := child.(*grammar.ValueContext); ok {
|
||||
var text string
|
||||
if valCtx.QUOTED_TEXT() != nil {
|
||||
text = trimQuotes(valCtx.QUOTED_TEXT().GetText())
|
||||
searchText = trimQuotes(valCtx.QUOTED_TEXT().GetText())
|
||||
} else if valCtx.NUMBER() != nil {
|
||||
text = valCtx.NUMBER().GetText()
|
||||
searchText = valCtx.NUMBER().GetText()
|
||||
} else if valCtx.BOOL() != nil {
|
||||
text = valCtx.BOOL().GetText()
|
||||
searchText = valCtx.BOOL().GetText()
|
||||
} else if valCtx.KEY() != nil {
|
||||
text = valCtx.KEY().GetText()
|
||||
searchText = valCtx.KEY().GetText()
|
||||
} else {
|
||||
v.errors = append(v.errors, fmt.Sprintf("unsupported value type: %s", valCtx.GetText()))
|
||||
return ""
|
||||
}
|
||||
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(text), v.builder, v.startNs, v.endNs)
|
||||
if err != nil {
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
|
||||
return ""
|
||||
}
|
||||
return cond
|
||||
}
|
||||
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(searchText), v.builder, v.startNs, v.endNs)
|
||||
if err != nil {
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
|
||||
return ""
|
||||
}
|
||||
return cond
|
||||
}
|
||||
|
||||
return "" // Should not happen with valid input
|
||||
@@ -383,6 +376,7 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
|
||||
for _, key := range keys {
|
||||
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, nil, v.builder, v.startNs, v.endNs)
|
||||
if err != nil {
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to build condition: %s", err.Error()))
|
||||
return ""
|
||||
}
|
||||
conds = append(conds, condition)
|
||||
@@ -648,7 +642,6 @@ func (v *filterExpressionVisitor) VisitValueList(ctx *grammar.ValueListContext)
|
||||
|
||||
// VisitFullText handles standalone quoted strings for full-text search
|
||||
func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) any {
|
||||
|
||||
if v.skipFullTextFilter {
|
||||
return ""
|
||||
}
|
||||
@@ -670,6 +663,7 @@ func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) an
|
||||
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
|
||||
return ""
|
||||
}
|
||||
|
||||
return cond
|
||||
}
|
||||
|
||||
|
||||
@@ -3,14 +3,12 @@ package telemetrylogs
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
@@ -35,7 +33,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
return "", err
|
||||
}
|
||||
|
||||
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled {
|
||||
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled && key.Name != messageSubField {
|
||||
valueType, value := InferDataType(value, operator, key)
|
||||
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
|
||||
if err != nil {
|
||||
@@ -54,14 +52,14 @@ func (c *conditionBuilder) conditionFor(
|
||||
}
|
||||
|
||||
// Check if this is a body JSON search - either by FieldContext
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.BodyJSONQueryEnabled {
|
||||
tblFieldName, value = GetBodyJSONKey(ctx, key, operator, value)
|
||||
}
|
||||
|
||||
tblFieldName, value = querybuilder.DataTypeCollisionHandledFieldName(key, value, tblFieldName, operator)
|
||||
|
||||
// make use of case insensitive index for body
|
||||
if tblFieldName == "body" {
|
||||
if tblFieldName == "body" || tblFieldName == messageSubColumn {
|
||||
switch operator {
|
||||
case qbtypes.FilterOperatorLike:
|
||||
return sb.ILike(tblFieldName, value), nil
|
||||
@@ -108,7 +106,6 @@ func (c *conditionBuilder) conditionFor(
|
||||
return sb.ILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
|
||||
case qbtypes.FilterOperatorNotContains:
|
||||
return sb.NotILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
|
||||
|
||||
case qbtypes.FilterOperatorRegexp:
|
||||
// Note: Escape $$ to $$$$ to avoid sqlbuilder interpreting materialized $ signs
|
||||
// Only needed because we are using sprintf instead of sb.Match (not implemented in sqlbuilder)
|
||||
@@ -178,9 +175,8 @@ func (c *conditionBuilder) conditionFor(
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(tblFieldName), nil
|
||||
} else {
|
||||
return sb.IsNull(tblFieldName), nil
|
||||
}
|
||||
return sb.IsNull(tblFieldName), nil
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
@@ -247,19 +243,30 @@ func (c *conditionBuilder) ConditionFor(
|
||||
return "", err
|
||||
}
|
||||
|
||||
if !(key.FieldContext == telemetrytypes.FieldContextBody && querybuilder.BodyJSONQueryEnabled) && operator.AddDefaultExistsFilter() {
|
||||
// skip adding exists filter for intrinsic fields
|
||||
// with an exception for body json search
|
||||
field, _ := c.fm.FieldFor(ctx, key)
|
||||
if slices.Contains(maps.Keys(IntrinsicFields), field) && key.FieldContext != telemetrytypes.FieldContextBody {
|
||||
// Skip adding exists filter for intrinsic fields i.e. Table level log context fields
|
||||
buildExistCondition := operator.AddDefaultExistsFilter()
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextScope:
|
||||
// pass; No need to build exist condition for top level columns
|
||||
// immediately return
|
||||
return condition, nil
|
||||
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextAttribute:
|
||||
// build exist condition for resource and attribute fields based on filter operator
|
||||
case telemetrytypes.FieldContextBody:
|
||||
// Querying JSON fields already account for Nullability of fields
|
||||
// so additional exists checks are not needed
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
return condition, nil
|
||||
}
|
||||
}
|
||||
|
||||
if buildExistCondition {
|
||||
existsCondition, err := c.conditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return sb.And(condition, existsCondition), nil
|
||||
}
|
||||
|
||||
return condition, nil
|
||||
}
|
||||
|
||||
@@ -127,7 +127,8 @@ func TestConditionFor(t *testing.T) {
|
||||
{
|
||||
name: "Contains operator - body",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body",
|
||||
Name: "body",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorContains,
|
||||
value: 521509198310,
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
package telemetrylogs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
@@ -17,7 +20,7 @@ const (
|
||||
LogsV2TimestampColumn = "timestamp"
|
||||
LogsV2ObservedTimestampColumn = "observed_timestamp"
|
||||
LogsV2BodyColumn = "body"
|
||||
LogsV2BodyJSONColumn = constants.BodyV2Column
|
||||
LogsV2BodyV2Column = constants.BodyV2Column
|
||||
LogsV2BodyPromotedColumn = constants.BodyPromotedColumn
|
||||
LogsV2TraceIDColumn = "trace_id"
|
||||
LogsV2SpanIDColumn = "span_id"
|
||||
@@ -34,8 +37,14 @@ const (
|
||||
LogsV2ResourcesStringColumn = "resources_string"
|
||||
LogsV2ScopeStringColumn = "scope_string"
|
||||
|
||||
BodyJSONColumnPrefix = constants.BodyV2ColumnPrefix
|
||||
BodyV2ColumnPrefix = constants.BodyV2ColumnPrefix
|
||||
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
|
||||
|
||||
// messageSubColumn is the ClickHouse sub-column that body searches map to
|
||||
// when BodyJSONQueryEnabled is true.
|
||||
messageSubField = "message"
|
||||
messageSubColumn = "body_v2.message"
|
||||
bodySearchDefaultWarning = "body searches default to `body.message:string`. Use `body.<key>` to search a different field inside body"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -118,3 +127,11 @@ var (
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func bodyAliasExpression() string {
|
||||
if !querybuilder.BodyJSONQueryEnabled {
|
||||
return LogsV2BodyColumn
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s as body", LogsV2BodyV2Column)
|
||||
}
|
||||
|
||||
@@ -30,7 +30,8 @@ var (
|
||||
"severity_text": {Name: "severity_text", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
|
||||
"severity_number": {Name: "severity_number", Type: schema.ColumnTypeUInt8},
|
||||
"body": {Name: "body", Type: schema.ColumnTypeString},
|
||||
LogsV2BodyJSONColumn: {Name: LogsV2BodyJSONColumn, Type: schema.JSONColumnType{
|
||||
messageSubColumn: {Name: messageSubColumn, Type: schema.ColumnTypeString},
|
||||
LogsV2BodyV2Column: {Name: LogsV2BodyV2Column, Type: schema.JSONColumnType{
|
||||
MaxDynamicTypes: utils.ToPointer(uint(32)),
|
||||
MaxDynamicPaths: utils.ToPointer(uint(0)),
|
||||
}},
|
||||
@@ -88,21 +89,26 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
|
||||
return logsV2Columns["attributes_bool"], nil
|
||||
}
|
||||
case telemetrytypes.FieldContextBody:
|
||||
// Body context is for JSON body fields
|
||||
// Use body_json if feature flag is enabled
|
||||
// Body context is for JSON body fields. Use body_v2 if feature flag is enabled.
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
return logsV2Columns[LogsV2BodyJSONColumn], nil
|
||||
if key.Name == messageSubField {
|
||||
return logsV2Columns[messageSubColumn], nil
|
||||
}
|
||||
return logsV2Columns[LogsV2BodyV2Column], nil
|
||||
}
|
||||
// Fall back to legacy body column
|
||||
return logsV2Columns["body"], nil
|
||||
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
|
||||
if key.Name == LogsV2BodyColumn && querybuilder.BodyJSONQueryEnabled {
|
||||
return logsV2Columns[messageSubColumn], nil
|
||||
}
|
||||
col, ok := logsV2Columns[key.Name]
|
||||
if !ok {
|
||||
// check if the key has body JSON search
|
||||
if strings.HasPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
|
||||
// Use body_json if feature flag is enabled and we have a body condition builder
|
||||
// Use body_v2 if feature flag is enabled and we have a body condition builder
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
return logsV2Columns[LogsV2BodyJSONColumn], nil
|
||||
return logsV2Columns[LogsV2BodyV2Column], nil
|
||||
}
|
||||
// Fall back to legacy body column
|
||||
return logsV2Columns["body"], nil
|
||||
@@ -138,6 +144,10 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
|
||||
}
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
|
||||
case telemetrytypes.FieldContextBody:
|
||||
if key.Name == messageSubField {
|
||||
return messageSubColumn, nil
|
||||
}
|
||||
|
||||
if key.JSONDataType == nil {
|
||||
return "", qbtypes.ErrColumnNotFound
|
||||
}
|
||||
@@ -246,34 +256,37 @@ func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (
|
||||
node := plan[0]
|
||||
|
||||
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
|
||||
if key.Materialized {
|
||||
if len(plan) < 2 {
|
||||
return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
|
||||
"plan length is less than 2 for promoted path: %s", key.Name)
|
||||
}
|
||||
// TODO(Piyush): Promoted path logic commented out. Materialized now means type hint
|
||||
// promotion will be extracted from key field evolution
|
||||
// (direct sub-column access), not a promoted body_promoted.* column.
|
||||
// if key.Materialized {
|
||||
// if len(plan) < 2 {
|
||||
// return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
|
||||
// "plan length is less than 2 for promoted path: %s", key.Name)
|
||||
// }
|
||||
|
||||
node := plan[1]
|
||||
promotedExpr := fmt.Sprintf(
|
||||
"dynamicElement(%s, '%s')",
|
||||
node.FieldPath(),
|
||||
node.TerminalConfig.ElemType.StringValue(),
|
||||
)
|
||||
// node := plan[1]
|
||||
// promotedExpr := fmt.Sprintf(
|
||||
// "dynamicElement(%s, '%s')",
|
||||
// node.FieldPath(),
|
||||
// node.TerminalConfig.ElemType.StringValue(),
|
||||
// )
|
||||
|
||||
// dynamicElement returns NULL for scalar types or an empty array for array types.
|
||||
if node.TerminalConfig.ElemType.IsArray {
|
||||
expr = fmt.Sprintf(
|
||||
"if(length(%s) > 0, %s, %s)",
|
||||
promotedExpr,
|
||||
promotedExpr,
|
||||
expr,
|
||||
)
|
||||
} else {
|
||||
// promoted column first then body_json column
|
||||
// TODO(Piyush): Change this in future for better performance
|
||||
expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
|
||||
}
|
||||
// // dynamicElement returns NULL for scalar types or an empty array for array types.
|
||||
// if node.TerminalConfig.ElemType.IsArray {
|
||||
// expr = fmt.Sprintf(
|
||||
// "if(length(%s) > 0, %s, %s)",
|
||||
// promotedExpr,
|
||||
// promotedExpr,
|
||||
// expr,
|
||||
// )
|
||||
// } else {
|
||||
// // promoted column first then body_json column
|
||||
// // TODO(Piyush): Change this in future for better performance
|
||||
// expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
|
||||
// }
|
||||
|
||||
}
|
||||
// }
|
||||
|
||||
return expr, nil
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType te
|
||||
return &jsonConditionBuilder{key: key, valueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType]}
|
||||
}
|
||||
|
||||
// BuildCondition builds the full WHERE condition for body_json JSON paths
|
||||
// BuildCondition builds the full WHERE condition for body_v2 JSON paths
|
||||
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||
conditions := []string{}
|
||||
for _, node := range c.key.JSONPlan {
|
||||
@@ -40,6 +40,7 @@ func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperato
|
||||
}
|
||||
conditions = append(conditions, condition)
|
||||
}
|
||||
|
||||
return sb.Or(conditions...), nil
|
||||
}
|
||||
|
||||
@@ -288,9 +289,9 @@ func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, field
|
||||
}
|
||||
return sb.NotIn(fieldExpr, values...), nil
|
||||
case qbtypes.FilterOperatorExists:
|
||||
return fmt.Sprintf("%s IS NOT NULL", fieldExpr), nil
|
||||
return sb.IsNotNull(fieldExpr), nil
|
||||
case qbtypes.FilterOperatorNotExists:
|
||||
return fmt.Sprintf("%s IS NULL", fieldExpr), nil
|
||||
return sb.IsNull(fieldExpr), nil
|
||||
// between and not between
|
||||
case qbtypes.FilterOperatorBetween, qbtypes.FilterOperatorNotBetween:
|
||||
values, ok := value.([]any)
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -65,7 +65,7 @@ func (b *logQueryStatementBuilder) Build(
|
||||
start = querybuilder.ToNanoSecs(start)
|
||||
end = querybuilder.ToNanoSecs(end)
|
||||
|
||||
keySelectors := getKeySelectors(query)
|
||||
keySelectors, warnings := getKeySelectors(query)
|
||||
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -76,20 +76,29 @@ func (b *logQueryStatementBuilder) Build(
|
||||
// Create SQL builder
|
||||
q := sqlbuilder.NewSelectBuilder()
|
||||
|
||||
var stmt *qbtypes.Statement
|
||||
switch requestType {
|
||||
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeRawStream:
|
||||
return b.buildListQuery(ctx, q, query, start, end, keys, variables)
|
||||
stmt, err = b.buildListQuery(ctx, q, query, start, end, keys, variables)
|
||||
case qbtypes.RequestTypeTimeSeries:
|
||||
return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
|
||||
stmt, err = b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
|
||||
case qbtypes.RequestTypeScalar:
|
||||
return b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
|
||||
stmt, err = b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
|
||||
default:
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
|
||||
}
|
||||
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stmt.Warnings = append(stmt.Warnings, warnings...)
|
||||
return stmt, nil
|
||||
}
|
||||
|
||||
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
|
||||
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([]*telemetrytypes.FieldKeySelector, []string) {
|
||||
var keySelectors []*telemetrytypes.FieldKeySelector
|
||||
var warnings []string
|
||||
|
||||
for idx := range query.Aggregations {
|
||||
aggExpr := query.Aggregations[idx]
|
||||
@@ -136,7 +145,19 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []
|
||||
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
|
||||
}
|
||||
|
||||
return keySelectors
|
||||
// When the new JSON body experience is enabled, warn the user if they use the bare
|
||||
// "body" key in the filter — queries on plain "body" default to body.message:string.
|
||||
// TODO(Piyush): Setup better for coming FTS support.
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
for _, sel := range keySelectors {
|
||||
if sel.Name == LogsV2BodyColumn {
|
||||
warnings = append(warnings, bodySearchDefaultWarning)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return keySelectors, warnings
|
||||
}
|
||||
|
||||
func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
|
||||
@@ -203,7 +224,6 @@ func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[stri
|
||||
}
|
||||
|
||||
func (b *logQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
|
||||
|
||||
// First check if it matches with any intrinsic fields
|
||||
var intrinsicOrCalculatedField telemetrytypes.TelemetryFieldKey
|
||||
if _, ok := IntrinsicFields[key.Name]; ok {
|
||||
@@ -212,7 +232,6 @@ func (b *logQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldK
|
||||
}
|
||||
|
||||
return querybuilder.AdjustKey(key, keys, nil)
|
||||
|
||||
}
|
||||
|
||||
// buildListQuery builds a query for list panel type
|
||||
@@ -249,11 +268,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
|
||||
sb.SelectMore(LogsV2SeverityNumberColumn)
|
||||
sb.SelectMore(LogsV2ScopeNameColumn)
|
||||
sb.SelectMore(LogsV2ScopeVersionColumn)
|
||||
sb.SelectMore(LogsV2BodyColumn)
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
sb.SelectMore(LogsV2BodyJSONColumn)
|
||||
sb.SelectMore(LogsV2BodyPromotedColumn)
|
||||
}
|
||||
sb.SelectMore(bodyAliasExpression())
|
||||
sb.SelectMore(LogsV2AttributesStringColumn)
|
||||
sb.SelectMore(LogsV2AttributesNumberColumn)
|
||||
sb.SelectMore(LogsV2AttributesBoolColumn)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
|
||||
@@ -886,3 +887,246 @@ func TestAdjustKey(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStmtBuilderBodyField(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
|
||||
enableBodyJSONQuery bool
|
||||
expected qbtypes.Statement
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "body_exists",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "body Exists"},
|
||||
Limit: 10,
|
||||
},
|
||||
enableBodyJSONQuery: true,
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message <> ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
Warnings: []string{bodySearchDefaultWarning},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "body_exists_disabled",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "body Exists"},
|
||||
Limit: 10,
|
||||
},
|
||||
enableBodyJSONQuery: false,
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body <> ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "body_empty",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "body == ''"},
|
||||
Limit: 10,
|
||||
},
|
||||
enableBodyJSONQuery: true,
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
Warnings: []string{bodySearchDefaultWarning},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "body_empty_disabled",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "body == ''"},
|
||||
Limit: 10,
|
||||
},
|
||||
enableBodyJSONQuery: false,
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "body_contains",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "body CONTAINS 'error'"},
|
||||
Limit: 10,
|
||||
},
|
||||
enableBodyJSONQuery: true,
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND LOWER(body_v2.message) LIKE LOWER(?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "%error%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
Warnings: []string{bodySearchDefaultWarning},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "body_contains_disabled",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "body CONTAINS 'error'"},
|
||||
Limit: 10,
|
||||
},
|
||||
enableBodyJSONQuery: false,
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND LOWER(body) LIKE LOWER(?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "%error%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
enable, disable := jsonQueryTestUtil(t)
|
||||
defer disable()
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
if c.enableBodyJSONQuery {
|
||||
enable()
|
||||
} else {
|
||||
disable()
|
||||
}
|
||||
// build the key map after enabling/disabling body JSON query
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
for _, field := range IntrinsicFields {
|
||||
f := field
|
||||
mockMetadataStore.KeysMap[field.Name] = append(mockMetadataStore.KeysMap[field.Name], &f)
|
||||
}
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
|
||||
statementBuilder := NewLogQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
resourceFilterStmtBuilder,
|
||||
aggExprRewriter,
|
||||
DefaultFullTextColumn,
|
||||
GetBodyJSONKey,
|
||||
)
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
if c.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), c.expectedErr.Error())
|
||||
} else {
|
||||
if err != nil {
|
||||
_, _, _, _, _, add := errors.Unwrapb(err)
|
||||
t.Logf("error additionals: %v", add)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expected.Query, q.Query)
|
||||
require.Equal(t, c.expected.Args, q.Args)
|
||||
require.Equal(t, c.expected.Warnings, q.Warnings)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStmtBuilderBodyFullTextSearch(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
|
||||
enableBodyJSONQuery bool
|
||||
expected qbtypes.Statement
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "body_contains",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "'error'"},
|
||||
Limit: 10,
|
||||
},
|
||||
enableBodyJSONQuery: true,
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND match(LOWER(body_v2.message), LOWER(?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "error", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "body_contains_disabled",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "'error'"},
|
||||
Limit: 10,
|
||||
},
|
||||
enableBodyJSONQuery: false,
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND match(LOWER(body), LOWER(?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "error", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
enable, disable := jsonQueryTestUtil(t)
|
||||
defer disable()
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
if c.enableBodyJSONQuery {
|
||||
enable()
|
||||
} else {
|
||||
disable()
|
||||
}
|
||||
// build the key map after enabling/disabling body JSON query
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
for _, field := range IntrinsicFields {
|
||||
f := field
|
||||
mockMetadataStore.KeysMap[field.Name] = append(mockMetadataStore.KeysMap[field.Name], &f)
|
||||
}
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
|
||||
statementBuilder := NewLogQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
resourceFilterStmtBuilder,
|
||||
aggExprRewriter,
|
||||
DefaultFullTextColumn,
|
||||
GetBodyJSONKey,
|
||||
)
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
if c.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), c.expectedErr.Error())
|
||||
} else {
|
||||
if err != nil {
|
||||
_, _, _, _, _, add := errors.Unwrapb(err)
|
||||
t.Logf("error additionals: %v", add)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expected.Query, q.Query)
|
||||
require.Equal(t, c.expected.Args, q.Args)
|
||||
require.Equal(t, c.expected.Warnings, q.Warnings)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,13 +27,6 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
"body": {
|
||||
{
|
||||
Name: "body",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
"http.status_code": {
|
||||
{
|
||||
Name: "http.status_code",
|
||||
@@ -938,6 +931,13 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
Materialized: true,
|
||||
},
|
||||
},
|
||||
"body": {
|
||||
{
|
||||
Name: "body",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, keys := range keysMap {
|
||||
@@ -945,6 +945,7 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
key.Signal = telemetrytypes.SignalLogs
|
||||
}
|
||||
}
|
||||
|
||||
return keysMap
|
||||
}
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ func (t *telemetryMetaStore) fetchBodyJSONPaths(ctx context.Context,
|
||||
instrumentationtypes.CodeNamespace: "metadata",
|
||||
instrumentationtypes.CodeFunctionName: "fetchBodyJSONPaths",
|
||||
})
|
||||
|
||||
query, args, limit := buildGetBodyJSONPathsQuery(fieldKeySelectors)
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
@@ -184,7 +185,6 @@ func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySele
|
||||
limit += fieldKeySelector.Limit
|
||||
}
|
||||
sb.Where(sb.Or(orClauses...))
|
||||
|
||||
// Group by path to get unique paths with aggregated types
|
||||
sb.GroupBy("path")
|
||||
|
||||
@@ -319,7 +319,7 @@ func (t *telemetryMetaStore) ListJSONValues(ctx context.Context, path string, li
|
||||
if promoted {
|
||||
path = telemetrylogs.BodyPromotedColumnPrefix + path
|
||||
} else {
|
||||
path = telemetrylogs.BodyJSONColumnPrefix + path
|
||||
path = telemetrylogs.BodyV2ColumnPrefix + path
|
||||
}
|
||||
|
||||
from := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
|
||||
@@ -522,7 +522,7 @@ func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...stri
|
||||
// TODO(Piyush): Remove this function
|
||||
func CleanPathPrefixes(path string) string {
|
||||
path = strings.TrimPrefix(path, telemetrytypes.BodyJSONStringSearchPrefix)
|
||||
path = strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
|
||||
path = strings.TrimPrefix(path, telemetrylogs.BodyV2ColumnPrefix)
|
||||
path = strings.TrimPrefix(path, telemetrylogs.BodyPromotedColumnPrefix)
|
||||
return path
|
||||
}
|
||||
|
||||
@@ -102,7 +102,7 @@ func NewTelemetryMetaStore(
|
||||
jsonColumnMetadata: map[telemetrytypes.Signal]map[telemetrytypes.FieldContext]telemetrytypes.JSONColumnMetadata{
|
||||
telemetrytypes.SignalLogs: {
|
||||
telemetrytypes.FieldContextBody: telemetrytypes.JSONColumnMetadata{
|
||||
BaseColumn: telemetrylogs.LogsV2BodyJSONColumn,
|
||||
BaseColumn: telemetrylogs.LogsV2BodyV2Column,
|
||||
PromotedColumn: telemetrylogs.LogsV2BodyPromotedColumn,
|
||||
},
|
||||
},
|
||||
|
||||
@@ -6,6 +6,7 @@ var (
|
||||
IdentNProviderTokenizer = IdentNProvider{valuer.NewString("tokenizer")}
|
||||
IdentNProviderAPIkey = IdentNProvider{valuer.NewString("api_key")}
|
||||
IdentNProviderAnonymous = IdentNProvider{valuer.NewString("anonymous")}
|
||||
IdentNProviderInternal = IdentNProvider{valuer.NewString("internal")}
|
||||
)
|
||||
|
||||
type IdentNProvider struct{ valuer.String }
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/open-telemetry/opamp-go/protobufs"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
@@ -18,15 +17,6 @@ const (
|
||||
AgentStatusDisconnected
|
||||
)
|
||||
|
||||
var DeployStatusToProtoStatus = map[DeployStatus]protobufs.RemoteConfigStatuses{
|
||||
PendingDeploy: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
Deploying: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
Deployed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
|
||||
DeployInitiated: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
DeployFailed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
|
||||
DeployStatusUnknown: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
}
|
||||
|
||||
type StorableAgent struct {
|
||||
bun.BaseModel `bun:"table:agent"`
|
||||
|
||||
@@ -40,6 +30,16 @@ type StorableAgent struct {
|
||||
Config string `bun:"config,type:text,notnull"`
|
||||
}
|
||||
|
||||
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
|
||||
return StorableAgent{
|
||||
OrgID: orgID,
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
AgentID: agentID,
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
type ElementType struct{ valuer.String }
|
||||
|
||||
var (
|
||||
@@ -49,6 +49,24 @@ var (
|
||||
ElementTypeLbExporter = ElementType{valuer.NewString("lb_exporter")}
|
||||
)
|
||||
|
||||
// NewElementType creates a new ElementType from a string value.
|
||||
// Returns the corresponding ElementType constant if the string matches,
|
||||
// otherwise returns an empty ElementType.
|
||||
func NewElementType(value string) ElementType {
|
||||
switch valuer.NewString(value) {
|
||||
case ElementTypeSamplingRules.String:
|
||||
return ElementTypeSamplingRules
|
||||
case ElementTypeDropRules.String:
|
||||
return ElementTypeDropRules
|
||||
case ElementTypeLogPipelines.String:
|
||||
return ElementTypeLogPipelines
|
||||
case ElementTypeLbExporter.String:
|
||||
return ElementTypeLbExporter
|
||||
default:
|
||||
return ElementType{valuer.NewString("")}
|
||||
}
|
||||
}
|
||||
|
||||
type DeployStatus struct{ valuer.String }
|
||||
|
||||
var (
|
||||
@@ -80,26 +98,6 @@ type AgentConfigVersion struct {
|
||||
Config string `json:"config" bun:"config,type:text"`
|
||||
}
|
||||
|
||||
type AgentConfigElement struct {
|
||||
bun.BaseModel `bun:"table:agent_config_element"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
|
||||
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
}
|
||||
|
||||
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
|
||||
return StorableAgent{
|
||||
OrgID: orgID,
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
AgentID: agentID,
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
func NewAgentConfigVersion(orgId valuer.UUID, userId valuer.UUID, elementType ElementType) *AgentConfigVersion {
|
||||
return &AgentConfigVersion{
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
@@ -120,20 +118,12 @@ func (a *AgentConfigVersion) IncrementVersion(lastVersion int) {
|
||||
a.Version = lastVersion + 1
|
||||
}
|
||||
|
||||
// NewElementType creates a new ElementType from a string value.
|
||||
// Returns the corresponding ElementType constant if the string matches,
|
||||
// otherwise returns an empty ElementType.
|
||||
func NewElementType(value string) ElementType {
|
||||
switch valuer.NewString(value) {
|
||||
case ElementTypeSamplingRules.String:
|
||||
return ElementTypeSamplingRules
|
||||
case ElementTypeDropRules.String:
|
||||
return ElementTypeDropRules
|
||||
case ElementTypeLogPipelines.String:
|
||||
return ElementTypeLogPipelines
|
||||
case ElementTypeLbExporter.String:
|
||||
return ElementTypeLbExporter
|
||||
default:
|
||||
return ElementType{valuer.NewString("")}
|
||||
}
|
||||
type AgentConfigElement struct {
|
||||
bun.BaseModel `bun:"table:agent_config_element"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
|
||||
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ type JSONAccessNode struct {
|
||||
// Node information
|
||||
Name string
|
||||
IsTerminal bool
|
||||
isRoot bool // marked true for only body_json and body_json_promoted
|
||||
isRoot bool // marked true for only body_v2 and body_promoted
|
||||
|
||||
// Precomputed type information (single source of truth)
|
||||
AvailableTypes []JSONDataType
|
||||
|
||||
@@ -1,12 +1,19 @@
|
||||
package telemetrytypes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
otelconstants "github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
const (
|
||||
bodyV2Column = otelconstants.BodyV2Column
|
||||
bodyPromotedColumn = otelconstants.BodyPromotedColumn
|
||||
)
|
||||
|
||||
// ============================================================================
|
||||
// Helper Functions for Test Data Creation
|
||||
// ============================================================================
|
||||
@@ -109,8 +116,8 @@ func TestNode_Alias(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
name: "Root node returns name as-is",
|
||||
node: NewRootJSONAccessNode("body_json", 32, 0),
|
||||
expected: "body_json",
|
||||
node: NewRootJSONAccessNode(bodyV2Column, 32, 0),
|
||||
expected: bodyV2Column,
|
||||
},
|
||||
{
|
||||
name: "Node without parent returns backticked name",
|
||||
@@ -124,9 +131,9 @@ func TestNode_Alias(t *testing.T) {
|
||||
name: "Node with root parent uses dot separator",
|
||||
node: &JSONAccessNode{
|
||||
Name: "age",
|
||||
Parent: NewRootJSONAccessNode("body_json", 32, 0),
|
||||
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
|
||||
},
|
||||
expected: "`" + "body_json" + ".age`",
|
||||
expected: "`" + bodyV2Column + ".age`",
|
||||
},
|
||||
{
|
||||
name: "Node with non-root parent uses array separator",
|
||||
@@ -134,10 +141,10 @@ func TestNode_Alias(t *testing.T) {
|
||||
Name: "name",
|
||||
Parent: &JSONAccessNode{
|
||||
Name: "education",
|
||||
Parent: NewRootJSONAccessNode("body_json", 32, 0),
|
||||
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
|
||||
},
|
||||
},
|
||||
expected: "`" + "body_json" + ".education[].name`",
|
||||
expected: "`" + bodyV2Column + ".education[].name`",
|
||||
},
|
||||
{
|
||||
name: "Nested array path with multiple levels",
|
||||
@@ -147,11 +154,11 @@ func TestNode_Alias(t *testing.T) {
|
||||
Name: "awards",
|
||||
Parent: &JSONAccessNode{
|
||||
Name: "education",
|
||||
Parent: NewRootJSONAccessNode("body_json", 32, 0),
|
||||
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: "`" + "body_json" + ".education[].awards[].type`",
|
||||
expected: "`" + bodyV2Column + ".education[].awards[].type`",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -173,18 +180,18 @@ func TestNode_FieldPath(t *testing.T) {
|
||||
name: "Simple field path from root",
|
||||
node: &JSONAccessNode{
|
||||
Name: "user",
|
||||
Parent: NewRootJSONAccessNode("body_json", 32, 0),
|
||||
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
|
||||
},
|
||||
// FieldPath() always wraps the field name in backticks
|
||||
expected: "body_json" + ".`user`",
|
||||
expected: bodyV2Column + ".`user`",
|
||||
},
|
||||
{
|
||||
name: "Field path with backtick-required key",
|
||||
node: &JSONAccessNode{
|
||||
Name: "user-name", // requires backtick
|
||||
Parent: NewRootJSONAccessNode("body_json", 32, 0),
|
||||
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
|
||||
},
|
||||
expected: "body_json" + ".`user-name`",
|
||||
expected: bodyV2Column + ".`user-name`",
|
||||
},
|
||||
{
|
||||
name: "Nested field path",
|
||||
@@ -192,11 +199,11 @@ func TestNode_FieldPath(t *testing.T) {
|
||||
Name: "age",
|
||||
Parent: &JSONAccessNode{
|
||||
Name: "user",
|
||||
Parent: NewRootJSONAccessNode("body_json", 32, 0),
|
||||
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
|
||||
},
|
||||
},
|
||||
// FieldPath() always wraps the field name in backticks
|
||||
expected: "`" + "body_json" + ".user`.`age`",
|
||||
expected: "`" + bodyV2Column + ".user`.`age`",
|
||||
},
|
||||
{
|
||||
name: "Array element field path",
|
||||
@@ -204,11 +211,11 @@ func TestNode_FieldPath(t *testing.T) {
|
||||
Name: "name",
|
||||
Parent: &JSONAccessNode{
|
||||
Name: "education",
|
||||
Parent: NewRootJSONAccessNode("body_json", 32, 0),
|
||||
Parent: NewRootJSONAccessNode(bodyV2Column, 32, 0),
|
||||
},
|
||||
},
|
||||
// FieldPath() always wraps the field name in backticks
|
||||
expected: "`" + "body_json" + ".education`.`name`",
|
||||
expected: "`" + bodyV2Column + ".education`.`name`",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -236,36 +243,36 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
|
||||
{
|
||||
name: "Simple path not promoted",
|
||||
key: makeKey("user.name", String, false),
|
||||
expectedYAML: `
|
||||
expectedYAML: fmt.Sprintf(`
|
||||
- name: user.name
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- String
|
||||
maxDynamicTypes: 16
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`,
|
||||
`, bodyV2Column),
|
||||
},
|
||||
{
|
||||
name: "Simple path promoted",
|
||||
key: makeKey("user.name", String, true),
|
||||
expectedYAML: `
|
||||
expectedYAML: fmt.Sprintf(`
|
||||
- name: user.name
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- String
|
||||
maxDynamicTypes: 16
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
- name: user.name
|
||||
column: body_json_promoted
|
||||
column: %s
|
||||
availableTypes:
|
||||
- String
|
||||
maxDynamicTypes: 16
|
||||
maxDynamicPaths: 256
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`,
|
||||
`, bodyV2Column, bodyPromotedColumn),
|
||||
},
|
||||
{
|
||||
name: "Empty path returns error",
|
||||
@@ -278,8 +285,8 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: "body_json",
|
||||
PromotedColumn: "body_json_promoted",
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
if tt.expectErr {
|
||||
require.Error(t, err)
|
||||
@@ -304,9 +311,9 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
|
||||
{
|
||||
name: "Single array level - JSON branch only",
|
||||
path: "education[].name",
|
||||
expectedYAML: `
|
||||
expectedYAML: fmt.Sprintf(`
|
||||
- name: education
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -318,14 +325,14 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
|
||||
maxDynamicTypes: 8
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`,
|
||||
`, bodyV2Column),
|
||||
},
|
||||
{
|
||||
name: "Single array level - both JSON and Dynamic branches",
|
||||
path: "education[].awards[].type",
|
||||
expectedYAML: `
|
||||
expectedYAML: fmt.Sprintf(`
|
||||
- name: education
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -352,14 +359,14 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
|
||||
maxDynamicPaths: 256
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`,
|
||||
`, bodyV2Column),
|
||||
},
|
||||
{
|
||||
name: "Deeply nested array path",
|
||||
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
|
||||
expectedYAML: `
|
||||
expectedYAML: fmt.Sprintf(`
|
||||
- name: interests
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -399,14 +406,14 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
|
||||
- String
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`,
|
||||
`, bodyV2Column),
|
||||
},
|
||||
{
|
||||
name: "ArrayAnyIndex replacement [*] to []",
|
||||
path: "education[*].name",
|
||||
expectedYAML: `
|
||||
expectedYAML: fmt.Sprintf(`
|
||||
- name: education
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -418,7 +425,7 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
|
||||
maxDynamicTypes: 8
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`,
|
||||
`, bodyV2Column),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -426,8 +433,8 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
key := makeKey(tt.path, String, false)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: "body_json",
|
||||
PromotedColumn: "body_json_promoted",
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, key.JSONPlan)
|
||||
@@ -445,15 +452,15 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
|
||||
t.Run("Non-promoted plan", func(t *testing.T) {
|
||||
key := makeKey(path, String, false)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: "body_json",
|
||||
PromotedColumn: "body_json_promoted",
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, key.JSONPlan, 1)
|
||||
|
||||
expectedYAML := `
|
||||
expectedYAML := fmt.Sprintf(`
|
||||
- name: education
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -480,7 +487,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
|
||||
maxDynamicPaths: 256
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`
|
||||
`, bodyV2Column)
|
||||
got := plansToYAML(t, key.JSONPlan)
|
||||
require.YAMLEq(t, expectedYAML, got)
|
||||
})
|
||||
@@ -488,15 +495,15 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
|
||||
t.Run("Promoted plan", func(t *testing.T) {
|
||||
key := makeKey(path, String, true)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: "body_json",
|
||||
PromotedColumn: "body_json_promoted",
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, key.JSONPlan, 2)
|
||||
|
||||
expectedYAML := `
|
||||
expectedYAML := fmt.Sprintf(`
|
||||
- name: education
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -524,7 +531,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
- name: education
|
||||
column: body_json_promoted
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -554,7 +561,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
|
||||
maxDynamicPaths: 256
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`
|
||||
`, bodyV2Column, bodyPromotedColumn)
|
||||
got := plansToYAML(t, key.JSONPlan)
|
||||
require.YAMLEq(t, expectedYAML, got)
|
||||
})
|
||||
@@ -575,11 +582,11 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
|
||||
expectErr: true,
|
||||
},
|
||||
{
|
||||
name: "Very deep nesting - validates progression doesn't go negative",
|
||||
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
|
||||
expectedYAML: `
|
||||
name: "Very deep nesting - validates progression doesn't go negative",
|
||||
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
|
||||
expectedYAML: fmt.Sprintf(`
|
||||
- name: interests
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -619,14 +626,14 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
|
||||
- String
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`,
|
||||
`, bodyV2Column),
|
||||
},
|
||||
{
|
||||
name: "Path with mixed scalar and array types",
|
||||
path: "education[].type",
|
||||
expectedYAML: `
|
||||
name: "Path with mixed scalar and array types",
|
||||
path: "education[].type",
|
||||
expectedYAML: fmt.Sprintf(`
|
||||
- name: education
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -639,20 +646,20 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
|
||||
maxDynamicTypes: 8
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`,
|
||||
`, bodyV2Column),
|
||||
},
|
||||
{
|
||||
name: "Exists with only array types available",
|
||||
path: "education",
|
||||
expectedYAML: `
|
||||
name: "Exists with only array types available",
|
||||
path: "education",
|
||||
expectedYAML: fmt.Sprintf(`
|
||||
- name: education
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
isTerminal: true
|
||||
elemType: Array(JSON)
|
||||
`,
|
||||
`, bodyV2Column),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -668,8 +675,8 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
|
||||
}
|
||||
key := makeKey(tt.path, keyType, false)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: "body_json",
|
||||
PromotedColumn: "body_json_promoted",
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
if tt.expectErr {
|
||||
require.Error(t, err)
|
||||
@@ -687,15 +694,15 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
|
||||
path := "education[].awards[].participated[].team[].branch"
|
||||
key := makeKey(path, String, false)
|
||||
err := key.SetJSONAccessPlan(JSONColumnMetadata{
|
||||
BaseColumn: "body_json",
|
||||
PromotedColumn: "body_json_promoted",
|
||||
BaseColumn: bodyV2Column,
|
||||
PromotedColumn: bodyPromotedColumn,
|
||||
}, types)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, key.JSONPlan, 1)
|
||||
|
||||
expectedYAML := `
|
||||
expectedYAML := fmt.Sprintf(`
|
||||
- name: education
|
||||
column: body_json
|
||||
column: %s
|
||||
availableTypes:
|
||||
- Array(JSON)
|
||||
maxDynamicTypes: 16
|
||||
@@ -780,7 +787,7 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
|
||||
maxDynamicPaths: 64
|
||||
isTerminal: true
|
||||
elemType: String
|
||||
`
|
||||
`, bodyV2Column)
|
||||
|
||||
got := plansToYAML(t, key.JSONPlan)
|
||||
require.YAMLEq(t, expectedYAML, got)
|
||||
|
||||
@@ -20,9 +20,11 @@ type MockMetadataStore struct {
|
||||
PromotedPathsMap map[string]bool
|
||||
LogsJSONIndexesMap map[string][]schemamigrator.Index
|
||||
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
|
||||
// StaticFields holds signal-specific intrinsic field definitions (e.g. telemetrylogs.IntrinsicFields).
|
||||
StaticFields map[string]telemetrytypes.TelemetryFieldKey
|
||||
}
|
||||
|
||||
// NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps
|
||||
// NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps.
|
||||
func NewMockMetadataStore() *MockMetadataStore {
|
||||
return &MockMetadataStore{
|
||||
KeysMap: make(map[string][]*telemetrytypes.TelemetryFieldKey),
|
||||
@@ -33,12 +35,20 @@ func NewMockMetadataStore() *MockMetadataStore {
|
||||
PromotedPathsMap: make(map[string]bool),
|
||||
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
|
||||
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
StaticFields: make(map[string]telemetrytypes.TelemetryFieldKey),
|
||||
}
|
||||
}
|
||||
|
||||
// SetStaticFields sets the static fields for the mock metadata store.
|
||||
// Pass the signal-specific intrinsic fields (e.g. telemetrylogs.IntrinsicFields) so the mock
|
||||
// mirrors what the real metadata store does when injecting those definitions into key results.
|
||||
func (m *MockMetadataStore) SetStaticFields(intrinsicFields map[string]telemetrytypes.TelemetryFieldKey) {
|
||||
m.StaticFields = intrinsicFields
|
||||
}
|
||||
|
||||
// GetKeys returns a map of field keys types.TelemetryFieldKey by name
|
||||
func (m *MockMetadataStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, bool, error) {
|
||||
|
||||
setOfKeys := make(map[string]*telemetrytypes.TelemetryFieldKey)
|
||||
result := make(map[string][]*telemetrytypes.TelemetryFieldKey)
|
||||
|
||||
// If selector is nil, return all keys
|
||||
@@ -46,18 +56,30 @@ func (m *MockMetadataStore) GetKeys(ctx context.Context, fieldKeySelector *telem
|
||||
return m.KeysMap, true, nil
|
||||
}
|
||||
|
||||
// Apply selector logic
|
||||
// Apply selector logic from KeysMap
|
||||
for name, keys := range m.KeysMap {
|
||||
// Check if name matches
|
||||
if matchesName(fieldKeySelector, name) {
|
||||
filteredKeys := []*telemetrytypes.TelemetryFieldKey{}
|
||||
for _, key := range keys {
|
||||
if matchesKey(fieldKeySelector, key) {
|
||||
filteredKeys = append(filteredKeys, key)
|
||||
if _, exists := setOfKeys[key.Text()]; !exists {
|
||||
result[name] = append(result[name], key)
|
||||
setOfKeys[key.Text()] = key
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(filteredKeys) > 0 {
|
||||
result[name] = filteredKeys
|
||||
}
|
||||
}
|
||||
|
||||
// StaticFields (e.g. IntrinsicFields), mirroring the real metadata store.
|
||||
for key, field := range m.StaticFields {
|
||||
if !matchesName(fieldKeySelector, key) {
|
||||
continue
|
||||
}
|
||||
|
||||
if matchesKey(fieldKeySelector, &field) {
|
||||
if _, exists := setOfKeys[field.Text()]; !exists {
|
||||
result[field.Name] = append(result[field.Name], &field)
|
||||
setOfKeys[field.Text()] = &field
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -108,7 +130,7 @@ func (m *MockMetadataStore) GetKey(ctx context.Context, fieldKeySelector *teleme
|
||||
|
||||
result := []*telemetrytypes.TelemetryFieldKey{}
|
||||
|
||||
// Find keys matching the selector
|
||||
// Find keys matching the selector from KeysMap
|
||||
for name, keys := range m.KeysMap {
|
||||
if matchesName(fieldKeySelector, name) {
|
||||
for _, key := range keys {
|
||||
@@ -119,6 +141,17 @@ func (m *MockMetadataStore) GetKey(ctx context.Context, fieldKeySelector *teleme
|
||||
}
|
||||
}
|
||||
|
||||
// Add matching StaticFields (e.g. IntrinsicFields), same as the real metadata store does
|
||||
for key, field := range m.StaticFields {
|
||||
if !matchesName(fieldKeySelector, key) {
|
||||
continue
|
||||
}
|
||||
|
||||
if matchesKey(fieldKeySelector, &field) {
|
||||
result = append(result, &field)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -61,3 +61,55 @@ def test_api_key(signoz: types.SigNoz, get_token: Callable[[str, str], str]) ->
|
||||
assert found_pat["userId"] == found_user["id"]
|
||||
assert found_pat["name"] == "admin"
|
||||
assert found_pat["role"] == "ADMIN"
|
||||
|
||||
|
||||
def test_api_key_role(signoz: types.SigNoz, get_token: Callable[[str, str], str]) -> None:
|
||||
admin_token = get_token("admin@integration.test", "password123Z$")
|
||||
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/pats"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={
|
||||
"name": "viewer",
|
||||
"role": "VIEWER",
|
||||
"expiresInDays": 1,
|
||||
},
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.CREATED
|
||||
pat_response = response.json()
|
||||
assert "data" in pat_response
|
||||
assert "token" in pat_response["data"]
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/pats"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={
|
||||
"name": "editor",
|
||||
"role": "EDITOR",
|
||||
"expiresInDays": 1,
|
||||
},
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.CREATED
|
||||
pat_response = response.json()
|
||||
assert "data" in pat_response
|
||||
assert "token" in pat_response["data"]
|
||||
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"SIGNOZ-API-KEY": f"{pat_response["data"]["token"]}"},
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
Reference in New Issue
Block a user