Compare commits

..

2 Commits

Author SHA1 Message Date
nityanandagohain
e1075e6b84 fix: add changes to qb 2025-04-11 20:13:00 +05:30
nityanandagohain
93b7f40a24 chore: handle excape characters properly 2025-04-11 19:59:53 +05:30
77 changed files with 561 additions and 2607 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/url"
"strings"
"time"
"github.com/SigNoz/signoz/ee/query-service/constants"
@@ -161,7 +162,12 @@ func (m *modelDao) PrecheckLogin(ctx context.Context, email, sourceUrl string) (
// find domain from email
orgDomain, apierr := m.GetDomainByEmail(ctx, email)
if apierr != nil {
zap.L().Error("failed to get org domain from email", zap.String("email", email), zap.Error(apierr.ToError()))
var emailDomain string
emailComponents := strings.Split(email, "@")
if len(emailComponents) > 0 {
emailDomain = emailComponents[1]
}
zap.L().Error("failed to get org domain from email", zap.String("emailDomain", emailDomain), zap.Error(apierr.ToError()))
return resp, apierr
}

View File

@@ -406,7 +406,9 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
&filterSet,
[]v3.AttributeKey{},
v3.AttributeKey{},
false)
false,
false,
)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return "", err
@@ -3756,7 +3758,7 @@ func (r *ClickHouseReader) GetLatestReceivedMetric(
quotedMetricNames := []string{}
for _, m := range metricNames {
quotedMetricNames = append(quotedMetricNames, utils.ClickHouseFormattedValue(m))
quotedMetricNames = append(quotedMetricNames, utils.ClickHouseFormattedValue(m, false))
}
commaSeparatedMetricNames := strings.Join(quotedMetricNames, ", ")
@@ -4015,16 +4017,16 @@ func (r *ClickHouseReader) FetchRelatedValues(ctx context.Context, req *v3.Filte
}
switch v := item.Value.(type) {
case string:
fmtVal := utils.ClickHouseFormattedValue(v)
fmtVal := utils.ClickHouseFormattedValue(v, false)
addCondition(fmtVal)
case []string:
for _, val := range v {
fmtVal := utils.ClickHouseFormattedValue(val)
fmtVal := utils.ClickHouseFormattedValue(val, false)
addCondition(fmtVal)
}
case []interface{}:
for _, val := range v {
fmtVal := utils.ClickHouseFormattedValue(val)
fmtVal := utils.ClickHouseFormattedValue(val, false)
addCondition(fmtVal)
}
}
@@ -5121,7 +5123,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
toFormat = fmt.Sprintf("%%%s%%", toFormat)
}
fmtVal := utils.ClickHouseFormattedValue(toFormat)
fmtVal := utils.ClickHouseFormattedValue(toFormat, false)
switch op {
case v3.FilterOperatorEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))

View File

@@ -1,4 +1,4 @@
package services
package cloudintegrations
import (
"bytes"
@@ -15,11 +15,11 @@ import (
"golang.org/x/exp/maps"
)
func List(
func listCloudProviderServices(
cloudProvider string,
) ([]Definition, *model.ApiError) {
cloudServices, found := supportedServices[cloudProvider]
if !found || cloudServices == nil {
) ([]CloudServiceDetails, *model.ApiError) {
cloudServices := availableServices[cloudProvider]
if cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider,
))
@@ -33,21 +33,10 @@ func List(
return services, nil
}
func Map(cloudprovider string) (map[string]Definition, *model.ApiError) {
cloudServices, found := supportedServices[cloudprovider]
if !found || cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudprovider,
))
}
return cloudServices, nil
}
func GetServiceDefinition(
func getCloudProviderService(
cloudProvider string, serviceId string,
) (*Definition, *model.ApiError) {
cloudServices := supportedServices[cloudProvider]
) (*CloudServiceDetails, *model.ApiError) {
cloudServices := availableServices[cloudProvider]
if cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider,
@@ -68,7 +57,7 @@ func GetServiceDefinition(
// Service details read from ./serviceDefinitions
// { "providerName": { "service_id": {...}} }
var supportedServices map[string]map[string]Definition
var availableServices map[string]map[string]CloudServiceDetails
func init() {
err := readAllServiceDefinitions()
@@ -79,15 +68,15 @@ func init() {
}
}
//go:embed definitions/*
var definitionFiles embed.FS
//go:embed serviceDefinitions/*
var serviceDefinitionFiles embed.FS
func readAllServiceDefinitions() error {
supportedServices = map[string]map[string]Definition{}
availableServices = map[string]map[string]CloudServiceDetails{}
rootDirName := "definitions"
rootDirName := "serviceDefinitions"
cloudProviderDirs, err := fs.ReadDir(definitionFiles, rootDirName)
cloudProviderDirs, err := fs.ReadDir(serviceDefinitionFiles, rootDirName)
if err != nil {
return fmt.Errorf("couldn't read dirs in %s: %w", rootDirName, err)
}
@@ -109,21 +98,21 @@ func readAllServiceDefinitions() error {
return fmt.Errorf("no %s services could be read", cloudProvider)
}
supportedServices[cloudProvider] = cloudServices
availableServices[cloudProvider] = cloudServices
}
return nil
}
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
map[string]Definition, error,
map[string]CloudServiceDetails, error,
) {
svcDefDirs, err := fs.ReadDir(definitionFiles, cloudProviderDirPath)
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't list integrations dirs: %w", err)
}
svcDefs := map[string]Definition{}
svcDefs := map[string]CloudServiceDetails{}
for _, d := range svcDefDirs {
if !d.IsDir() {
@@ -148,10 +137,10 @@ func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath st
return svcDefs, nil
}
func readServiceDefinition(cloudProvider string, svcDirpath string) (*Definition, error) {
func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(svcDirpath, "integration.json")
serializedSpec, err := definitionFiles.ReadFile(integrationJsonPath)
serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath)
if err != nil {
return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w",
@@ -168,7 +157,7 @@ func readServiceDefinition(cloudProvider string, svcDirpath string) (*Definition
}
hydrated, err := integrations.HydrateFileUris(
integrationSpec, definitionFiles, svcDirpath,
integrationSpec, serviceDefinitionFiles, svcDirpath,
)
if err != nil {
return nil, fmt.Errorf(
@@ -178,7 +167,7 @@ func readServiceDefinition(cloudProvider string, svcDirpath string) (*Definition
}
hydratedSpec := hydrated.(map[string]any)
serviceDef, err := ParseStructWithJsonTagsFromMap[Definition](hydratedSpec)
serviceDef, err := ParseStructWithJsonTagsFromMap[CloudServiceDetails](hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w",
@@ -191,13 +180,13 @@ func readServiceDefinition(cloudProvider string, svcDirpath string) (*Definition
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
}
serviceDef.Strategy.Provider = cloudProvider
serviceDef.TelemetryCollectionStrategy.Provider = cloudProvider
return serviceDef, nil
}
func validateServiceDefinition(s *Definition) error {
func validateServiceDefinition(s *CloudServiceDetails) error {
// Validate dashboard data
seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards {
@@ -207,7 +196,7 @@ func validateServiceDefinition(s *Definition) error {
seenDashboardIds[dd.Id] = nil
}
if s.Strategy == nil {
if s.TelemetryCollectionStrategy == nil {
return fmt.Errorf("telemetry_collection_strategy is required")
}

View File

@@ -1,4 +1,4 @@
package services
package cloudintegrations
import (
"testing"
@@ -11,22 +11,22 @@ func TestAvailableServices(t *testing.T) {
require := require.New(t)
// should be able to list available services.
_, apiErr := List("bad-cloud-provider")
_, apiErr := listCloudProviderServices("bad-cloud-provider")
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
awsSvcs, apiErr := List("aws")
awsSvcs, apiErr := listCloudProviderServices("aws")
require.Nil(apiErr)
require.Greater(len(awsSvcs), 0)
// should be able to get details of a service
_, apiErr = GetServiceDefinition(
_, apiErr = getCloudProviderService(
"aws", "bad-service-id",
)
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
svc, apiErr := GetServiceDefinition(
svc, apiErr := getCloudProviderService(
"aws", awsSvcs[0].Id,
)
require.Nil(apiErr)

View File

@@ -8,7 +8,6 @@ import (
"strings"
"time"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
@@ -28,7 +27,7 @@ func validateCloudProviderName(name string) *model.ApiError {
type Controller struct {
accountsRepo cloudProviderAccountsRepository
serviceConfigRepo ServiceConfigDatabase
serviceConfigRepo serviceConfigRepository
}
func NewController(sqlStore sqlstore.SQLStore) (
@@ -201,7 +200,7 @@ type AgentCheckInResponse struct {
type IntegrationConfigForAgent struct {
EnabledRegions []string `json:"enabled_regions"`
TelemetryCollectionStrategy *CompiledCollectionStrategy `json:"telemetry,omitempty"`
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry,omitempty"`
}
func (c *Controller) CheckInAsAgent(
@@ -240,7 +239,7 @@ func (c *Controller) CheckInAsAgent(
}
// prepare and return integration config to be consumed by agent
compliedStrategy, err := NewCompiledCollectionStrategy(cloudProvider)
telemetryCollectionStrategy, err := NewCloudTelemetryCollectionStrategy(cloudProvider)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't init telemetry collection strategy: %w", err,
@@ -249,17 +248,21 @@ func (c *Controller) CheckInAsAgent(
agentConfig := IntegrationConfigForAgent{
EnabledRegions: []string{},
TelemetryCollectionStrategy: compliedStrategy,
TelemetryCollectionStrategy: telemetryCollectionStrategy,
}
if account.Config != nil && account.Config.EnabledRegions != nil {
agentConfig.EnabledRegions = account.Config.EnabledRegions
}
services, apiErr := services.Map(cloudProvider)
services, apiErr := listCloudProviderServices(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcDetailsById := map[string]*CloudServiceDetails{}
for _, svcDetails := range services {
svcDetailsById[svcDetails.Id] = &svcDetails
}
svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, cloudProvider, *account.CloudAccountId,
@@ -275,16 +278,22 @@ func (c *Controller) CheckInAsAgent(
slices.Sort(configuredSvcIds)
for _, svcId := range configuredSvcIds {
definition, ok := services[svcId]
if !ok {
continue
}
config := svcConfigs[svcId]
svcDetails := svcDetailsById[svcId]
svcConfig := svcConfigs[svcId]
err := AddServiceStrategy(compliedStrategy, definition.Strategy, config)
if err != nil {
return nil, model.InternalError(
fmt.Errorf("couldn't add service telemetry collection strategy: %s", err))
if svcDetails != nil {
metricsEnabled := svcConfig.Metrics != nil && svcConfig.Metrics.Enabled
logsEnabled := svcConfig.Logs != nil && svcConfig.Logs.Enabled
if logsEnabled || metricsEnabled {
err := agentConfig.TelemetryCollectionStrategy.AddServiceStrategy(
svcDetails.TelemetryCollectionStrategy, logsEnabled, metricsEnabled,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't add service telemetry collection strategy: %w", err,
))
}
}
}
}
@@ -346,7 +355,7 @@ func (c *Controller) DisconnectAccount(
}
type ListServicesResponse struct {
Services []ServiceSummary `json:"services"`
Services []CloudServiceSummary `json:"services"`
}
func (c *Controller) ListServices(
@@ -354,16 +363,17 @@ func (c *Controller) ListServices(
cloudProvider string,
cloudAccountId *string,
) (*ListServicesResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
definitions, apiErr := services.List(cloudProvider)
services, apiErr := listCloudProviderServices(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcConfigs := map[string]*ServiceConfig{}
svcConfigs := map[string]*CloudServiceConfig{}
if cloudAccountId != nil {
svcConfigs, apiErr = c.serviceConfigRepo.getAllForAccount(
ctx, cloudProvider, *cloudAccountId,
@@ -375,11 +385,9 @@ func (c *Controller) ListServices(
}
}
summaries := []ServiceSummary{}
for _, def := range definitions {
summary := ServiceSummary{
Metadata: def.Metadata,
}
summaries := []CloudServiceSummary{}
for _, s := range services {
summary := s.CloudServiceSummary
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
@@ -395,21 +403,17 @@ func (c *Controller) GetServiceDetails(
cloudProvider string,
serviceId string,
cloudAccountId *string,
) (*ServiceDetails, *model.ApiError) {
) (*CloudServiceDetails, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
definition, apiErr := services.GetServiceDefinition(cloudProvider, serviceId)
service, apiErr := getCloudProviderService(cloudProvider, serviceId)
if apiErr != nil {
return nil, apiErr
}
details := ServiceDetails{
Definition: *definition,
}
if cloudAccountId != nil {
config, apiErr := c.serviceConfigRepo.get(
ctx, cloudProvider, *cloudAccountId, serviceId,
@@ -419,15 +423,15 @@ func (c *Controller) GetServiceDetails(
}
if config != nil {
details.Config = config
service.Config = config
if config.Metrics != nil && config.Metrics.Enabled {
// add links to service dashboards, making them clickable.
for i, d := range details.Assets.Dashboards {
for i, d := range service.Assets.Dashboards {
dashboardUuid := c.dashboardUuid(
cloudProvider, serviceId, d.Id,
)
details.Assets.Dashboards[i].Url = fmt.Sprintf(
service.Assets.Dashboards[i].Url = fmt.Sprintf(
"/dashboard/%s", dashboardUuid,
)
}
@@ -435,17 +439,17 @@ func (c *Controller) GetServiceDetails(
}
}
return &details, nil
return service, nil
}
type UpdateServiceConfigRequest struct {
CloudAccountId string `json:"cloud_account_id"`
Config ServiceConfig `json:"config"`
CloudAccountId string `json:"cloud_account_id"`
Config CloudServiceConfig `json:"config"`
}
type UpdateServiceConfigResponse struct {
Id string `json:"id"`
Config ServiceConfig `json:"config"`
Id string `json:"id"`
Config CloudServiceConfig `json:"config"`
}
func (c *Controller) UpdateServiceConfig(
@@ -454,6 +458,7 @@ func (c *Controller) UpdateServiceConfig(
serviceId string,
req UpdateServiceConfigRequest,
) (*UpdateServiceConfigResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
@@ -467,7 +472,7 @@ func (c *Controller) UpdateServiceConfig(
}
// can only update config for a valid service.
_, apiErr = services.GetServiceDefinition(cloudProvider, serviceId)
_, apiErr = getCloudProviderService(cloudProvider, serviceId)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "unsupported service")
}
@@ -535,7 +540,7 @@ func (c *Controller) AvailableDashboardsForCloudProvider(
}
}
allServices, apiErr := services.List(cloudProvider)
allServices, apiErr := listCloudProviderServices(cloudProvider)
if apiErr != nil {
return nil, apiErr
}

View File

@@ -182,8 +182,8 @@ func TestConfigureService(t *testing.T) {
require.NotNil(testConnectedAccount.CloudAccountId)
require.Equal(testCloudAccountId, *testConnectedAccount.CloudAccountId)
testSvcConfig := ServiceConfig{
Metrics: &MetricsConfig{
testSvcConfig := CloudServiceConfig{
Metrics: &CloudServiceMetricsConfig{
Enabled: true,
},
}

View File

@@ -6,22 +6,9 @@ import (
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/types"
)
type ServiceSummary struct {
services.Metadata
Config *ServiceConfig `json:"config"`
}
type ServiceDetails struct {
services.Definition
Config *ServiceConfig `json:"config"`
ConnectionStatus *ServiceConnectionStatus `json:"status,omitempty"`
}
// Represents a cloud provider account for cloud integrations
type AccountRecord struct {
CloudProvider string `json:"cloud_provider" db:"cloud_provider"`
@@ -131,13 +118,39 @@ func (a *AccountRecord) account() Account {
return ca
}
type ServiceConfig struct {
Logs *LogsConfig `json:"logs,omitempty"`
Metrics *MetricsConfig `json:"metrics,omitempty"`
type CloudServiceSummary struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
// Present only if the service has been configured in the
// context of a cloud provider account.
Config *CloudServiceConfig `json:"config,omitempty"`
}
// Serialization from db
func (c *ServiceConfig) Scan(src any) error {
type CloudServiceDetails struct {
CloudServiceSummary
Overview string `json:"overview"` // markdown
Assets CloudServiceAssets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollectedForService `json:"data_collected"`
ConnectionStatus *CloudServiceConnectionStatus `json:"status,omitempty"`
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry_collection_strategy"`
}
type CloudServiceConfig struct {
Logs *CloudServiceLogsConfig `json:"logs,omitempty"`
Metrics *CloudServiceMetricsConfig `json:"metrics,omitempty"`
}
// For serializing from db
func (c *CloudServiceConfig) Scan(src any) error {
data, ok := src.([]byte)
if !ok {
return fmt.Errorf("tried to scan from %T instead of bytes", src)
@@ -146,8 +159,8 @@ func (c *ServiceConfig) Scan(src any) error {
return json.Unmarshal(data, &c)
}
// Serializing to db
func (c *ServiceConfig) Value() (driver.Value, error) {
// For serializing to db
func (c *CloudServiceConfig) Value() (driver.Value, error) {
if c == nil {
return nil, nil
}
@@ -161,16 +174,51 @@ func (c *ServiceConfig) Value() (driver.Value, error) {
return serialized, nil
}
type LogsConfig struct {
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type MetricsConfig struct {
type CloudServiceLogsConfig struct {
Enabled bool `json:"enabled"`
}
type ServiceConnectionStatus struct {
type CloudServiceMetricsConfig struct {
Enabled bool `json:"enabled"`
}
type CloudServiceAssets struct {
Dashboards []CloudServiceDashboard `json:"dashboards"`
}
type CloudServiceDashboard struct {
Id string `json:"id"`
Url string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Image string `json:"image"`
Definition *types.DashboardData `json:"definition,omitempty"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollectedForService struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type CloudServiceConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"`
}
@@ -180,14 +228,23 @@ type SignalConnectionStatus struct {
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}
type CompiledCollectionStrategy = services.CollectionStrategy
type CloudTelemetryCollectionStrategy struct {
Provider string `json:"provider"`
func NewCompiledCollectionStrategy(provider string) (*CompiledCollectionStrategy, error) {
AWSMetrics *AWSMetricsCollectionStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsCollectionStrategy `json:"aws_logs,omitempty"`
}
func NewCloudTelemetryCollectionStrategy(provider string) (*CloudTelemetryCollectionStrategy, error) {
if provider == "aws" {
return &CompiledCollectionStrategy{
Provider: "aws",
AWSMetrics: &services.AWSMetricsStrategy{},
AWSLogs: &services.AWSLogsStrategy{},
return &CloudTelemetryCollectionStrategy{
Provider: "aws",
AWSMetrics: &AWSMetricsCollectionStrategy{
CloudwatchMetricsStreamFilters: []CloudwatchMetricStreamFilter{},
},
AWSLogs: &AWSLogsCollectionStrategy{
CloudwatchLogsSubscriptions: []CloudwatchLogsSubscriptionConfig{},
},
}, nil
}
@@ -195,27 +252,24 @@ func NewCompiledCollectionStrategy(provider string) (*CompiledCollectionStrategy
}
// Helper for accumulating strategies for enabled services.
func AddServiceStrategy(cs *CompiledCollectionStrategy,
definitionStrat *services.CollectionStrategy, config *ServiceConfig) error {
if definitionStrat.Provider != cs.Provider {
func (cs *CloudTelemetryCollectionStrategy) AddServiceStrategy(
svcStrategy *CloudTelemetryCollectionStrategy,
logsEnabled bool,
metricsEnabled bool,
) error {
if svcStrategy.Provider != cs.Provider {
return fmt.Errorf(
"can't add %s service strategy to strategy for %s",
definitionStrat.Provider, cs.Provider,
svcStrategy.Provider, cs.Provider,
)
}
if cs.Provider == "aws" {
if config.Logs != nil && config.Logs.Enabled && definitionStrat.AWSLogs != nil {
cs.AWSLogs.Subscriptions = append(
cs.AWSLogs.Subscriptions,
definitionStrat.AWSLogs.Subscriptions...,
)
if logsEnabled {
cs.AWSLogs.AddServiceStrategy(svcStrategy.AWSLogs)
}
if config.Metrics != nil && config.Metrics.Enabled && definitionStrat.AWSMetrics != nil {
cs.AWSMetrics.StreamFilters = append(
cs.AWSMetrics.StreamFilters,
definitionStrat.AWSMetrics.StreamFilters...,
)
if metricsEnabled {
cs.AWSMetrics.AddServiceStrategy(svcStrategy.AWSMetrics)
}
return nil
}
@@ -223,3 +277,57 @@ func AddServiceStrategy(cs *CompiledCollectionStrategy,
return fmt.Errorf("unsupported cloud provider: %s", cs.Provider)
}
type AWSMetricsCollectionStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
CloudwatchMetricsStreamFilters []CloudwatchMetricStreamFilter `json:"cloudwatch_metric_stream_filters"`
}
type CloudwatchMetricStreamFilter struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
}
func (amc *AWSMetricsCollectionStrategy) AddServiceStrategy(
svcStrategy *AWSMetricsCollectionStrategy,
) error {
if svcStrategy == nil {
return nil
}
amc.CloudwatchMetricsStreamFilters = append(
amc.CloudwatchMetricsStreamFilters,
svcStrategy.CloudwatchMetricsStreamFilters...,
)
return nil
}
type AWSLogsCollectionStrategy struct {
CloudwatchLogsSubscriptions []CloudwatchLogsSubscriptionConfig `json:"cloudwatch_logs_subscriptions"`
}
type CloudwatchLogsSubscriptionConfig struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
}
func (alc *AWSLogsCollectionStrategy) AddServiceStrategy(
svcStrategy *AWSLogsCollectionStrategy,
) error {
if svcStrategy == nil {
return nil
}
alc.CloudwatchLogsSubscriptions = append(
alc.CloudwatchLogsSubscriptions,
svcStrategy.CloudwatchLogsSubscriptions...,
)
return nil
}

View File

@@ -9,28 +9,28 @@ import (
"github.com/jmoiron/sqlx"
)
type ServiceConfigDatabase interface {
type serviceConfigRepository interface {
get(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
serviceId string,
) (*ServiceConfig, *model.ApiError)
) (*CloudServiceConfig, *model.ApiError)
upsert(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
serviceId string,
config ServiceConfig,
) (*ServiceConfig, *model.ApiError)
config CloudServiceConfig,
) (*CloudServiceConfig, *model.ApiError)
getAllForAccount(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
) (
configsBySvcId map[string]*ServiceConfig,
configsBySvcId map[string]*CloudServiceConfig,
apiErr *model.ApiError,
)
}
@@ -52,9 +52,9 @@ func (r *serviceConfigSQLRepository) get(
cloudProvider string,
cloudAccountId string,
serviceId string,
) (*ServiceConfig, *model.ApiError) {
) (*CloudServiceConfig, *model.ApiError) {
var result ServiceConfig
var result CloudServiceConfig
err := r.db.GetContext(
ctx, &result, `
@@ -68,13 +68,17 @@ func (r *serviceConfigSQLRepository) get(
`,
cloudProvider, cloudAccountId, serviceId,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, model.NotFoundError(
fmt.Errorf("couldn't find %s %s config for %s", cloudProvider, serviceId, cloudAccountId))
}
return nil, model.InternalError(fmt.Errorf("couldn't query cloud service config: %w", err))
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find %s %s config for %s",
cloudProvider, serviceId, cloudAccountId,
))
} else if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud service config: %w", err,
))
}
return &result, nil
@@ -86,8 +90,8 @@ func (r *serviceConfigSQLRepository) upsert(
cloudProvider string,
cloudAccountId string,
serviceId string,
config ServiceConfig,
) (*ServiceConfig, *model.ApiError) {
config CloudServiceConfig,
) (*CloudServiceConfig, *model.ApiError) {
query := `
INSERT INTO cloud_integrations_service_configs (
@@ -124,11 +128,11 @@ func (r *serviceConfigSQLRepository) getAllForAccount(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
) (map[string]*ServiceConfig, *model.ApiError) {
) (map[string]*CloudServiceConfig, *model.ApiError) {
type ScannedServiceConfigRecord struct {
ServiceId string `db:"service_id"`
Config ServiceConfig `db:"config_json"`
ServiceId string `db:"service_id"`
Config CloudServiceConfig `db:"config_json"`
}
records := []ScannedServiceConfigRecord{}
@@ -151,7 +155,7 @@ func (r *serviceConfigSQLRepository) getAllForAccount(
))
}
result := map[string]*ServiceConfig{}
result := map[string]*CloudServiceConfig{}
for _, r := range records {
result[r.ServiceId] = &r.Config

View File

@@ -1 +0,0 @@
<svg width="256" height="256" viewBox="0 0 256 256" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" preserveAspectRatio="xMidYMid"><title>AWS Simple Storage Service (S3)</title><defs><linearGradient x1="0%" y1="100%" x2="100%" y2="0%" id="a"><stop stop-color="#1B660F" offset="0%"/><stop stop-color="#6CAE3E" offset="100%"/></linearGradient></defs><g><rect fill="url(#a)" width="256" height="256"/><path d="M194.67488,137.25632 L195.90368,128.60352 C207.23488,135.39072 207.38208,138.19392 207.378964,138.27072 C207.35968,138.28672 205.42688,139.89952 194.67488,137.25632 L194.67488,137.25632 Z M188.45728,135.52832 C168.87328,129.60192 141.59968,117.08992 130.56288,111.87392 C130.56288,111.82912 130.57568,111.78752 130.57568,111.74272 C130.57568,107.50272 127.12608,104.05312 122.88288,104.05312 C118.64608,104.05312 115.19648,107.50272 115.19648,111.74272 C115.19648,115.98272 118.64608,119.43232 122.88288,119.43232 C124.74528,119.43232 126.43488,118.73792 127.76928,117.63392 C140.75488,123.78112 167.81728,136.11072 187.54528,141.93472 L179.74368,196.99392 C179.72128,197.14432 179.71168,197.29472 179.71168,197.44512 C179.71168,202.29312 158.24928,211.19872 123.18048,211.19872 C87.74048,211.19872 66.05088,202.29312 66.05088,197.44512 C66.05088,197.29792 66.04128,197.15392 66.02208,197.00992 L49.72128,77.94752 C63.83008,87.65952 94.17568,92.79872 123.19968,92.79872 C152.17888,92.79872 182.47328,87.67872 196.61088,77.99552 L188.45728,135.52832 Z M47.99968,65.52832 C48.23008,61.31712 72.42848,44.79872 123.19968,44.79872 C173.96448,44.79872 198.16608,61.31392 198.39968,65.52832 L198.39968,66.96512 C195.61568,76.40832 164.25568,86.39872 123.19968,86.39872 C82.07328,86.39872 50.69728,76.37632 47.99968,66.92032 L47.99968,65.52832 Z M204.79968,65.59872 C204.79968,54.51072 173.01088,38.39872 123.19968,38.39872 C73.38848,38.39872 41.59968,54.51072 41.59968,65.59872 L41.90048,68.01152 L59.65408,197.68832 C60.07968,212.19072 98.75488,217.59872 123.18048,217.59872 C153.49088,217.59872 185.69248,210.62912 186.10848,197.69792 L193.77568,143.62752 C198.04128,144.64832 201.55168,145.16992 204.37088,145.16992 C208.15648,145.16992 210.71648,144.24512 212.26848,142.39552 C213.54208,140.87872 214.02848,139.04192 213.66368,137.08672 C212.83488,132.65792 207.57728,127.88352 196.87008,121.77472 L204.47328,68.13632 L204.79968,65.59872 Z" fill="#FFFFFF"/></g></svg>

Before

Width:  |  Height:  |  Size: 2.3 KiB

View File

@@ -1,49 +0,0 @@
{
"id": "s3sync",
"title": "S3 Sync",
"icon": "file://icon.svg",
"overview": "file://overview.md",
"supported_signals": {
"metrics": false,
"logs": true
},
"data_collected": {
"logs": [
{
"name": "Account ID",
"path": "resources.aws.account.id",
"type": "string"
},
{
"name": "Account Region",
"path": "resources.aws.account.region",
"type": "string"
},
{
"name": "Bucket Name",
"path": "resources.aws.bucket.name",
"type": "string"
},
{
"name": "Object Key",
"path": "attributes.aws.object.key",
"type": "string"
},
{
"name": "Object S3 Event Time",
"path": "attributes.aws.object.event_time",
"type": "string"
},
{
"name": "Log line number in source file",
"path": "attributes.log.source.line",
"type": "number"
}
]
},
"telemetry_collection_strategy": {
},
"assets": {
"dashboards": []
}
}

View File

@@ -1,3 +0,0 @@
### Sync logs stored in AWS S3 with SigNoz
Collect logs stored by AWS Services in S3 and explore them in Signoz.

View File

@@ -1,94 +0,0 @@
package services
import (
"github.com/SigNoz/signoz/pkg/types"
)
type Metadata struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
// Present only if the service has been configured in the
// context of a cloud provider account.
// Config *CloudServiceConfig `json:"config,omitempty"`
}
type Definition struct {
Metadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"data_collected"`
Strategy *CollectionStrategy `json:"telemetry_collection_strategy"`
}
type Assets struct {
Dashboards []Dashboard `json:"dashboards"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollected struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type CollectionStrategy struct {
Provider string `json:"provider"`
AWSMetrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsStrategy `json:"aws_logs,omitempty"`
}
type AWSMetricsStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
StreamFilters []struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
} `json:"cloudwatch_metric_stream_filters"`
}
type AWSLogsStrategy struct {
Subscriptions []struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
} `json:"cloudwatch_logs_subscriptions"`
}
type Dashboard struct {
Id string `json:"id"`
Url string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Image string `json:"image"`
Definition *types.DashboardData `json:"definition,omitempty"`
}

View File

@@ -22,7 +22,6 @@ import (
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -3999,8 +3998,8 @@ func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
svcDetails *cloudintegrations.ServiceDetails,
) (*cloudintegrations.ServiceConnectionStatus, *model.ApiError) {
svcDetails *cloudintegrations.CloudServiceDetails,
) (*cloudintegrations.CloudServiceConnectionStatus, *model.ApiError) {
if cloudProvider != "aws" {
// TODO(Raj): Make connection check generic for all providers in a follow up change
return nil, model.BadRequest(
@@ -4008,14 +4007,14 @@ func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
)
}
telemetryCollectionStrategy := svcDetails.Strategy
telemetryCollectionStrategy := svcDetails.TelemetryCollectionStrategy
if telemetryCollectionStrategy == nil {
return nil, model.InternalError(fmt.Errorf(
"service doesn't have telemetry collection strategy: %s", svcDetails.Id,
))
}
result := &cloudintegrations.ServiceConnectionStatus{}
result := &cloudintegrations.CloudServiceConnectionStatus{}
errors := []*model.ApiError{}
var resultLock sync.Mutex
@@ -4075,10 +4074,10 @@ func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx context.Context,
cloudAccountId string,
strategy *services.AWSMetricsStrategy,
metricsCollectedBySvc []services.CollectedMetric,
strategy *cloudintegrations.AWSMetricsCollectionStrategy,
metricsCollectedBySvc []cloudintegrations.CollectedMetric,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.StreamFilters) < 1 {
if strategy == nil || len(strategy.CloudwatchMetricsStreamFilters) < 1 {
return nil, nil
}
@@ -4087,7 +4086,7 @@ func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
"cloud_account_id": cloudAccountId,
}
metricsNamespace := strategy.StreamFilters[0].Namespace
metricsNamespace := strategy.CloudwatchMetricsStreamFilters[0].Namespace
metricsNamespaceParts := strings.Split(metricsNamespace, "/")
if len(metricsNamespaceParts) >= 2 {
@@ -4124,13 +4123,13 @@ func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
func (aH *APIHandler) calculateAWSIntegrationSvcLogsConnectionStatus(
ctx context.Context,
cloudAccountId string,
strategy *services.AWSLogsStrategy,
strategy *cloudintegrations.AWSLogsCollectionStrategy,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.Subscriptions) < 1 {
if strategy == nil || len(strategy.CloudwatchLogsSubscriptions) < 1 {
return nil, nil
}
logGroupNamePrefix := strategy.Subscriptions[0].LogGroupNamePrefix
logGroupNamePrefix := strategy.CloudwatchLogsSubscriptions[0].LogGroupNamePrefix
if len(logGroupNamePrefix) < 1 {
return nil, nil
}

View File

@@ -28,15 +28,15 @@ func generateOverviewSQL(start, end int64, item []v3.FilterItem) string {
for _, filter := range item {
switch filter.Key.Key {
case "service.name":
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "service_name", format.ClickHouseFormattedValue(filter.Value)))
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "service_name", format.ClickHouseFormattedValue(filter.Value, false)))
case "name":
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "span_name", format.ClickHouseFormattedValue(filter.Value)))
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "span_name", format.ClickHouseFormattedValue(filter.Value, false)))
case "destination":
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "destination", format.ClickHouseFormattedValue(filter.Value)))
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "destination", format.ClickHouseFormattedValue(filter.Value, false)))
case "queue":
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "messaging_system", format.ClickHouseFormattedValue(filter.Value)))
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "messaging_system", format.ClickHouseFormattedValue(filter.Value, false)))
case "kind_string":
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "kind_string", format.ClickHouseFormattedValue(filter.Value)))
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "kind_string", format.ClickHouseFormattedValue(filter.Value, false)))
}
}

View File

@@ -131,7 +131,7 @@ func GetPathIndexFilter(path string) string {
return ""
}
func GetJSONFilter(item v3.FilterItem) (string, error) {
func GetJSONFilter(item v3.FilterItem, isEscaped bool) (string, error) {
dataType := item.Key.DataType
isArray := false
@@ -166,13 +166,13 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
filter = fmt.Sprintf(logsOp, key, GetPath(strings.Split(item.Key.Key, ".")[1:]))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, v3.FilterOperatorHas, v3.FilterOperatorNotHas:
fmtVal := utils.ClickHouseFormattedValue(value)
fmtVal := utils.ClickHouseFormattedValue(value, isEscaped)
filter = fmt.Sprintf(logsOp, key, fmtVal)
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
filter = fmt.Sprintf("%s %s '%%%s%%'", key, logsOp, val)
default:
fmtVal := utils.ClickHouseFormattedValue(value)
fmtVal := utils.ClickHouseFormattedValue(value, isEscaped)
filter = fmt.Sprintf("%s %s %s", key, logsOp, fmtVal)
}
} else {

View File

@@ -331,7 +331,7 @@ var testGetJSONFilterData = []struct {
func TestGetJSONFilter(t *testing.T) {
for _, tt := range testGetJSONFilterData {
Convey("testGetJSONFilter", t, func() {
filter, err := GetJSONFilter(tt.FilterItem)
filter, err := GetJSONFilter(tt.FilterItem, false)
if tt.Error {
So(err, ShouldNotBeNil)
} else {

View File

@@ -168,7 +168,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
if item.Key.IsJSON {
filter, err := GetJSONFilter(item)
filter, err := GetJSONFilter(item, false)
if err != nil {
return "", err
}
@@ -193,7 +193,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
conditions = append(conditions, GetExistsNexistsFilter(op, item))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
columnName := getClickhouseColumnName(item.Key)
fmtVal := utils.ClickHouseFormattedValue(value)
fmtVal := utils.ClickHouseFormattedValue(value, false)
conditions = append(conditions, fmt.Sprintf(logsOp, columnName, fmtVal))
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
columnName := getClickhouseColumnName(item.Key)
@@ -206,7 +206,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
}
default:
columnName := getClickhouseColumnName(item.Key)
fmtVal := utils.ClickHouseFormattedValue(value)
fmtVal := utils.ClickHouseFormattedValue(value, false)
// for use lower for like and ilike
if op == v3.FilterOperatorLike || op == v3.FilterOperatorNotLike {
@@ -444,7 +444,7 @@ func Having(items []v3.Having) string {
// aggregate something and filter on that aggregate
var having []string
for _, item := range items {
having = append(having, fmt.Sprintf("value %s %s", item.Operator, utils.ClickHouseFormattedValue(item.Value)))
having = append(having, fmt.Sprintf("value %s %s", item.Operator, utils.ClickHouseFormattedValue(item.Value, false)))
}
return strings.Join(having, " AND ")
}

View File

@@ -30,7 +30,7 @@ var jsonLogOperators = map[v3.FilterOperator]string{
v3.FilterOperatorNotHas: "NOT has(%s, %s)",
}
func GetJSONFilter(item v3.FilterItem) (string, error) {
func GetJSONFilter(item v3.FilterItem, isEscaped bool) (string, error) {
dataType := item.Key.DataType
isArray := false
@@ -65,13 +65,13 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
filter = fmt.Sprintf(logsOp, key, logsV3.GetPath(strings.Split(item.Key.Key, ".")[1:]))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, v3.FilterOperatorHas, v3.FilterOperatorNotHas:
fmtVal := utils.ClickHouseFormattedValue(value)
fmtVal := utils.ClickHouseFormattedValue(value, isEscaped)
filter = fmt.Sprintf(logsOp, key, fmtVal)
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
filter = fmt.Sprintf("%s %s '%%%s%%'", key, logsOp, val)
default:
fmtVal := utils.ClickHouseFormattedValue(value)
fmtVal := utils.ClickHouseFormattedValue(value, isEscaped)
filter = fmt.Sprintf("%s %s %s", key, logsOp, fmtVal)
}
} else {

View File

@@ -253,7 +253,7 @@ var testGetJSONFilterData = []struct {
func TestGetJSONFilter(t *testing.T) {
for _, tt := range testGetJSONFilterData {
Convey("testGetJSONFilter", t, func() {
filter, err := GetJSONFilter(tt.FilterItem)
filter, err := GetJSONFilter(tt.FilterItem, false)
if tt.Error {
So(err, ShouldNotBeNil)
} else {

View File

@@ -113,7 +113,7 @@ func getExistsNexistsFilter(op v3.FilterOperator, item v3.FilterItem) string {
return fmt.Sprintf(logOperators[op], columnType, columnDataType, item.Key.Key)
}
func buildAttributeFilter(item v3.FilterItem) (string, error) {
func buildAttributeFilter(item v3.FilterItem, isEscaped bool) (string, error) {
// check if the user is searching for value in all attributes
key := item.Key.Key
op := v3.FilterOperator(strings.ToLower(string(item.Operator)))
@@ -133,12 +133,12 @@ func buildAttributeFilter(item v3.FilterItem) (string, error) {
if (op != v3.FilterOperatorEqual && op != v3.FilterOperatorContains) || item.Key.DataType != v3.AttributeKeyDataTypeString {
return "", fmt.Errorf("only = operator and string data type is supported for __attrs")
}
val := utils.ClickHouseFormattedValue(item.Value)
val := utils.ClickHouseFormattedValue(item.Value, isEscaped)
return fmt.Sprintf("has(mapValues(attributes_string), %s)", val), nil
}
keyName := getClickhouseKey(item.Key)
fmtVal := utils.ClickHouseFormattedValue(value)
fmtVal := utils.ClickHouseFormattedValue(value, isEscaped)
if logsOp, ok := logOperators[op]; ok {
switch op {
@@ -148,8 +148,16 @@ func buildAttributeFilter(item v3.FilterItem) (string, error) {
return fmt.Sprintf(logsOp, keyName, fmtVal), nil
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
// we also want to treat %, _ as literals for contains
val := utils.QuoteEscapedStringForContains(fmt.Sprintf("%s", item.Value), false)
var val string
if !isEscaped {
val = utils.QuoteEscapedString(fmt.Sprintf("%s", item.Value))
} else {
val = fmt.Sprintf("%s", item.Value)
}
// we want to treat %, _ as literals for contains
val = utils.EscapedStringForContains(val, false)
// for body the contains is case insensitive
if keyName == BODY {
logsOp = strings.Replace(logsOp, "ILIKE", "LIKE", 1) // removing i from ilike and not ilike
@@ -159,7 +167,12 @@ func buildAttributeFilter(item v3.FilterItem) (string, error) {
}
case v3.FilterOperatorLike, v3.FilterOperatorNotLike:
// for body use lower for like and ilike
val := utils.QuoteEscapedString(fmt.Sprintf("%s", item.Value))
var val string
if isEscaped {
val = utils.QuoteEscapedString(fmt.Sprintf("%s", item.Value))
} else {
val = fmt.Sprintf("%s", item.Value)
}
if keyName == BODY {
logsOp = strings.Replace(logsOp, "ILIKE", "LIKE", 1) // removing i from ilike and not ilike
return fmt.Sprintf("lower(%s) %s lower('%s')", keyName, logsOp, val), nil
@@ -174,7 +187,7 @@ func buildAttributeFilter(item v3.FilterItem) (string, error) {
}
}
func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey) (string, error) {
func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey, isEscaped bool) (string, error) {
var conditions []string
if fs == nil || len(fs.Items) == 0 {
@@ -189,7 +202,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
// if the filter is json filter
if item.Key.IsJSON {
filter, err := GetJSONFilter(item)
filter, err := GetJSONFilter(item, isEscaped)
if err != nil {
return "", err
}
@@ -198,7 +211,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
}
// generate the filter
filter, err := buildAttributeFilter(item)
filter, err := buildAttributeFilter(item, isEscaped)
if err != nil {
return "", err
}
@@ -342,7 +355,7 @@ func generateAggregateClause(aggOp v3.AggregateOperator,
}
}
func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string) (string, error) {
func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string, isEscaped bool) (string, error) {
// timerange will be sent in epoch millisecond
logsStart := utils.GetEpochNanoSecs(start)
logsEnd := utils.GetEpochNanoSecs(end)
@@ -355,7 +368,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d) AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)", logsStart, logsEnd, bucketStart, bucketEnd)
// build the where clause for main table
filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute)
filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute, isEscaped)
if err != nil {
return "", err
}
@@ -364,7 +377,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
}
// build the where clause for resource table
resourceSubQuery, err := resource.BuildResourceSubQuery(DB_NAME, DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd, mq.Filters, mq.GroupBy, mq.AggregateAttribute, false)
resourceSubQuery, err := resource.BuildResourceSubQuery(DB_NAME, DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd, mq.Filters, mq.GroupBy, mq.AggregateAttribute, false, isEscaped)
if err != nil {
return "", err
}
@@ -446,14 +459,14 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
return query, nil
}
func buildLogsLiveTailQuery(mq *v3.BuilderQuery) (string, error) {
filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, v3.AttributeKey{})
func buildLogsLiveTailQuery(mq *v3.BuilderQuery, isEscaped bool) (string, error) {
filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, v3.AttributeKey{}, isEscaped)
if err != nil {
return "", err
}
// no values for bucket start and end
resourceSubQuery, err := resource.BuildResourceSubQuery(DB_NAME, DISTRIBUTED_LOGS_V2_RESOURCE, 0, 0, mq.Filters, mq.GroupBy, mq.AggregateAttribute, true)
resourceSubQuery, err := resource.BuildResourceSubQuery(DB_NAME, DISTRIBUTED_LOGS_V2_RESOURCE, 0, 0, mq.Filters, mq.GroupBy, mq.AggregateAttribute, true, isEscaped)
if err != nil {
return "", err
}
@@ -491,14 +504,14 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
// }
if options.IsLivetailQuery {
query, err := buildLogsLiveTailQuery(mq)
query, err := buildLogsLiveTailQuery(mq, options.ValuesEscaped)
if err != nil {
return "", err
}
return query, nil
} else if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
// give me just the group_by names (no values)
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype)
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.ValuesEscaped)
if err != nil {
return "", err
}
@@ -506,14 +519,14 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
return query, nil
} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype)
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.ValuesEscaped)
if err != nil {
return "", err
}
return query, nil
}
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype)
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.ValuesEscaped)
if err != nil {
return "", err
}

View File

@@ -169,7 +169,8 @@ func Test_getExistsNexistsFilter(t *testing.T) {
func Test_buildAttributeFilter(t *testing.T) {
type args struct {
item v3.FilterItem
item v3.FilterItem
isEscaped bool
}
tests := []struct {
name string
@@ -297,10 +298,42 @@ func Test_buildAttributeFilter(t *testing.T) {
},
want: "lower(body) LIKE lower('test')",
},
{
name: "build attribute filter contains- body escaped",
args: args{
item: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
Operator: v3.FilterOperatorContains,
Value: `{\\"hello\\": \\"wo_rld\\"}`,
},
isEscaped: true,
},
want: `lower(body) LIKE lower('%{\\"hello\\": \\"wo\_rld\\"}%')`,
},
{
name: "build attribute filter eq- body escaped",
args: args{
item: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
Operator: v3.FilterOperatorEqual,
Value: `{\\"hello\\": \\"wo_rld\\"}`,
},
isEscaped: true,
},
want: `body = '{\\"hello\\": \\"wo_rld\\"}'`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildAttributeFilter(tt.args.item)
got, err := buildAttributeFilter(tt.args.item, tt.args.isEscaped)
if (err != nil) != tt.wantErr {
t.Errorf("buildAttributeFilter() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -317,6 +350,7 @@ func Test_buildLogsTimeSeriesFilterQuery(t *testing.T) {
fs *v3.FilterSet
groupBy []v3.AttributeKey
aggregateAttribute v3.AttributeKey
isEscaped bool
}
tests := []struct {
name string
@@ -436,7 +470,7 @@ func Test_buildLogsTimeSeriesFilterQuery(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildLogsTimeSeriesFilterQuery(tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute)
got, err := buildLogsTimeSeriesFilterQuery(tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute, tt.args.isEscaped)
if (err != nil) != tt.wantErr {
t.Errorf("buildLogsTimeSeriesFilterQuery() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -641,6 +675,7 @@ func Test_buildLogsQuery(t *testing.T) {
step int64
mq *v3.BuilderQuery
graphLimitQtype string
isEscaped bool
}
tests := []struct {
name string
@@ -785,7 +820,7 @@ func Test_buildLogsQuery(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildLogsQuery(tt.args.panelType, tt.args.start, tt.args.end, tt.args.step, tt.args.mq, tt.args.graphLimitQtype)
got, err := buildLogsQuery(tt.args.panelType, tt.args.start, tt.args.end, tt.args.step, tt.args.mq, tt.args.graphLimitQtype, tt.args.isEscaped)
if (err != nil) != tt.wantErr {
t.Errorf("buildLogsQuery() error = %v, wantErr %v", err, tt.wantErr)
return

View File

@@ -296,7 +296,7 @@ func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string
func having(items []v3.Having) string {
var having []string
for _, item := range items {
having = append(having, fmt.Sprintf("%s %s %v", "value", item.Operator, utils.ClickHouseFormattedValue(item.Value)))
having = append(having, fmt.Sprintf("%s %s %v", "value", item.Operator, utils.ClickHouseFormattedValue(item.Value, false)))
}
return strings.Join(having, " AND ")
}

View File

@@ -282,7 +282,7 @@ func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string
}
var fmtVal string
if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
fmtVal = utils.ClickHouseFormattedValue(toFormat)
fmtVal = utils.ClickHouseFormattedValue(toFormat, false)
}
switch op {
case v3.FilterOperatorEqual:
@@ -364,7 +364,7 @@ func PrepareTimeseriesFilterQueryV3(start, end int64, mq *v3.BuilderQuery) (stri
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
toFormat = fmt.Sprintf("%%%s%%", toFormat)
}
fmtVal := utils.ClickHouseFormattedValue(toFormat)
fmtVal := utils.ClickHouseFormattedValue(toFormat, false)
switch op {
case v3.FilterOperatorEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))

View File

@@ -889,7 +889,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL {
formattedVars[name] = metrics.PromFormattedValue(value)
} else if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeClickHouseSQL {
formattedVars[name] = utils.ClickHouseFormattedValue(value)
formattedVars[name] = utils.ClickHouseFormattedValue(value, queryRangeParams.ValuesEscaped)
}
}

View File

@@ -45,7 +45,7 @@ func prepareLogsQuery(_ context.Context,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit},
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
return query, err
@@ -56,7 +56,7 @@ func prepareLogsQuery(_ context.Context,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit},
v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit, ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
return query, err
@@ -71,7 +71,7 @@ func prepareLogsQuery(_ context.Context,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{},
v3.QBOptions{ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
return query, err
@@ -184,7 +184,7 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit},
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
@@ -195,7 +195,7 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit},
v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit, ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
@@ -208,7 +208,7 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{},
v3.QBOptions{ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}

View File

@@ -44,7 +44,7 @@ func prepareLogsQuery(_ context.Context,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit},
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
return query, err
@@ -55,7 +55,7 @@ func prepareLogsQuery(_ context.Context,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit},
v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit, ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
return query, err
@@ -70,7 +70,7 @@ func prepareLogsQuery(_ context.Context,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{},
v3.QBOptions{ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
return query, err
@@ -184,7 +184,7 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit},
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
@@ -195,7 +195,7 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit},
v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit, ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
@@ -208,7 +208,7 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
v3.QBOptions{},
v3.QBOptions{ValuesEscaped: params.ValuesEscaped},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}

View File

@@ -193,12 +193,12 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3) (map[strin
// for ts query with group by and limit form two queries
if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 {
limitQuery, err := qb.options.BuildTraceQuery(start, end, compositeQuery.PanelType, query,
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit})
v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, ValuesEscaped: params.ValuesEscaped})
if err != nil {
return nil, err
}
placeholderQuery, err := qb.options.BuildTraceQuery(start, end, compositeQuery.PanelType,
query, v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit})
query, v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit, ValuesEscaped: params.ValuesEscaped})
if err != nil {
return nil, err
}
@@ -206,7 +206,7 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3) (map[strin
queries[queryName] = query
} else {
queryString, err := qb.options.BuildTraceQuery(start, end, compositeQuery.PanelType,
query, v3.QBOptions{GraphLimitQtype: ""})
query, v3.QBOptions{GraphLimitQtype: "", ValuesEscaped: params.ValuesEscaped})
if err != nil {
return nil, err
}
@@ -215,18 +215,18 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3) (map[strin
case v3.DataSourceLogs:
// for ts query with limit replace it as it is already formed
if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 {
limitQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit})
limitQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.QBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, ValuesEscaped: params.ValuesEscaped})
if err != nil {
return nil, err
}
placeholderQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit})
placeholderQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.QBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit, ValuesEscaped: params.ValuesEscaped})
if err != nil {
return nil, err
}
query := fmt.Sprintf(placeholderQuery, limitQuery)
queries[queryName] = query
} else {
queryString, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.QBOptions{GraphLimitQtype: ""})
queryString, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.QBOptions{GraphLimitQtype: "", ValuesEscaped: params.ValuesEscaped})
if err != nil {
return nil, err
}

View File

@@ -28,14 +28,14 @@ var resourceLogOperators = map[v3.FilterOperator]string{
}
// buildResourceFilter builds a clickhouse filter string for resource labels
func buildResourceFilter(logsOp string, key string, op v3.FilterOperator, value interface{}) string {
func buildResourceFilter(logsOp string, key string, op v3.FilterOperator, value interface{}, isEscaped bool) string {
// for all operators except contains and like
searchKey := fmt.Sprintf("simpleJSONExtractString(labels, '%s')", key)
// for contains and like it will be case insensitive
lowerSearchKey := fmt.Sprintf("simpleJSONExtractString(lower(labels), '%s')", key)
chFmtVal := utils.ClickHouseFormattedValue(value)
chFmtVal := utils.ClickHouseFormattedValue(value, isEscaped)
lowerValue := strings.ToLower(fmt.Sprintf("%s", value))
@@ -47,14 +47,25 @@ func buildResourceFilter(logsOp string, key string, op v3.FilterOperator, value
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
return fmt.Sprintf(logsOp, searchKey, chFmtVal)
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
// this is required as clickhouseFormattedValue add's quotes to the string
var val string
if !isEscaped {
val = utils.QuoteEscapedString(lowerValue)
} else {
val = lowerValue
}
// we also want to treat %, _ as literals for contains
escapedStringValue := utils.QuoteEscapedStringForContains(lowerValue, false)
return fmt.Sprintf("%s %s '%%%s%%'", lowerSearchKey, logsOp, escapedStringValue)
val = utils.EscapedStringForContains(val, false)
return fmt.Sprintf("%s %s '%%%s%%'", lowerSearchKey, logsOp, val)
case v3.FilterOperatorLike, v3.FilterOperatorNotLike:
// this is required as clickhouseFormattedValue add's quotes to the string
escapedStringValue := utils.QuoteEscapedString(lowerValue)
return fmt.Sprintf("%s %s '%s'", lowerSearchKey, logsOp, escapedStringValue)
var val string
if !isEscaped {
val = utils.QuoteEscapedString(lowerValue)
} else {
val = lowerValue
}
return fmt.Sprintf("%s %s '%s'", lowerSearchKey, logsOp, val)
default:
return fmt.Sprintf("%s %s %s", searchKey, logsOp, chFmtVal)
}
@@ -63,7 +74,7 @@ func buildResourceFilter(logsOp string, key string, op v3.FilterOperator, value
// buildIndexFilterForInOperator builds a clickhouse filter string for in operator
// example:= x in a,b,c = (labels like '%"x"%"a"%' or labels like '%"x":"b"%' or labels like '%"x"="c"%')
// example:= x nin a,b,c = (labels nlike '%"x"%"a"%' AND labels nlike '%"x"="b"' AND labels nlike '%"x"="c"%')
func buildIndexFilterForInOperator(key string, op v3.FilterOperator, value interface{}) string {
func buildIndexFilterForInOperator(key string, op v3.FilterOperator, value interface{}, isEscaped bool) string {
conditions := []string{}
separator := " OR "
sqlOp := "like"
@@ -92,8 +103,18 @@ func buildIndexFilterForInOperator(key string, op v3.FilterOperator, value inter
// if there are no values to filter on, return an empty string
if len(values) > 0 {
for _, v := range values {
value := utils.QuoteEscapedStringForContains(v, true)
conditions = append(conditions, fmt.Sprintf("labels %s '%%\"%s\":\"%s\"%%'", sqlOp, key, value))
var val string
if !isEscaped {
val = utils.QuoteEscapedString(v)
} else {
val = v
}
// we also want to treat %, _ as literals for contains
val = utils.EscapedStringForContains(val, true)
conditions = append(conditions, fmt.Sprintf("labels %s '%%\"%s\":\"%s\"%%'", sqlOp, key, val))
}
return "(" + strings.Join(conditions, separator) + ")"
}
@@ -107,10 +128,18 @@ func buildIndexFilterForInOperator(key string, op v3.FilterOperator, value inter
// for like/contains we will use lower index
// we can use lower index for =, in etc but it's difficult to do it for !=, NIN etc
// if as x != "ABC" we cannot predict something like "not lower(labels) like '%%x%%abc%%'". It has it be "not lower(labels) like '%%x%%ABC%%'"
func buildResourceIndexFilter(key string, op v3.FilterOperator, value interface{}) string {
func buildResourceIndexFilter(key string, op v3.FilterOperator, value interface{}, isEscaped bool) string {
// not using clickhouseFormattedValue as we don't wan't the quotes
strVal := fmt.Sprintf("%s", value)
fmtValEscapedForContains := utils.QuoteEscapedStringForContains(strVal, true)
var fmtValEscapedForContains string
if !isEscaped {
fmtValEscapedForContains = utils.QuoteEscapedString(strVal)
} else {
fmtValEscapedForContains = strVal
}
fmtValEscapedForContains = utils.EscapedStringForContains(fmtValEscapedForContains, true)
fmtValEscapedForContainsLower := strings.ToLower(fmtValEscapedForContains)
fmtValEscapedLower := strings.ToLower(utils.QuoteEscapedString(strVal))
@@ -132,7 +161,7 @@ func buildResourceIndexFilter(key string, op v3.FilterOperator, value interface{
// don't try to do anything for regex.
return ""
case v3.FilterOperatorIn, v3.FilterOperatorNotIn:
return buildIndexFilterForInOperator(key, op, value)
return buildIndexFilterForInOperator(key, op, value, isEscaped)
default:
return fmt.Sprintf("labels like '%%%s%%'", key)
}
@@ -140,7 +169,7 @@ func buildResourceIndexFilter(key string, op v3.FilterOperator, value interface{
// buildResourceFiltersFromFilterItems builds a list of clickhouse filter strings for resource labels from a FilterSet.
// It skips any filter items that are not resource attributes and checks that the operator is supported and the data type is correct.
func buildResourceFiltersFromFilterItems(fs *v3.FilterSet) ([]string, error) {
func buildResourceFiltersFromFilterItems(fs *v3.FilterSet, isEscaped bool) ([]string, error) {
var conditions []string
if fs == nil || len(fs.Items) == 0 {
return nil, nil
@@ -175,11 +204,11 @@ func buildResourceFiltersFromFilterItems(fs *v3.FilterSet) ([]string, error) {
if logsOp, ok := resourceLogOperators[op]; ok {
// the filter
if resourceFilter := buildResourceFilter(logsOp, keyName, op, value); resourceFilter != "" {
if resourceFilter := buildResourceFilter(logsOp, keyName, op, value, isEscaped); resourceFilter != "" {
conditions = append(conditions, resourceFilter)
}
// the additional filter for better usage of the index
if resourceIndexFilter := buildResourceIndexFilter(keyName, op, value); resourceIndexFilter != "" {
if resourceIndexFilter := buildResourceIndexFilter(keyName, op, value, isEscaped); resourceIndexFilter != "" {
conditions = append(conditions, resourceIndexFilter)
}
} else {
@@ -211,12 +240,12 @@ func buildResourceFiltersFromAggregateAttribute(aggregateAttribute v3.AttributeK
return ""
}
func BuildResourceSubQuery(dbName, tableName string, bucketStart, bucketEnd int64, fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey, isLiveTail bool) (string, error) {
func BuildResourceSubQuery(dbName, tableName string, bucketStart, bucketEnd int64, fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey, isLiveTail bool, isEscaped bool) (string, error) {
// BUILD THE WHERE CLAUSE
var conditions []string
// only add the resource attributes to the filters here
rs, err := buildResourceFiltersFromFilterItems(fs)
rs, err := buildResourceFiltersFromFilterItems(fs, isEscaped)
if err != nil {
return "", err
}

View File

@@ -9,10 +9,11 @@ import (
func Test_buildResourceFilter(t *testing.T) {
type args struct {
logsOp string
key string
op v3.FilterOperator
value interface{}
logsOp string
key string
op v3.FilterOperator
value interface{}
isEscaped bool
}
tests := []struct {
name string
@@ -88,7 +89,7 @@ func Test_buildResourceFilter(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := buildResourceFilter(tt.args.logsOp, tt.args.key, tt.args.op, tt.args.value); got != tt.want {
if got := buildResourceFilter(tt.args.logsOp, tt.args.key, tt.args.op, tt.args.value, tt.args.isEscaped); got != tt.want {
t.Errorf("buildResourceFilter() = %v, want %v", got, tt.want)
}
})
@@ -97,9 +98,10 @@ func Test_buildResourceFilter(t *testing.T) {
func Test_buildIndexFilterForInOperator(t *testing.T) {
type args struct {
key string
op v3.FilterOperator
value interface{}
key string
op v3.FilterOperator
value interface{}
isEscaped bool
}
tests := []struct {
name string
@@ -142,10 +144,20 @@ func Test_buildIndexFilterForInOperator(t *testing.T) {
},
want: `(labels not like '%"service.name":"application\'\\\\"\_s"%')`,
},
{
name: "test nin string with escaped quotes",
args: args{
key: "service.name",
op: v3.FilterOperatorNotIn,
value: `application\'"_s`,
isEscaped: true,
},
want: `(labels not like '%"service.name":"application\'\\\\"\_s"%')`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := buildIndexFilterForInOperator(tt.args.key, tt.args.op, tt.args.value); got != tt.want {
if got := buildIndexFilterForInOperator(tt.args.key, tt.args.op, tt.args.value, tt.args.isEscaped); got != tt.want {
t.Errorf("buildIndexFilterForInOperator() = %v, want %v", got, tt.want)
}
})
@@ -154,9 +166,10 @@ func Test_buildIndexFilterForInOperator(t *testing.T) {
func Test_buildResourceIndexFilter(t *testing.T) {
type args struct {
key string
op v3.FilterOperator
value interface{}
key string
op v3.FilterOperator
value interface{}
isEscaped bool
}
tests := []struct {
name string
@@ -235,10 +248,20 @@ func Test_buildResourceIndexFilter(t *testing.T) {
},
want: `labels like '%service.name%Application\\\\"%'`,
},
{
name: "test eq with escaped quotes",
args: args{
key: "service.name",
op: v3.FilterOperatorEqual,
value: `App\\lication"`,
isEscaped: true,
},
want: `labels like '%service.name%App\\lication\\\\"%'`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := buildResourceIndexFilter(tt.args.key, tt.args.op, tt.args.value); got != tt.want {
if got := buildResourceIndexFilter(tt.args.key, tt.args.op, tt.args.value, tt.args.isEscaped); got != tt.want {
t.Errorf("buildResourceIndexFilter() = %v, want %v", got, tt.want)
}
})
@@ -247,7 +270,8 @@ func Test_buildResourceIndexFilter(t *testing.T) {
func Test_buildResourceFiltersFromFilterItems(t *testing.T) {
type args struct {
fs *v3.FilterSet
fs *v3.FilterSet
isEscaped bool
}
tests := []struct {
name string
@@ -335,7 +359,7 @@ func Test_buildResourceFiltersFromFilterItems(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildResourceFiltersFromFilterItems(tt.args.fs)
got, err := buildResourceFiltersFromFilterItems(tt.args.fs, tt.args.isEscaped)
if (err != nil) != tt.wantErr {
t.Errorf("buildResourceFiltersFromFilterItems() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -439,6 +463,7 @@ func Test_buildResourceSubQuery(t *testing.T) {
fs *v3.FilterSet
groupBy []v3.AttributeKey
aggregateAttribute v3.AttributeKey
isEscaped bool
}
tests := []struct {
name string
@@ -497,7 +522,7 @@ func Test_buildResourceSubQuery(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := BuildResourceSubQuery("signoz_logs", "distributed_logs_v2_resource", tt.args.bucketStart, tt.args.bucketEnd, tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute, false)
got, err := BuildResourceSubQuery("signoz_logs", "distributed_logs_v2_resource", tt.args.bucketStart, tt.args.bucketEnd, tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute, false, tt.args.isEscaped)
if (err != nil) != tt.wantErr {
t.Errorf("buildResourceSubQuery() error = %v, wantErr %v", err, tt.wantErr)
return

View File

@@ -162,7 +162,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
}
}
if val != nil {
fmtVal = utils.ClickHouseFormattedValue(val)
fmtVal = utils.ClickHouseFormattedValue(val, false)
}
if operator, ok := tracesOperatorMappingV3[item.Operator]; ok {
switch item.Operator {
@@ -459,7 +459,7 @@ func Having(items []v3.Having) string {
// aggregate something and filter on that aggregate
var having []string
for _, item := range items {
having = append(having, fmt.Sprintf("value %s %s", item.Operator, utils.ClickHouseFormattedValue(item.Value)))
having = append(having, fmt.Sprintf("value %s %s", item.Operator, utils.ClickHouseFormattedValue(item.Value, false)))
}
return strings.Join(having, " AND ")
}

View File

@@ -87,7 +87,7 @@ func existsSubQueryForFixedColumn(key v3.AttributeKey, op v3.FilterOperator) (st
}
}
func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
func buildTracesFilterQuery(fs *v3.FilterSet, isEscaped bool) (string, error) {
var conditions []string
if fs != nil && len(fs.Items) != 0 {
@@ -111,13 +111,21 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
}
}
if val != nil {
fmtVal = utils.ClickHouseFormattedValue(val)
fmtVal = utils.ClickHouseFormattedValue(val, isEscaped)
}
if operator, ok := tracesOperatorMappingV3[item.Operator]; ok {
switch item.Operator {
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
// we also want to treat %, _ as literals for contains
val := utils.QuoteEscapedStringForContains(fmt.Sprintf("%s", item.Value), false)
var val string
if !isEscaped {
val = utils.QuoteEscapedString(fmt.Sprintf("%s", item.Value))
} else {
val = fmt.Sprintf("%s", item.Value)
}
// we want to treat %, _ as literals for contains
val = utils.EscapedStringForContains(val, false)
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, operator, val))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal))
@@ -148,7 +156,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
return queryString, nil
}
func handleEmptyValuesInGroupBy(groupBy []v3.AttributeKey) (string, error) {
func handleEmptyValuesInGroupBy(groupBy []v3.AttributeKey, isEscaped bool) (string, error) {
// TODO(nitya): in future when we support user based mat column handle them
// skipping now as we don't support creating them
filterItems := []v3.FilterItem{}
@@ -167,7 +175,7 @@ func handleEmptyValuesInGroupBy(groupBy []v3.AttributeKey) (string, error) {
Operator: "AND",
Items: filterItems,
}
return buildTracesFilterQuery(&filterSet)
return buildTracesFilterQuery(&filterSet, isEscaped)
}
return "", nil
}
@@ -248,7 +256,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.
timeFilter := fmt.Sprintf("(timestamp >= '%d' AND timestamp <= '%d') AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)", tracesStart, tracesEnd, bucketStart, bucketEnd)
filterSubQuery, err := buildTracesFilterQuery(mq.Filters)
filterSubQuery, err := buildTracesFilterQuery(mq.Filters, options.ValuesEscaped)
if err != nil {
return "", err
}
@@ -256,7 +264,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.
filterSubQuery = " AND " + filterSubQuery
}
emptyValuesInGroupByFilter, err := handleEmptyValuesInGroupBy(mq.GroupBy)
emptyValuesInGroupByFilter, err := handleEmptyValuesInGroupBy(mq.GroupBy, options.ValuesEscaped)
if err != nil {
return "", err
}
@@ -264,7 +272,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.
filterSubQuery = filterSubQuery + " AND " + emptyValuesInGroupByFilter
}
resourceSubQuery, err := resource.BuildResourceSubQuery("signoz_traces", "distributed_traces_v3_resource", bucketStart, bucketEnd, mq.Filters, mq.GroupBy, mq.AggregateAttribute, false)
resourceSubQuery, err := resource.BuildResourceSubQuery("signoz_traces", "distributed_traces_v3_resource", bucketStart, bucketEnd, mq.Filters, mq.GroupBy, mq.AggregateAttribute, false, options.ValuesEscaped)
if err != nil {
return "", err
}

View File

@@ -193,7 +193,8 @@ func Test_getSelectLabels(t *testing.T) {
func Test_buildTracesFilterQuery(t *testing.T) {
type args struct {
fs *v3.FilterSet
fs *v3.FilterSet
isEscaped bool
}
tests := []struct {
name string
@@ -271,10 +272,32 @@ func Test_buildTracesFilterQuery(t *testing.T) {
},
want: "mapContains(attributes_string, 'host') AND mapContains(attributes_number, 'duration') AND NOT mapContains(attributes_bool, 'isDone') AND NOT mapContains(attributes_string, 'host1') AND `attribute_string_path` = '' AND http_url = '' AND `attribute_string_http$$route` = ''",
},
{
name: "Test with isEscaped contains",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: `hello\name_`, Operator: v3.FilterOperatorContains},
}},
isEscaped: true,
},
want: `name ILIKE '%hello\name\_%'`,
wantErr: false,
},
{
name: "Test with isEscaped eq",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: `hello\name_`, Operator: v3.FilterOperatorEqual},
}},
isEscaped: true,
},
want: `name = 'hello\name_'`,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildTracesFilterQuery(tt.args.fs)
got, err := buildTracesFilterQuery(tt.args.fs, tt.args.isEscaped)
if (err != nil) != tt.wantErr {
t.Errorf("buildTracesFilterQuery() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -315,7 +338,7 @@ func Test_handleEmptyValuesInGroupBy(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := handleEmptyValuesInGroupBy(tt.args.groupBy)
got, err := handleEmptyValuesInGroupBy(tt.args.groupBy, false)
if (err != nil) != tt.wantErr {
t.Errorf("handleEmptyValuesInGroupBy() error = %v, wantErr %v", err, tt.wantErr)
return

View File

@@ -420,6 +420,7 @@ type FilterAttributeValueResponse struct {
}
type QueryRangeParamsV3 struct {
ValuesEscaped bool `json:"valuesEscaped"`
Start int64 `json:"start"`
End int64 `json:"end"`
Step int64 `json:"step"` // step is in seconds; used for prometheus queries
@@ -1475,4 +1476,5 @@ type URLShareableOptions struct {
type QBOptions struct {
GraphLimitQtype string
IsLivetailQuery bool
ValuesEscaped bool
}

View File

@@ -28,7 +28,7 @@ func BuildFilterConditions(fs *v3.FilterSet, skipKey string) ([]string, error) {
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
toFormat = fmt.Sprintf("%%%s%%", toFormat)
}
fmtVal := ClickHouseFormattedValue(toFormat)
fmtVal := ClickHouseFormattedValue(toFormat, false)
// Determine if the key is a JSON key or a normal column
isJSONKey := false

View File

@@ -159,10 +159,7 @@ func QuoteEscapedString(str string) string {
return str
}
func QuoteEscapedStringForContains(str string, isIndex bool) string {
// https: //clickhouse.com/docs/en/sql-reference/functions/string-search-functions#like
str = QuoteEscapedString(str)
func EscapedStringForContains(str string, isIndex bool) string {
// we are adding this because if a string contains quote `"` it will be stored as \" in clickhouse
// to query that using like our query should be \\\\"
if isIndex {
@@ -177,7 +174,7 @@ func QuoteEscapedStringForContains(str string, isIndex bool) string {
}
// ClickHouseFormattedValue formats the value to be used in clickhouse query
func ClickHouseFormattedValue(v interface{}) string {
func ClickHouseFormattedValue(v interface{}, isEscaped bool) string {
// if it's pointer convert it to a value
v = getPointerValue(v)
@@ -187,7 +184,11 @@ func ClickHouseFormattedValue(v interface{}) string {
case float32, float64:
return fmt.Sprintf("%f", x)
case string:
return fmt.Sprintf("'%s'", QuoteEscapedString(x))
if !isEscaped {
return fmt.Sprintf("'%s'", QuoteEscapedString(x))
} else {
return fmt.Sprintf("'%s'", x)
}
case bool:
return fmt.Sprintf("%v", x)
@@ -199,7 +200,11 @@ func ClickHouseFormattedValue(v interface{}) string {
case string:
str := "["
for idx, sVal := range x {
str += fmt.Sprintf("'%s'", QuoteEscapedString(sVal.(string)))
if !isEscaped {
str += fmt.Sprintf("'%s'", QuoteEscapedString(sVal.(string)))
} else {
str += fmt.Sprintf("'%s'", sVal.(string))
}
if idx != len(x)-1 {
str += ","
}
@@ -218,7 +223,11 @@ func ClickHouseFormattedValue(v interface{}) string {
}
str := "["
for idx, sVal := range x {
str += fmt.Sprintf("'%s'", QuoteEscapedString(sVal))
if !isEscaped {
str += fmt.Sprintf("'%s'", QuoteEscapedString(sVal))
} else {
str += fmt.Sprintf("'%s'", sVal)
}
if idx != len(x)-1 {
str += ","
}
@@ -234,13 +243,13 @@ func ClickHouseFormattedValue(v interface{}) string {
func ClickHouseFormattedMetricNames(v interface{}) string {
if name, ok := v.(string); ok {
if newName, ok := metrics.MetricsUnderTransition[name]; ok {
return ClickHouseFormattedValue([]interface{}{name, newName})
return ClickHouseFormattedValue([]interface{}{name, newName}, false)
} else {
return ClickHouseFormattedValue([]interface{}{name})
return ClickHouseFormattedValue([]interface{}{name}, false)
}
}
return ClickHouseFormattedValue(v)
return ClickHouseFormattedValue(v, false)
}
func AddBackTickToFormatTag(str string) string {

View File

@@ -317,9 +317,10 @@ var oneString = "1"
var trueBool = true
var testClickHouseFormattedValueData = []struct {
name string
value interface{}
want interface{}
name string
value interface{}
want interface{}
isEscaped bool
}{
{
name: "int",
@@ -394,12 +395,21 @@ var testClickHouseFormattedValueData = []struct {
},
want: "['test\\'1','test\\'2']",
},
{
name: "[]interface{} with string with single quote escaped",
value: []interface{}{
`test\\'1`,
`test\\'2`,
},
isEscaped: true,
want: `['test\\'1','test\\'2']`,
},
}
func TestClickHouseFormattedValue(t *testing.T) {
for _, tt := range testClickHouseFormattedValueData {
t.Run(tt.name, func(t *testing.T) {
got := ClickHouseFormattedValue(tt.value)
got := ClickHouseFormattedValue(tt.value, tt.isEscaped)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ClickHouseFormattedValue() = %v, want %v", got, tt.want)
}

View File

@@ -1,149 +0,0 @@
package telemetrymetadata
import (
"context"
"fmt"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
var (
attributeMetadataColumns = map[string]*schema.Column{
"resource_attributes": {Name: "resource_attributes", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}},
"attributes": {Name: "attributes", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}},
}
)
type conditionBuilder struct {
}
func NewConditionBuilder() qbtypes.ConditionBuilder {
return &conditionBuilder{}
}
func (c *conditionBuilder) GetColumn(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
return attributeMetadataColumns["resource_attributes"], nil
case telemetrytypes.FieldContextAttribute:
return attributeMetadataColumns["attributes"], nil
}
return nil, qbtypes.ErrColumnNotFound
}
func (c *conditionBuilder) GetTableFieldName(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
column, err := c.GetColumn(ctx, key)
if err != nil {
return "", err
}
switch column.Type {
case schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}:
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
}
return column.Name, nil
}
func (c *conditionBuilder) GetCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
column, err := c.GetColumn(ctx, key)
if err != nil {
// if we don't have a column, we can't build a condition for related values
return "", nil
}
tblFieldName, err := c.GetTableFieldName(ctx, key)
if err != nil {
// if we don't have a table field name, we can't build a condition for related values
return "", nil
}
if key.FieldDataType != telemetrytypes.FieldDataTypeString {
// if the field data type is not string, we can't build a condition for related values
return "", nil
}
// key must exists to apply main filter
containsExp := fmt.Sprintf("mapContains(%s, %s)", column.Name, sb.Var(key.Name))
// regular operators
switch operator {
// regular operators
case qbtypes.FilterOperatorEqual:
return sb.And(containsExp, sb.E(tblFieldName, value)), nil
case qbtypes.FilterOperatorNotEqual:
return sb.And(containsExp, sb.NE(tblFieldName, value)), nil
// like and not like
case qbtypes.FilterOperatorLike:
return sb.And(containsExp, sb.Like(tblFieldName, value)), nil
case qbtypes.FilterOperatorNotLike:
return sb.And(containsExp, sb.NotLike(tblFieldName, value)), nil
case qbtypes.FilterOperatorILike:
return sb.And(containsExp, sb.ILike(tblFieldName, value)), nil
case qbtypes.FilterOperatorNotILike:
return sb.And(containsExp, sb.NotILike(tblFieldName, value)), nil
case qbtypes.FilterOperatorContains:
return sb.And(containsExp, sb.ILike(tblFieldName, fmt.Sprintf("%%%s%%", value))), nil
case qbtypes.FilterOperatorNotContains:
return sb.And(containsExp, sb.NotILike(tblFieldName, fmt.Sprintf("%%%s%%", value))), nil
case qbtypes.FilterOperatorRegexp:
exp := fmt.Sprintf(`match(%s, %s)`, tblFieldName, sb.Var(value))
return sb.And(containsExp, exp), nil
case qbtypes.FilterOperatorNotRegexp:
exp := fmt.Sprintf(`not match(%s, %s)`, tblFieldName, sb.Var(value))
return sb.And(containsExp, exp), nil
// in and not in
case qbtypes.FilterOperatorIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
return sb.And(containsExp, sb.In(tblFieldName, values...)), nil
case qbtypes.FilterOperatorNotIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
return sb.And(containsExp, sb.NotIn(tblFieldName, values...)), nil
// exists and not exists
// in the query builder, `exists` and `not exists` are used for
// key membership checks, so depending on the column type, the condition changes
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
switch column.Type {
case schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}:
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
if operator == qbtypes.FilterOperatorExists {
return sb.E(leftOperand, true), nil
} else {
return sb.NE(leftOperand, true), nil
}
}
}
return "", nil
}

View File

@@ -1,272 +0,0 @@
package telemetrymetadata
import (
"context"
"testing"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetColumn(t *testing.T) {
ctx := context.Background()
conditionBuilder := NewConditionBuilder()
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
expectedCol *schema.Column
expectedError error
}{
{
name: "Resource field",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
},
expectedCol: attributeMetadataColumns["resource_attributes"],
expectedError: nil,
},
{
name: "Scope field - scope name",
key: telemetrytypes.TelemetryFieldKey{
Name: "name",
FieldContext: telemetrytypes.FieldContextScope,
},
expectedCol: nil,
expectedError: qbtypes.ErrColumnNotFound,
},
{
name: "Scope field - scope.name",
key: telemetrytypes.TelemetryFieldKey{
Name: "scope.name",
FieldContext: telemetrytypes.FieldContextScope,
},
expectedCol: nil,
expectedError: qbtypes.ErrColumnNotFound,
},
{
name: "Scope field - scope_name",
key: telemetrytypes.TelemetryFieldKey{
Name: "scope_name",
FieldContext: telemetrytypes.FieldContextScope,
},
expectedCol: nil,
expectedError: qbtypes.ErrColumnNotFound,
},
{
name: "Scope field - version",
key: telemetrytypes.TelemetryFieldKey{
Name: "version",
FieldContext: telemetrytypes.FieldContextScope,
},
expectedCol: nil,
expectedError: qbtypes.ErrColumnNotFound,
},
{
name: "Scope field - other scope field",
key: telemetrytypes.TelemetryFieldKey{
Name: "custom.scope.field",
FieldContext: telemetrytypes.FieldContextScope,
},
expectedCol: nil,
expectedError: qbtypes.ErrColumnNotFound,
},
{
name: "Attribute field - string type",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
expectedCol: attributeMetadataColumns["attributes"],
expectedError: nil,
},
{
name: "Attribute field - number type",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.size",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
expectedCol: attributeMetadataColumns["attributes"],
expectedError: nil,
},
{
name: "Attribute field - int64 type",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.duration",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeInt64,
},
expectedCol: attributeMetadataColumns["attributes"],
expectedError: nil,
},
{
name: "Attribute field - float64 type",
key: telemetrytypes.TelemetryFieldKey{
Name: "cpu.utilization",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeFloat64,
},
expectedCol: attributeMetadataColumns["attributes"],
expectedError: nil,
},
{
name: "Log field - nonexistent",
key: telemetrytypes.TelemetryFieldKey{
Name: "nonexistent_field",
FieldContext: telemetrytypes.FieldContextLog,
},
expectedCol: nil,
expectedError: qbtypes.ErrColumnNotFound,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
col, err := conditionBuilder.GetColumn(ctx, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)
} else {
require.NoError(t, err)
assert.Equal(t, tc.expectedCol, col)
}
})
}
}
func TestGetFieldKeyName(t *testing.T) {
ctx := context.Background()
conditionBuilder := &conditionBuilder{}
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
expectedResult string
expectedError error
}{
{
name: "Map column type - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
expectedResult: "attributes['user.id']",
expectedError: nil,
},
{
name: "Map column type - number attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.size",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
expectedResult: "attributes['request.size']",
expectedError: nil,
},
{
name: "Map column type - bool attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.success",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeBool,
},
expectedResult: "attributes['request.success']",
expectedError: nil,
},
{
name: "Map column type - resource attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
},
expectedResult: "resource_attributes['service.name']",
expectedError: nil,
},
{
name: "Non-existent column",
key: telemetrytypes.TelemetryFieldKey{
Name: "nonexistent_field",
FieldContext: telemetrytypes.FieldContextLog,
},
expectedResult: "",
expectedError: qbtypes.ErrColumnNotFound,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result, err := conditionBuilder.GetTableFieldName(ctx, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)
} else {
require.NoError(t, err)
assert.Equal(t, tc.expectedResult, result)
}
})
}
}
func TestGetCondition(t *testing.T) {
ctx := context.Background()
conditionBuilder := NewConditionBuilder()
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
operator qbtypes.FilterOperator
value any
expectedSQL string
expectedError error
}{
{
name: "ILike operator - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorILike,
value: "%admin%",
expectedSQL: "WHERE (mapContains(attributes, ?) AND LOWER(attributes['user.id']) LIKE LOWER(?))",
expectedError: nil,
},
{
name: "Not ILike operator - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorNotILike,
value: "%admin%",
expectedSQL: "WHERE (mapContains(attributes, ?) AND LOWER(attributes['user.id']) NOT LIKE LOWER(?))",
expectedError: nil,
},
}
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
t.Run(tc.name, func(t *testing.T) {
cond, err := conditionBuilder.GetCondition(ctx, &tc.key, tc.operator, tc.value, sb)
sb.Where(cond)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)
} else {
require.NoError(t, err)
sql, _ := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
assert.Contains(t, sql, tc.expectedSQL)
}
})
}
}

View File

@@ -1,691 +0,0 @@
package telemetrymetadata
import (
"context"
"fmt"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
"go.uber.org/zap"
)
var (
ErrFailedToGetTracesKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get traces keys")
ErrFailedToGetLogsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get logs keys")
ErrFailedToGetTblStatement = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get tbl statement")
ErrFailedToGetMetricsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get metrics keys")
ErrFailedToGetRelatedValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get related values")
)
type telemetryMetaStore struct {
telemetrystore telemetrystore.TelemetryStore
tracesDBName string
tracesFieldsTblName string
indexV3TblName string
metricsDBName string
metricsFieldsTblName string
timeseries1WTblName string
logsDBName string
logsFieldsTblName string
logsV2TblName string
relatedMetadataDBName string
relatedMetadataTblName string
conditionBuilder qbtypes.ConditionBuilder
}
func NewTelemetryMetaStore(
telemetrystore telemetrystore.TelemetryStore,
tracesDBName string,
tracesFieldsTblName string,
indexV3TblName string,
metricsDBName string,
metricsFieldsTblName string,
timeseries1WTblName string,
logsDBName string,
logsV2TblName string,
logsFieldsTblName string,
relatedMetadataDBName string,
relatedMetadataTblName string,
) (telemetrytypes.MetadataStore, error) {
return &telemetryMetaStore{
telemetrystore: telemetrystore,
tracesDBName: tracesDBName,
tracesFieldsTblName: tracesFieldsTblName,
indexV3TblName: indexV3TblName,
metricsDBName: metricsDBName,
metricsFieldsTblName: metricsFieldsTblName,
timeseries1WTblName: timeseries1WTblName,
logsDBName: logsDBName,
logsV2TblName: logsV2TblName,
logsFieldsTblName: logsFieldsTblName,
relatedMetadataDBName: relatedMetadataDBName,
relatedMetadataTblName: relatedMetadataTblName,
conditionBuilder: NewConditionBuilder(),
}, nil
}
// tracesTblStatementToFieldKeys returns materialised attribute/resource/scope keys from the traces table
func (t *telemetryMetaStore) tracesTblStatementToFieldKeys(ctx context.Context) ([]*telemetrytypes.TelemetryFieldKey, error) {
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", t.tracesDBName, t.indexV3TblName)
statements := []telemetrytypes.ShowCreateTableStatement{}
err := t.telemetrystore.ClickhouseDB().Select(ctx, &statements, query)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTblStatement.Error())
}
return ExtractFieldKeysFromTblStatement(statements[0].Statement)
}
// getTracesKeys returns the keys from the spans that match the field selection criteria
func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) {
if len(fieldKeySelectors) == 0 {
return nil, nil
}
// pre-fetch the materialised keys from the traces table
matKeys, err := t.tracesTblStatementToFieldKeys(ctx)
if err != nil {
return nil, err
}
mapOfKeys := make(map[string]*telemetrytypes.TelemetryFieldKey)
for _, key := range matKeys {
mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
}
sb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", `
CASE
WHEN tag_type = 'spanfield' THEN 1
WHEN tag_type = 'resource' THEN 2
WHEN tag_type = 'scope' THEN 3
WHEN tag_type = 'tag' THEN 4
ELSE 5
END as priority`).From(t.tracesDBName + "." + t.tracesFieldsTblName)
var limit int
conds := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
if fieldKeySelector.StartUnixMilli != 0 {
conds = append(conds, sb.GE("unix_milli", fieldKeySelector.StartUnixMilli))
}
if fieldKeySelector.EndUnixMilli != 0 {
conds = append(conds, sb.LE("unix_milli", fieldKeySelector.EndUnixMilli))
}
// key part of the selector
fieldKeyConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_key", fieldKeySelector.Name))
} else {
fieldKeyConds = append(fieldKeyConds, sb.Like("tag_key", "%"+fieldKeySelector.Name+"%"))
}
// now look at the field context
if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_type", fieldKeySelector.FieldContext.TagType()))
}
// now look at the field data type
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_data_type", fieldKeySelector.FieldDataType.TagDataType()))
}
conds = append(conds, sb.And(fieldKeyConds...))
limit += fieldKeySelector.Limit
}
sb.Where(sb.Or(conds...))
if limit == 0 {
limit = 1000
}
mainSb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", "max(priority) as priority")
mainSb.From(mainSb.BuilderAs(sb, "sub_query"))
mainSb.GroupBy("tag_key", "tag_type", "tag_data_type")
mainSb.OrderBy("priority")
mainSb.Limit(limit)
query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTracesKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
for rows.Next() {
var name string
var fieldContext telemetrytypes.FieldContext
var fieldDataType telemetrytypes.FieldDataType
var priority uint8
err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTracesKeys.Error())
}
key, ok := mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()]
// if there is no materialised column, create a key with the field context and data type
if !ok {
key = &telemetrytypes.TelemetryFieldKey{
Name: name,
FieldContext: fieldContext,
FieldDataType: fieldDataType,
}
}
keys = append(keys, key)
}
if rows.Err() != nil {
return nil, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTracesKeys.Error())
}
return keys, nil
}
// logsTblStatementToFieldKeys returns materialised attribute/resource/scope keys from the logs table
func (t *telemetryMetaStore) logsTblStatementToFieldKeys(ctx context.Context) ([]*telemetrytypes.TelemetryFieldKey, error) {
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", t.logsDBName, t.logsV2TblName)
statements := []telemetrytypes.ShowCreateTableStatement{}
err := t.telemetrystore.ClickhouseDB().Select(ctx, &statements, query)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTblStatement.Error())
}
return ExtractFieldKeysFromTblStatement(statements[0].Statement)
}
// getLogsKeys returns the keys from the spans that match the field selection criteria
func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) {
if len(fieldKeySelectors) == 0 {
return nil, nil
}
// pre-fetch the materialised keys from the logs table
matKeys, err := t.logsTblStatementToFieldKeys(ctx)
if err != nil {
return nil, err
}
mapOfKeys := make(map[string]*telemetrytypes.TelemetryFieldKey)
for _, key := range matKeys {
mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
}
sb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", `
CASE
WHEN tag_type = 'logfield' THEN 1
WHEN tag_type = 'resource' THEN 2
WHEN tag_type = 'scope' THEN 3
WHEN tag_type = 'tag' THEN 4
ELSE 5
END as priority`).From(t.logsDBName + "." + t.logsFieldsTblName)
var limit int
conds := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
if fieldKeySelector.StartUnixMilli != 0 {
conds = append(conds, sb.GE("unix_milli", fieldKeySelector.StartUnixMilli))
}
if fieldKeySelector.EndUnixMilli != 0 {
conds = append(conds, sb.LE("unix_milli", fieldKeySelector.EndUnixMilli))
}
// key part of the selector
fieldKeyConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_key", fieldKeySelector.Name))
} else {
fieldKeyConds = append(fieldKeyConds, sb.Like("tag_key", "%"+fieldKeySelector.Name+"%"))
}
// now look at the field context
if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_type", fieldKeySelector.FieldContext.TagType()))
}
// now look at the field data type
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("tag_data_type", fieldKeySelector.FieldDataType.TagDataType()))
}
conds = append(conds, sb.And(fieldKeyConds...))
limit += fieldKeySelector.Limit
}
sb.Where(sb.Or(conds...))
if limit == 0 {
limit = 1000
}
mainSb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", "max(priority) as priority")
mainSb.From(mainSb.BuilderAs(sb, "sub_query"))
mainSb.GroupBy("tag_key", "tag_type", "tag_data_type")
mainSb.OrderBy("priority")
mainSb.Limit(limit)
query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
for rows.Next() {
var name string
var fieldContext telemetrytypes.FieldContext
var fieldDataType telemetrytypes.FieldDataType
var priority uint8
err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
key, ok := mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()]
// if there is no materialised column, create a key with the field context and data type
if !ok {
key = &telemetrytypes.TelemetryFieldKey{
Name: name,
FieldContext: fieldContext,
FieldDataType: fieldDataType,
}
}
keys = append(keys, key)
}
if rows.Err() != nil {
return nil, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
return keys, nil
}
// getMetricsKeys returns the keys from the metrics that match the field selection criteria
// TODO(srikanthccv): update the implementation after the dot metrics migration is done
func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) {
if len(fieldKeySelectors) == 0 {
return nil, nil
}
var whereClause, innerWhereClause string
var limit int
args := []any{}
for _, fieldKeySelector := range fieldKeySelectors {
if fieldKeySelector.MetricContext != nil {
innerWhereClause += "metric_name IN ? AND"
args = append(args, fieldKeySelector.MetricContext.MetricName)
}
}
innerWhereClause += " __normalized = true"
for idx, fieldKeySelector := range fieldKeySelectors {
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
whereClause += "(distinctTagKey = ? AND distinctTagKey NOT LIKE '\\_\\_%%')"
args = append(args, fieldKeySelector.Name)
} else {
whereClause += "(distinctTagKey ILIKE ? AND distinctTagKey NOT LIKE '\\_\\_%%')"
args = append(args, fmt.Sprintf("%%%s%%", fieldKeySelector.Name))
}
if idx != len(fieldKeySelectors)-1 {
whereClause += " OR "
}
limit += fieldKeySelector.Limit
}
args = append(args, limit)
query := fmt.Sprintf(`
SELECT
arrayJoin(tagKeys) AS distinctTagKey
FROM (
SELECT JSONExtractKeys(labels) AS tagKeys
FROM %s.%s
WHERE `+innerWhereClause+`
GROUP BY tagKeys
)
WHERE `+whereClause+`
GROUP BY distinctTagKey
LIMIT ?
`, t.metricsDBName, t.timeseries1WTblName)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
}
key := &telemetrytypes.TelemetryFieldKey{
Name: name,
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
}
keys = append(keys, key)
}
if rows.Err() != nil {
return nil, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
}
return keys, nil
}
func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, error) {
var keys []*telemetrytypes.TelemetryFieldKey
var err error
switch fieldKeySelector.Signal {
case telemetrytypes.SignalTraces:
keys, err = t.getTracesKeys(ctx, []*telemetrytypes.FieldKeySelector{fieldKeySelector})
case telemetrytypes.SignalLogs:
keys, err = t.getLogsKeys(ctx, []*telemetrytypes.FieldKeySelector{fieldKeySelector})
case telemetrytypes.SignalMetrics:
keys, err = t.getMetricsKeys(ctx, []*telemetrytypes.FieldKeySelector{fieldKeySelector})
case telemetrytypes.SignalUnspecified:
// get traces keys
tracesKeys, err := t.getTracesKeys(ctx, []*telemetrytypes.FieldKeySelector{fieldKeySelector})
if err != nil {
return nil, err
}
keys = append(keys, tracesKeys...)
// get logs keys
logsKeys, err := t.getLogsKeys(ctx, []*telemetrytypes.FieldKeySelector{fieldKeySelector})
if err != nil {
return nil, err
}
keys = append(keys, logsKeys...)
// get metrics keys
metricsKeys, err := t.getMetricsKeys(ctx, []*telemetrytypes.FieldKeySelector{fieldKeySelector})
if err != nil {
return nil, err
}
keys = append(keys, metricsKeys...)
}
if err != nil {
return nil, err
}
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
for _, key := range keys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
return mapOfKeys, nil
}
func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, error) {
logsSelectors := []*telemetrytypes.FieldKeySelector{}
tracesSelectors := []*telemetrytypes.FieldKeySelector{}
metricsSelectors := []*telemetrytypes.FieldKeySelector{}
for _, fieldKeySelector := range fieldKeySelectors {
switch fieldKeySelector.Signal {
case telemetrytypes.SignalLogs:
logsSelectors = append(logsSelectors, fieldKeySelector)
case telemetrytypes.SignalTraces:
tracesSelectors = append(tracesSelectors, fieldKeySelector)
case telemetrytypes.SignalMetrics:
metricsSelectors = append(metricsSelectors, fieldKeySelector)
case telemetrytypes.SignalUnspecified:
logsSelectors = append(logsSelectors, fieldKeySelector)
tracesSelectors = append(tracesSelectors, fieldKeySelector)
metricsSelectors = append(metricsSelectors, fieldKeySelector)
}
}
logsKeys, err := t.getLogsKeys(ctx, logsSelectors)
if err != nil {
return nil, err
}
tracesKeys, err := t.getTracesKeys(ctx, tracesSelectors)
if err != nil {
return nil, err
}
metricsKeys, err := t.getMetricsKeys(ctx, metricsSelectors)
if err != nil {
return nil, err
}
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
for _, key := range logsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range tracesKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range metricsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
return mapOfKeys, nil
}
func (t *telemetryMetaStore) GetKey(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) {
keys, err := t.GetKeys(ctx, fieldKeySelector)
if err != nil {
return nil, err
}
return keys[fieldKeySelector.Name], nil
}
func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) ([]string, error) {
args := []any{}
var andConditions []string
andConditions = append(andConditions, `unix_milli >= ?`)
args = append(args, fieldValueSelector.StartUnixMilli)
andConditions = append(andConditions, `unix_milli <= ?`)
args = append(args, fieldValueSelector.EndUnixMilli)
if len(fieldValueSelector.ExistingQuery) != 0 {
// TODO(srikanthccv): add the existing query to the where clause
}
whereClause := strings.Join(andConditions, " AND ")
key := telemetrytypes.TelemetryFieldKey{
Name: fieldValueSelector.Name,
Signal: fieldValueSelector.Signal,
FieldContext: fieldValueSelector.FieldContext,
FieldDataType: fieldValueSelector.FieldDataType,
}
// TODO(srikanthccv): add the select column
selectColumn, _ := t.conditionBuilder.GetTableFieldName(ctx, &key)
args = append(args, fieldValueSelector.Limit)
filterSubQuery := fmt.Sprintf(
"SELECT DISTINCT %s FROM %s.%s WHERE %s LIMIT ?",
selectColumn,
t.relatedMetadataDBName,
t.relatedMetadataTblName,
whereClause,
)
zap.L().Debug("filterSubQuery for related values", zap.String("query", filterSubQuery), zap.Any("args", args))
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, filterSubQuery, args...)
if err != nil {
return nil, ErrFailedToGetRelatedValues
}
defer rows.Close()
var attributeValues []string
for rows.Next() {
var value string
if err := rows.Scan(&value); err != nil {
return nil, ErrFailedToGetRelatedValues
}
if value != "" {
attributeValues = append(attributeValues, value)
}
}
return attributeValues, nil
}
func (t *telemetryMetaStore) GetRelatedValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) ([]string, error) {
return t.getRelatedValues(ctx, fieldValueSelector)
}
func (t *telemetryMetaStore) getSpanFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, error) {
// build the query to get the keys from the spans that match the field selection criteria
var limit int
sb := sqlbuilder.Select("DISTINCT string_value, number_value").From(t.tracesDBName + "." + t.tracesFieldsTblName)
if fieldValueSelector.Name != "" {
sb.Where(sb.E("tag_key", fieldValueSelector.Name))
}
// now look at the field context
if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
sb.Where(sb.E("tag_type", fieldValueSelector.FieldContext.TagType()))
}
// now look at the field data type
if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
sb.Where(sb.E("tag_data_type", fieldValueSelector.FieldDataType.TagDataType()))
}
if fieldValueSelector.Value != "" {
if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeString {
sb.Where(sb.Like("string_value", "%"+fieldValueSelector.Value+"%"))
} else if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeNumber {
sb.Where(sb.IsNotNull("number_value"))
sb.Where(sb.Like("toString(number_value)", "%"+fieldValueSelector.Value+"%"))
}
}
if limit == 0 {
limit = 50
}
sb.Limit(limit)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
defer rows.Close()
values := &telemetrytypes.TelemetryFieldValues{}
seen := make(map[string]bool)
for rows.Next() {
var stringValue string
var numberValue float64
if err := rows.Scan(&stringValue, &numberValue); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
if _, ok := seen[stringValue]; !ok {
values.StringValues = append(values.StringValues, stringValue)
seen[stringValue] = true
}
if _, ok := seen[fmt.Sprintf("%f", numberValue)]; !ok && numberValue != 0 {
values.NumberValues = append(values.NumberValues, numberValue)
seen[fmt.Sprintf("%f", numberValue)] = true
}
}
return values, nil
}
func (t *telemetryMetaStore) getLogFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, error) {
// build the query to get the keys from the spans that match the field selection criteria
var limit int
sb := sqlbuilder.Select("DISTINCT string_value, number_value").From(t.logsDBName + "." + t.logsFieldsTblName)
if fieldValueSelector.Name != "" {
sb.Where(sb.E("tag_key", fieldValueSelector.Name))
}
if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
sb.Where(sb.E("tag_type", fieldValueSelector.FieldContext.TagType()))
}
if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
sb.Where(sb.E("tag_data_type", fieldValueSelector.FieldDataType.TagDataType()))
}
if fieldValueSelector.Value != "" {
if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeString {
sb.Where(sb.Like("string_value", "%"+fieldValueSelector.Value+"%"))
} else if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeNumber {
sb.Where(sb.IsNotNull("number_value"))
sb.Where(sb.Like("toString(number_value)", "%"+fieldValueSelector.Value+"%"))
}
}
if limit == 0 {
limit = 50
}
sb.Limit(limit)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
defer rows.Close()
values := &telemetrytypes.TelemetryFieldValues{}
seen := make(map[string]bool)
for rows.Next() {
var stringValue string
var numberValue float64
if err := rows.Scan(&stringValue, &numberValue); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
if _, ok := seen[stringValue]; !ok {
values.StringValues = append(values.StringValues, stringValue)
seen[stringValue] = true
}
if _, ok := seen[fmt.Sprintf("%f", numberValue)]; !ok && numberValue != 0 {
values.NumberValues = append(values.NumberValues, numberValue)
seen[fmt.Sprintf("%f", numberValue)] = true
}
}
return values, nil
}
func (t *telemetryMetaStore) getMetricFieldValues(_ context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, error) {
// TODO(srikanthccv): implement this. use new tables?
return nil, nil
}
func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, error) {
var values *telemetrytypes.TelemetryFieldValues
var err error
switch fieldValueSelector.Signal {
case telemetrytypes.SignalTraces:
values, err = t.getSpanFieldValues(ctx, fieldValueSelector)
case telemetrytypes.SignalLogs:
values, err = t.getLogFieldValues(ctx, fieldValueSelector)
case telemetrytypes.SignalMetrics:
values, err = t.getMetricFieldValues(ctx, fieldValueSelector)
}
if err != nil {
return nil, err
}
return values, nil
}

View File

@@ -1,86 +0,0 @@
package telemetrymetadata
import (
"context"
"fmt"
"regexp"
"testing"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
)
type regexMatcher struct {
}
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
re, err := regexp.Compile(expectedSQL)
if err != nil {
return err
}
if !re.MatchString(actualSQL) {
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
}
return nil
}
func TestGetKeys(t *testing.T) {
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &regexMatcher{})
mock := mockTelemetryStore.Mock()
metadata, err := NewTelemetryMetaStore(
mockTelemetryStore,
telemetrytraces.DBName,
telemetrytraces.TagAttributesV2TableName,
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.TimeseriesV41weekTableName,
telemetrymetrics.TimeseriesV41weekTableName,
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,
DBName,
AttributesMetadataLocalTableName,
)
if err != nil {
t.Fatalf("Failed to create telemetry metadata store: %v", err)
}
rows := cmock.NewRows([]cmock.ColumnType{
{Name: "statement", Type: "String"},
}, [][]any{{"CREATE TABLE signoz_traces.signoz_index_v3"}})
mock.
ExpectSelect("SHOW CREATE TABLE signoz_traces.distributed_signoz_index_v3").
WillReturnRows(rows)
query := `SELECT.*`
mock.ExpectQuery(query).
WithArgs("%http.method%", telemetrytypes.FieldContextSpan.TagType(), telemetrytypes.FieldDataTypeString.TagDataType(), 10).
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
{Name: "tag_key", Type: "String"},
{Name: "tag_type", Type: "String"},
{Name: "tag_data_type", Type: "String"},
{Name: "priority", Type: "UInt8"},
}, [][]any{{"http.method", "tag", "String", 1}, {"http.method", "tag", "String", 1}}))
keys, err := metadata.GetKeys(context.Background(), &telemetrytypes.FieldKeySelector{
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
Name: "http.method",
Limit: 10,
})
if err != nil {
t.Fatalf("Failed to get keys: %v", err)
}
t.Logf("Keys: %v", keys)
}

View File

@@ -1,132 +0,0 @@
package telemetrymetadata
import (
"strings"
"github.com/AfterShip/clickhouse-sql-parser/parser"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
// TelemetryFieldVisitor is an AST visitor for extracting telemetry fields
type TelemetryFieldVisitor struct {
parser.DefaultASTVisitor
Fields []*telemetrytypes.TelemetryFieldKey
}
func NewTelemetryFieldVisitor() *TelemetryFieldVisitor {
return &TelemetryFieldVisitor{
Fields: make([]*telemetrytypes.TelemetryFieldKey, 0),
}
}
// VisitColumnDef is called when visiting a column definition
func (v *TelemetryFieldVisitor) VisitColumnDef(expr *parser.ColumnDef) error {
// Check if this is a materialized column with DEFAULT expression
if expr.DefaultExpr == nil {
return nil
}
// Parse column name to extract context and data type
columnName := expr.Name.String()
// Remove backticks if present
columnName = strings.TrimPrefix(columnName, "`")
columnName = strings.TrimSuffix(columnName, "`")
// Parse the column name to extract components
parts := strings.Split(columnName, "_")
if len(parts) < 2 {
return nil
}
context := parts[0]
dataType := parts[1]
// Check if this is a valid telemetry column
var fieldContext telemetrytypes.FieldContext
switch context {
case "resource":
fieldContext = telemetrytypes.FieldContextResource
case "scope":
fieldContext = telemetrytypes.FieldContextScope
case "attribute":
fieldContext = telemetrytypes.FieldContextAttribute
default:
return nil // Not a telemetry column
}
// Check and convert data type
var fieldDataType telemetrytypes.FieldDataType
switch dataType {
case "string":
fieldDataType = telemetrytypes.FieldDataTypeString
case "bool":
fieldDataType = telemetrytypes.FieldDataTypeBool
case "int", "int64":
fieldDataType = telemetrytypes.FieldDataTypeFloat64
case "float", "float64":
fieldDataType = telemetrytypes.FieldDataTypeFloat64
case "number":
fieldDataType = telemetrytypes.FieldDataTypeFloat64
default:
return nil // Unknown data type
}
// Extract field name from the DEFAULT expression
// The DEFAULT expression should be something like: resources_string['k8s.cluster.name']
// We need to extract the key inside the square brackets
defaultExprStr := expr.DefaultExpr.String()
// Look for the pattern: map['key']
startIdx := strings.Index(defaultExprStr, "['")
endIdx := strings.Index(defaultExprStr, "']")
if startIdx == -1 || endIdx == -1 || startIdx+2 >= endIdx {
return nil // Invalid DEFAULT expression format
}
fieldName := defaultExprStr[startIdx+2 : endIdx]
// Create and store the TelemetryFieldKey
field := telemetrytypes.TelemetryFieldKey{
Name: fieldName,
FieldContext: fieldContext,
FieldDataType: fieldDataType,
Materialized: true,
}
v.Fields = append(v.Fields, &field)
return nil
}
func ExtractFieldKeysFromTblStatement(statement string) ([]*telemetrytypes.TelemetryFieldKey, error) {
// Parse the CREATE TABLE statement using the ClickHouse parser
p := parser.NewParser(statement)
stmts, err := p.ParseStmts()
if err != nil {
return nil, err
}
// Create a visitor to collect telemetry fields
visitor := NewTelemetryFieldVisitor()
// Visit each statement
for _, stmt := range stmts {
// We're looking for CreateTable statements
createTable, ok := stmt.(*parser.CreateTable)
if !ok {
continue
}
// Visit the table schema to extract column definitions
if createTable.TableSchema != nil {
for _, column := range createTable.TableSchema.Columns {
if err := column.Accept(visitor); err != nil {
return nil, err
}
}
}
}
return visitor.Fields, nil
}

View File

@@ -1,148 +0,0 @@
package telemetrymetadata
import (
"slices"
"testing"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
func TestExtractFieldKeysFromTblStatement(t *testing.T) {
var statement = `CREATE TABLE signoz_logs.logs_v2
(
` + "`ts_bucket_start`" + ` UInt64 CODEC(DoubleDelta, LZ4),
` + "`resource_fingerprint`" + ` String CODEC(ZSTD(1)),
` + "`timestamp`" + ` UInt64 CODEC(DoubleDelta, LZ4),
` + "`observed_timestamp`" + ` UInt64 CODEC(DoubleDelta, LZ4),
` + "`id`" + ` String CODEC(ZSTD(1)),
` + "`trace_id`" + ` String CODEC(ZSTD(1)),
` + "`span_id`" + ` String CODEC(ZSTD(1)),
` + "`trace_flags`" + ` UInt32,
` + "`severity_text`" + ` LowCardinality(String) CODEC(ZSTD(1)),
` + "`severity_number`" + ` UInt8,
` + "`body`" + ` String CODEC(ZSTD(2)),
` + "`attributes_string`" + ` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
` + "`attributes_number`" + ` Map(LowCardinality(String), Float64) CODEC(ZSTD(1)),
` + "`attributes_bool`" + ` Map(LowCardinality(String), Bool) CODEC(ZSTD(1)),
` + "`resources_string`" + ` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
` + "`scope_name`" + ` String CODEC(ZSTD(1)),
` + "`scope_version`" + ` String CODEC(ZSTD(1)),
` + "`scope_string`" + ` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
` + "`attribute_number_input_size`" + ` Int64 DEFAULT attributes_number['input_size'] CODEC(ZSTD(1)),
` + "`attribute_number_input_size_exists`" + ` Bool DEFAULT if(mapContains(attributes_number, 'input_size') != 0, true, false) CODEC(ZSTD(1)),
` + "`attribute_string_log$$iostream`" + ` String DEFAULT attributes_string['log.iostream'] CODEC(ZSTD(1)),
` + "`attribute_string_log$$iostream_exists`" + ` Bool DEFAULT if(mapContains(attributes_string, 'log.iostream') != 0, true, false) CODEC(ZSTD(1)),
` + "`attribute_string_log$$file$$path`" + ` String DEFAULT attributes_string['log.file.path'] CODEC(ZSTD(1)),
` + "`attribute_string_log$$file$$path_exists`" + ` Bool DEFAULT if(mapContains(attributes_string, 'log.file.path') != 0, true, false) CODEC(ZSTD(1)),
` + "`resource_string_k8s$$cluster$$name`" + ` String DEFAULT resources_string['k8s.cluster.name'] CODEC(ZSTD(1)),
` + "`resource_string_k8s$$cluster$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.cluster.name') != 0, true, false) CODEC(ZSTD(1)),
` + "`resource_string_k8s$$namespace$$name`" + ` String DEFAULT resources_string['k8s.namespace.name'] CODEC(ZSTD(1)),
` + "`resource_string_k8s$$namespace$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.namespace.name') != 0, true, false) CODEC(ZSTD(1)),
` + "`resource_string_k8s$$pod$$name`" + ` String DEFAULT resources_string['k8s.pod.name'] CODEC(ZSTD(1)),
` + "`resource_string_k8s$$pod$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.pod.name') != 0, true, false) CODEC(ZSTD(1)),
` + "`resource_string_k8s$$node$$name`" + ` String DEFAULT resources_string['k8s.node.name'] CODEC(ZSTD(1)),
` + "`resource_string_k8s$$node$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.node.name') != 0, true, false) CODEC(ZSTD(1)),
` + "`resource_string_k8s$$container$$name`" + ` String DEFAULT resources_string['k8s.container.name'] CODEC(ZSTD(1)),
` + "`resource_string_k8s$$container$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.container.name') != 0, true, false) CODEC(ZSTD(1)),
` + "`resource_string_k8s$$deployment$$name`" + ` String DEFAULT resources_string['k8s.deployment.name'] CODEC(ZSTD(1)),
` + "`resource_string_k8s$$deployment$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.deployment.name') != 0, true, false) CODEC(ZSTD(1)),
` + "`attribute_string_processor`" + ` String DEFAULT attributes_string['processor'] CODEC(ZSTD(1)),
` + "`attribute_string_processor_exists`" + ` Bool DEFAULT if(mapContains(attributes_string, 'processor') != 0, true, false) CODEC(ZSTD(1)),
INDEX body_idx lower(body) TYPE ngrambf_v1(4, 60000, 5, 0) GRANULARITY 1,
INDEX id_minmax id TYPE minmax GRANULARITY 1,
INDEX severity_number_idx severity_number TYPE set(25) GRANULARITY 4,
INDEX severity_text_idx severity_text TYPE set(25) GRANULARITY 4,
INDEX trace_flags_idx trace_flags TYPE bloom_filter GRANULARITY 4,
INDEX scope_name_idx scope_name TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
INDEX ` + "`resource_string_k8s$$cluster$$name_idx`" + ` ` + "`resource_string_k8s$$cluster$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX ` + "`resource_string_k8s$$namespace$$name_idx`" + ` ` + "`resource_string_k8s$$namespace$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX ` + "`resource_string_k8s$$pod$$name_idx`" + ` ` + "`resource_string_k8s$$pod$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX ` + "`resource_string_k8s$$node$$name_idx`" + ` ` + "`resource_string_k8s$$node$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX ` + "`resource_string_k8s$$container$$name_idx`" + ` ` + "`resource_string_k8s$$container$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX ` + "`resource_string_k8s$$deployment$$name_idx`" + ` ` + "`resource_string_k8s$$deployment$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX attribute_string_processor_idx attribute_string_processor TYPE bloom_filter(0.01) GRANULARITY 64
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
PARTITION BY toDate(timestamp / 1000000000)
ORDER BY (ts_bucket_start, resource_fingerprint, severity_text, timestamp, id)
TTL toDateTime(timestamp / 1000000000) + toIntervalSecond(2592000)
SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192`
keys, err := ExtractFieldKeysFromTblStatement(statement)
if err != nil {
t.Fatalf("failed to extract field keys from tbl statement: %v", err)
}
// some expected keys
expectedKeys := []*telemetrytypes.TelemetryFieldKey{
{
Name: "k8s.pod.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
{
Name: "k8s.cluster.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
{
Name: "k8s.namespace.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
{
Name: "k8s.deployment.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
{
Name: "k8s.node.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
{
Name: "k8s.container.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
{
Name: "processor",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
{
Name: "input_size",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeFloat64,
Materialized: true,
},
{
Name: "log.iostream",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
{
Name: "log.file.path",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
}
for _, key := range expectedKeys {
if !slices.ContainsFunc(keys, func(k *telemetrytypes.TelemetryFieldKey) bool {
return k.Name == key.Name && k.FieldContext == key.FieldContext && k.FieldDataType == key.FieldDataType && k.Materialized == key.Materialized
}) {
t.Errorf("expected key %v not found", key)
}
}
}

View File

@@ -1,7 +0,0 @@
package telemetrymetadata
const (
DBName = "signoz_metadata"
AttributesMetadataTableName = "distributed_attributes_metadata"
AttributesMetadataLocalTableName = "attributes_metadata"
)

View File

@@ -1,21 +0,0 @@
package telemetrymetrics
const (
DBName = "signoz_metrics"
SamplesV4TableName = "distributed_samples_v4"
SamplesV4LocalTableName = "samples_v4"
SamplesV4Agg5mTableName = "distributed_samples_v4_agg_5m"
SamplesV4Agg5mLocalTableName = "samples_v4_agg_5m"
SamplesV4Agg30mTableName = "distributed_samples_v4_agg_30m"
SamplesV4Agg30mLocalTableName = "samples_v4_agg_30m"
ExpHistogramTableName = "distributed_exp_hist"
ExpHistogramLocalTableName = "exp_hist"
TimeseriesV4TableName = "distributed_time_series_v4"
TimeseriesV4LocalTableName = "time_series_v4"
TimeseriesV46hrsTableName = "distributed_time_series_v4_6hrs"
TimeseriesV46hrsLocalTableName = "time_series_v4_6hrs"
TimeseriesV41dayTableName = "distributed_time_series_v4_1day"
TimeseriesV41dayLocalTableName = "time_series_v4_1day"
TimeseriesV41weekTableName = "distributed_time_series_v4_1week"
TimeseriesV41weekLocalTableName = "time_series_v4_1week"
)

View File

@@ -1,352 +0,0 @@
package telemetrytraces
import (
"context"
"fmt"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
var (
indexV3Columns = map[string]*schema.Column{
"ts_bucket_start": {Name: "ts_bucket_start", Type: schema.ColumnTypeUInt64},
"resource_fingerprint": {Name: "resource_fingerprint", Type: schema.ColumnTypeString},
// intrinsic columns
"timestamp": {Name: "timestamp", Type: schema.DateTime64ColumnType{Precision: 9, Timezone: "UTC"}},
"trace_id": {Name: "trace_id", Type: schema.FixedStringColumnType{Length: 32}},
"span_id": {Name: "span_id", Type: schema.ColumnTypeString},
"trace_state": {Name: "trace_state", Type: schema.ColumnTypeString},
"parent_span_id": {Name: "parent_span_id", Type: schema.ColumnTypeString},
"flags": {Name: "flags", Type: schema.ColumnTypeUInt32},
"name": {Name: "name", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"kind": {Name: "kind", Type: schema.ColumnTypeInt8},
"kind_string": {Name: "kind_string", Type: schema.ColumnTypeString},
"duration_nano": {Name: "duration_nano", Type: schema.ColumnTypeUInt64},
"status_code": {Name: "status_code", Type: schema.ColumnTypeInt16},
"status_message": {Name: "status_message", Type: schema.ColumnTypeString},
"status_code_string": {Name: "status_code_string", Type: schema.ColumnTypeString},
// attributes columns
"attributes_string": {Name: "attributes_string", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}},
"attributes_number": {Name: "attributes_number", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeFloat64,
}},
"attributes_bool": {Name: "attributes_bool", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeBool,
}},
"resources_string": {Name: "resources_string", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}},
"events": {Name: "events", Type: schema.ArrayColumnType{
ElementType: schema.ColumnTypeString,
}},
"links": {Name: "links", Type: schema.ColumnTypeString},
// derived columns
"response_status_code": {Name: "response_status_code", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"external_http_url": {Name: "external_http_url", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"http_url": {Name: "http_url", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"external_http_method": {Name: "external_http_method", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"http_method": {Name: "http_method", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"http_host": {Name: "http_host", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"db_name": {Name: "db_name", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"db_operation": {Name: "db_operation", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"has_error": {Name: "has_error", Type: schema.ColumnTypeBool},
"is_remote": {Name: "is_remote", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
// materialized columns
"resource_string_service$$name": {Name: "resource_string_service$$name", Type: schema.ColumnTypeString},
"attribute_string_http$$route": {Name: "attribute_string_http$$route", Type: schema.ColumnTypeString},
"attribute_string_messaging$$system": {Name: "attribute_string_messaging$$system", Type: schema.ColumnTypeString},
"attribute_string_messaging$$operation": {Name: "attribute_string_messaging$$operation", Type: schema.ColumnTypeString},
"attribute_string_db$$system": {Name: "attribute_string_db$$system", Type: schema.ColumnTypeString},
"attribute_string_rpc$$system": {Name: "attribute_string_rpc$$system", Type: schema.ColumnTypeString},
"attribute_string_rpc$$service": {Name: "attribute_string_rpc$$service", Type: schema.ColumnTypeString},
"attribute_string_rpc$$method": {Name: "attribute_string_rpc$$method", Type: schema.ColumnTypeString},
"attribute_string_peer$$service": {Name: "attribute_string_peer$$service", Type: schema.ColumnTypeString},
// deprecated intrinsic columns
"traceID": {Name: "traceID", Type: schema.FixedStringColumnType{Length: 32}},
"spanID": {Name: "spanID", Type: schema.ColumnTypeString},
"parentSpanID": {Name: "parentSpanID", Type: schema.ColumnTypeString},
"spanKind": {Name: "spanKind", Type: schema.ColumnTypeString},
"durationNano": {Name: "durationNano", Type: schema.ColumnTypeUInt64},
"statusCode": {Name: "statusCode", Type: schema.ColumnTypeInt16},
"statusMessage": {Name: "statusMessage", Type: schema.ColumnTypeString},
"statusCodeString": {Name: "statusCodeString", Type: schema.ColumnTypeString},
// deprecated derived columns
"references": {Name: "references", Type: schema.ColumnTypeString},
"responseStatusCode": {Name: "responseStatusCode", Type: schema.ColumnTypeString},
"externalHttpUrl": {Name: "externalHttpUrl", Type: schema.ColumnTypeString},
"httpUrl": {Name: "httpUrl", Type: schema.ColumnTypeString},
"externalHttpMethod": {Name: "externalHttpMethod", Type: schema.ColumnTypeString},
"httpMethod": {Name: "httpMethod", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"httpHost": {Name: "httpHost", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"dbName": {Name: "dbName", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"dbOperation": {Name: "dbOperation", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"hasError": {Name: "hasError", Type: schema.ColumnTypeBool},
"isRemote": {Name: "isRemote", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"serviceName": {Name: "serviceName", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"httpRoute": {Name: "httpRoute", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"msgSystem": {Name: "msgSystem", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"msgOperation": {Name: "msgOperation", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"dbSystem": {Name: "dbSystem", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"rpcSystem": {Name: "rpcSystem", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"rpcService": {Name: "rpcService", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"rpcMethod": {Name: "rpcMethod", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"peerService": {Name: "peerService", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
// materialized exists columns
"resource_string_service$$name_exists": {Name: "resource_string_service$$name_exists", Type: schema.ColumnTypeBool},
"attribute_string_http$$route_exists": {Name: "attribute_string_http$$route_exists", Type: schema.ColumnTypeBool},
"attribute_string_messaging$$system_exists": {Name: "attribute_string_messaging$$system_exists", Type: schema.ColumnTypeBool},
"attribute_string_messaging$$operation_exists": {Name: "attribute_string_messaging$$operation_exists", Type: schema.ColumnTypeBool},
"attribute_string_db$$system_exists": {Name: "attribute_string_db$$system_exists", Type: schema.ColumnTypeBool},
"attribute_string_rpc$$system_exists": {Name: "attribute_string_rpc$$system_exists", Type: schema.ColumnTypeBool},
"attribute_string_rpc$$service_exists": {Name: "attribute_string_rpc$$service_exists", Type: schema.ColumnTypeBool},
"attribute_string_rpc$$method_exists": {Name: "attribute_string_rpc$$method_exists", Type: schema.ColumnTypeBool},
"attribute_string_peer$$service_exists": {Name: "attribute_string_peer$$service_exists", Type: schema.ColumnTypeBool},
}
)
// interface check
var _ qbtypes.ConditionBuilder = &conditionBuilder{}
type conditionBuilder struct {
}
func NewConditionBuilder() qbtypes.ConditionBuilder {
return &conditionBuilder{}
}
func (c *conditionBuilder) GetColumn(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
return indexV3Columns["resources_string"], nil
case telemetrytypes.FieldContextScope:
// we don't have scope data stored in the spans yet
return nil, qbtypes.ErrColumnNotFound
case telemetrytypes.FieldContextAttribute:
switch key.FieldDataType {
case telemetrytypes.FieldDataTypeString:
return indexV3Columns["attributes_string"], nil
case telemetrytypes.FieldDataTypeInt64, telemetrytypes.FieldDataTypeFloat64, telemetrytypes.FieldDataTypeNumber:
return indexV3Columns["attributes_number"], nil
case telemetrytypes.FieldDataTypeBool:
return indexV3Columns["attributes_bool"], nil
}
case telemetrytypes.FieldContextSpan:
col, ok := indexV3Columns[key.Name]
if !ok {
return nil, qbtypes.ErrColumnNotFound
}
return col, nil
}
return nil, qbtypes.ErrColumnNotFound
}
func (c *conditionBuilder) GetTableFieldName(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
column, err := c.GetColumn(ctx, key)
if err != nil {
return "", err
}
switch column.Type {
case schema.ColumnTypeString,
schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
schema.ColumnTypeUInt64,
schema.ColumnTypeUInt32,
schema.ColumnTypeInt8,
schema.ColumnTypeInt16,
schema.ColumnTypeBool,
schema.DateTime64ColumnType{Precision: 9, Timezone: "UTC"},
schema.FixedStringColumnType{Length: 32}:
return column.Name, nil
case schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}:
// a key could have been materialized, if so return the materialized column name
if key.Materialized {
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
}
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
case schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeFloat64,
}:
// a key could have been materialized, if so return the materialized column name
if key.Materialized {
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
}
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
case schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeBool,
}:
// a key could have been materialized, if so return the materialized column name
if key.Materialized {
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
}
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
}
// should not reach here
return column.Name, nil
}
func (c *conditionBuilder) GetCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
column, err := c.GetColumn(ctx, key)
if err != nil {
return "", err
}
tblFieldName, err := c.GetTableFieldName(ctx, key)
if err != nil {
return "", err
}
tblFieldName, value = telemetrytypes.DataTypeCollisionHandledFieldName(key, value, tblFieldName)
// regular operators
switch operator {
// regular operators
case qbtypes.FilterOperatorEqual:
return sb.E(tblFieldName, value), nil
case qbtypes.FilterOperatorNotEqual:
return sb.NE(tblFieldName, value), nil
case qbtypes.FilterOperatorGreaterThan:
return sb.G(tblFieldName, value), nil
case qbtypes.FilterOperatorGreaterThanOrEq:
return sb.GE(tblFieldName, value), nil
case qbtypes.FilterOperatorLessThan:
return sb.LT(tblFieldName, value), nil
case qbtypes.FilterOperatorLessThanOrEq:
return sb.LE(tblFieldName, value), nil
// like and not like
case qbtypes.FilterOperatorLike:
return sb.Like(tblFieldName, value), nil
case qbtypes.FilterOperatorNotLike:
return sb.NotLike(tblFieldName, value), nil
case qbtypes.FilterOperatorILike:
return sb.ILike(tblFieldName, value), nil
case qbtypes.FilterOperatorNotILike:
return sb.NotILike(tblFieldName, value), nil
case qbtypes.FilterOperatorContains:
return sb.ILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorNotContains:
return sb.NotILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorRegexp:
exp := fmt.Sprintf(`match(%s, %s)`, tblFieldName, sb.Var(value))
return sb.And(exp), nil
case qbtypes.FilterOperatorNotRegexp:
exp := fmt.Sprintf(`not match(%s, %s)`, tblFieldName, sb.Var(value))
return sb.And(exp), nil
// between and not between
case qbtypes.FilterOperatorBetween:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrBetweenValues
}
if len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.Between(tblFieldName, values[0], values[1]), nil
case qbtypes.FilterOperatorNotBetween:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrBetweenValues
}
if len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.NotBetween(tblFieldName, values[0], values[1]), nil
// in and not in
case qbtypes.FilterOperatorIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
return sb.In(tblFieldName, values...), nil
case qbtypes.FilterOperatorNotIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
return sb.NotIn(tblFieldName, values...), nil
// exists and not exists
// in the query builder, `exists` and `not exists` are used for
// key membership checks, so depending on the column type, the condition changes
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
var value any
switch column.Type {
case schema.ColumnTypeString,
schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
schema.FixedStringColumnType{Length: 32},
schema.DateTime64ColumnType{Precision: 9, Timezone: "UTC"}:
value = ""
if operator == qbtypes.FilterOperatorExists {
return sb.NE(tblFieldName, value), nil
} else {
return sb.E(tblFieldName, value), nil
}
case schema.ColumnTypeUInt64,
schema.ColumnTypeUInt32,
schema.ColumnTypeUInt8,
schema.ColumnTypeInt8,
schema.ColumnTypeInt16,
schema.ColumnTypeBool:
value = 0
if operator == qbtypes.FilterOperatorExists {
return sb.NE(tblFieldName, value), nil
} else {
return sb.E(tblFieldName, value), nil
}
case schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}, schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeBool,
}, schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeFloat64,
}:
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
if key.Materialized {
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
}
if operator == qbtypes.FilterOperatorExists {
return sb.E(leftOperand, true), nil
} else {
return sb.NE(leftOperand, true), nil
}
default:
return "", fmt.Errorf("exists operator is not supported for column type %s", column.Type)
}
}
return "", nil
}

View File

@@ -1,298 +0,0 @@
package telemetrytraces
import (
"context"
"testing"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetFieldKeyName(t *testing.T) {
ctx := context.Background()
conditionBuilder := &conditionBuilder{}
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
expectedResult string
expectedError error
}{
{
name: "Simple column type - timestamp",
key: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
FieldContext: telemetrytypes.FieldContextSpan,
},
expectedResult: "timestamp",
expectedError: nil,
},
{
name: "Map column type - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
expectedResult: "attributes_string['user.id']",
expectedError: nil,
},
{
name: "Map column type - number attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.size",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
expectedResult: "attributes_number['request.size']",
expectedError: nil,
},
{
name: "Map column type - bool attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.success",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeBool,
},
expectedResult: "attributes_bool['request.success']",
expectedError: nil,
},
{
name: "Map column type - resource attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
},
expectedResult: "resources_string['service.name']",
expectedError: nil,
},
{
name: "Non-existent column",
key: telemetrytypes.TelemetryFieldKey{
Name: "nonexistent_field",
FieldContext: telemetrytypes.FieldContextSpan,
},
expectedResult: "",
expectedError: qbtypes.ErrColumnNotFound,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result, err := conditionBuilder.GetTableFieldName(ctx, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)
} else {
require.NoError(t, err)
assert.Equal(t, tc.expectedResult, result)
}
})
}
}
func TestGetCondition(t *testing.T) {
ctx := context.Background()
conditionBuilder := NewConditionBuilder()
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
operator qbtypes.FilterOperator
value any
expectedSQL string
expectedError error
}{
{
name: "Not Equal operator - timestamp",
key: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
FieldContext: telemetrytypes.FieldContextSpan,
},
operator: qbtypes.FilterOperatorNotEqual,
value: uint64(1617979338000000000),
expectedSQL: "timestamp <> ?",
expectedError: nil,
},
{
name: "Greater Than operator - number attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.duration",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
operator: qbtypes.FilterOperatorGreaterThan,
value: float64(100),
expectedSQL: "attributes_number['request.duration'] > ?",
expectedError: nil,
},
{
name: "Less Than operator - number attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.size",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
operator: qbtypes.FilterOperatorLessThan,
value: float64(1024),
expectedSQL: "attributes_number['request.size'] < ?",
expectedError: nil,
},
{
name: "Greater Than Or Equal operator - timestamp",
key: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
FieldContext: telemetrytypes.FieldContextSpan,
},
operator: qbtypes.FilterOperatorGreaterThanOrEq,
value: uint64(1617979338000000000),
expectedSQL: "timestamp >= ?",
expectedError: nil,
},
{
name: "Less Than Or Equal operator - timestamp",
key: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
FieldContext: telemetrytypes.FieldContextSpan,
},
operator: qbtypes.FilterOperatorLessThanOrEq,
value: uint64(1617979338000000000),
expectedSQL: "timestamp <= ?",
expectedError: nil,
},
{
name: "ILike operator - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorILike,
value: "%admin%",
expectedSQL: "WHERE LOWER(attributes_string['user.id']) LIKE LOWER(?)",
expectedError: nil,
},
{
name: "Not ILike operator - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorNotILike,
value: "%admin%",
expectedSQL: "WHERE LOWER(attributes_string['user.id']) NOT LIKE LOWER(?)",
expectedError: nil,
},
{
name: "Between operator - timestamp",
key: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
FieldContext: telemetrytypes.FieldContextSpan,
},
operator: qbtypes.FilterOperatorBetween,
value: []any{uint64(1617979338000000000), uint64(1617979348000000000)},
expectedSQL: "timestamp BETWEEN ? AND ?",
expectedError: nil,
},
{
name: "Between operator - invalid value",
key: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
FieldContext: telemetrytypes.FieldContextSpan,
},
operator: qbtypes.FilterOperatorBetween,
value: "invalid",
expectedSQL: "",
expectedError: qbtypes.ErrBetweenValues,
},
{
name: "Between operator - insufficient values",
key: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
FieldContext: telemetrytypes.FieldContextSpan,
},
operator: qbtypes.FilterOperatorBetween,
value: []any{uint64(1617979338000000000)},
expectedSQL: "",
expectedError: qbtypes.ErrBetweenValues,
},
{
name: "Not Between operator - timestamp",
key: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
FieldContext: telemetrytypes.FieldContextSpan,
},
operator: qbtypes.FilterOperatorNotBetween,
value: []any{uint64(1617979338000000000), uint64(1617979348000000000)},
expectedSQL: "timestamp NOT BETWEEN ? AND ?",
expectedError: nil,
},
{
name: "Exists operator - map field",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorExists,
value: nil,
expectedSQL: "mapContains(attributes_string, 'user.id') = ?",
expectedError: nil,
},
{
name: "Not Exists operator - map field",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorNotExists,
value: nil,
expectedSQL: "mapContains(attributes_string, 'user.id') <> ?",
expectedError: nil,
},
{
name: "Contains operator - map field",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
operator: qbtypes.FilterOperatorContains,
value: "admin",
expectedSQL: "WHERE LOWER(attributes_string['user.id']) LIKE LOWER(?)",
expectedError: nil,
},
{
name: "Non-existent column",
key: telemetrytypes.TelemetryFieldKey{
Name: "nonexistent_field",
FieldContext: telemetrytypes.FieldContextSpan,
},
operator: qbtypes.FilterOperatorEqual,
value: "value",
expectedSQL: "",
expectedError: qbtypes.ErrColumnNotFound,
},
}
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
t.Run(tc.name, func(t *testing.T) {
cond, err := conditionBuilder.GetCondition(ctx, &tc.key, tc.operator, tc.value, sb)
sb.Where(cond)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)
} else {
require.NoError(t, err)
sql, _ := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
assert.Contains(t, sql, tc.expectedSQL)
}
})
}
}

View File

@@ -1,10 +0,0 @@
package telemetrytraces
const (
DBName = "signoz_traces"
SpanIndexV3TableName = "distributed_signoz_index_v3"
SpanIndexV3LocalTableName = "signoz_index_v3"
TagAttributesV2TableName = "distributed_tag_attributes_v2"
TagAttributesV2LocalTableName = "tag_attributes_v2"
TopLevelOperationsTableName = "distributed_top_level_operations"
)