Compare commits

...

4 Commits

Author SHA1 Message Date
Piyush Singariya
c9f5b16c37 chore: restructuring looks ok 2025-04-21 17:39:19 +05:30
Piyush Singariya
55837dacb5 chore: in progress 2025-04-18 17:37:50 +05:30
Piyush Singariya
14248968cb chore: trying restructuring 2025-04-18 16:26:32 +05:30
Piyush Singariya
a436ae900c feat: introducing S3 sync as AWS integration 2025-04-16 18:45:36 +05:30
41 changed files with 308 additions and 266 deletions

View File

@@ -8,6 +8,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types"
@@ -27,7 +28,7 @@ func validateCloudProviderName(name string) *model.ApiError {
type Controller struct { type Controller struct {
accountsRepo cloudProviderAccountsRepository accountsRepo cloudProviderAccountsRepository
serviceConfigRepo serviceConfigRepository serviceConfigRepo ServiceConfigDatabase
} }
func NewController(sqlStore sqlstore.SQLStore) ( func NewController(sqlStore sqlstore.SQLStore) (
@@ -200,7 +201,7 @@ type AgentCheckInResponse struct {
type IntegrationConfigForAgent struct { type IntegrationConfigForAgent struct {
EnabledRegions []string `json:"enabled_regions"` EnabledRegions []string `json:"enabled_regions"`
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry,omitempty"` TelemetryCollectionStrategy *CompiledCollectionStrategy `json:"telemetry,omitempty"`
} }
func (c *Controller) CheckInAsAgent( func (c *Controller) CheckInAsAgent(
@@ -239,7 +240,7 @@ func (c *Controller) CheckInAsAgent(
} }
// prepare and return integration config to be consumed by agent // prepare and return integration config to be consumed by agent
telemetryCollectionStrategy, err := NewCloudTelemetryCollectionStrategy(cloudProvider) compliedStrategy, err := NewCompiledCollectionStrategy(cloudProvider)
if err != nil { if err != nil {
return nil, model.InternalError(fmt.Errorf( return nil, model.InternalError(fmt.Errorf(
"couldn't init telemetry collection strategy: %w", err, "couldn't init telemetry collection strategy: %w", err,
@@ -248,21 +249,17 @@ func (c *Controller) CheckInAsAgent(
agentConfig := IntegrationConfigForAgent{ agentConfig := IntegrationConfigForAgent{
EnabledRegions: []string{}, EnabledRegions: []string{},
TelemetryCollectionStrategy: telemetryCollectionStrategy, TelemetryCollectionStrategy: compliedStrategy,
} }
if account.Config != nil && account.Config.EnabledRegions != nil { if account.Config != nil && account.Config.EnabledRegions != nil {
agentConfig.EnabledRegions = account.Config.EnabledRegions agentConfig.EnabledRegions = account.Config.EnabledRegions
} }
services, apiErr := listCloudProviderServices(cloudProvider) services, apiErr := services.Map(cloudProvider)
if apiErr != nil { if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services") return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
} }
svcDetailsById := map[string]*CloudServiceDetails{}
for _, svcDetails := range services {
svcDetailsById[svcDetails.Id] = &svcDetails
}
svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount( svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, cloudProvider, *account.CloudAccountId, ctx, cloudProvider, *account.CloudAccountId,
@@ -278,22 +275,16 @@ func (c *Controller) CheckInAsAgent(
slices.Sort(configuredSvcIds) slices.Sort(configuredSvcIds)
for _, svcId := range configuredSvcIds { for _, svcId := range configuredSvcIds {
svcDetails := svcDetailsById[svcId] definition, ok := services[svcId]
svcConfig := svcConfigs[svcId] if !ok {
continue
}
config := svcConfigs[svcId]
if svcDetails != nil { err := AddServiceStrategy(compliedStrategy, definition.Strategy, config)
metricsEnabled := svcConfig.Metrics != nil && svcConfig.Metrics.Enabled if err != nil {
logsEnabled := svcConfig.Logs != nil && svcConfig.Logs.Enabled return nil, model.InternalError(
if logsEnabled || metricsEnabled { fmt.Errorf("couldn't add service telemetry collection strategy: %s", err))
err := agentConfig.TelemetryCollectionStrategy.AddServiceStrategy(
svcDetails.TelemetryCollectionStrategy, logsEnabled, metricsEnabled,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't add service telemetry collection strategy: %w", err,
))
}
}
} }
} }
@@ -355,7 +346,7 @@ func (c *Controller) DisconnectAccount(
} }
type ListServicesResponse struct { type ListServicesResponse struct {
Services []CloudServiceSummary `json:"services"` Services []ServiceSummary `json:"services"`
} }
func (c *Controller) ListServices( func (c *Controller) ListServices(
@@ -363,17 +354,16 @@ func (c *Controller) ListServices(
cloudProvider string, cloudProvider string,
cloudAccountId *string, cloudAccountId *string,
) (*ListServicesResponse, *model.ApiError) { ) (*ListServicesResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr return nil, apiErr
} }
services, apiErr := listCloudProviderServices(cloudProvider) definitions, apiErr := services.List(cloudProvider)
if apiErr != nil { if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services") return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
} }
svcConfigs := map[string]*CloudServiceConfig{} svcConfigs := map[string]*ServiceConfig{}
if cloudAccountId != nil { if cloudAccountId != nil {
svcConfigs, apiErr = c.serviceConfigRepo.getAllForAccount( svcConfigs, apiErr = c.serviceConfigRepo.getAllForAccount(
ctx, cloudProvider, *cloudAccountId, ctx, cloudProvider, *cloudAccountId,
@@ -385,9 +375,11 @@ func (c *Controller) ListServices(
} }
} }
summaries := []CloudServiceSummary{} summaries := []ServiceSummary{}
for _, s := range services { for _, def := range definitions {
summary := s.CloudServiceSummary summary := ServiceSummary{
Metadata: def.Metadata,
}
summary.Config = svcConfigs[summary.Id] summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary) summaries = append(summaries, summary)
@@ -403,17 +395,21 @@ func (c *Controller) GetServiceDetails(
cloudProvider string, cloudProvider string,
serviceId string, serviceId string,
cloudAccountId *string, cloudAccountId *string,
) (*CloudServiceDetails, *model.ApiError) { ) (*ServiceDetails, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr return nil, apiErr
} }
service, apiErr := getCloudProviderService(cloudProvider, serviceId) definition, apiErr := services.GetServiceDefinition(cloudProvider, serviceId)
if apiErr != nil { if apiErr != nil {
return nil, apiErr return nil, apiErr
} }
details := ServiceDetails{
Definition: *definition,
}
if cloudAccountId != nil { if cloudAccountId != nil {
config, apiErr := c.serviceConfigRepo.get( config, apiErr := c.serviceConfigRepo.get(
ctx, cloudProvider, *cloudAccountId, serviceId, ctx, cloudProvider, *cloudAccountId, serviceId,
@@ -423,15 +419,15 @@ func (c *Controller) GetServiceDetails(
} }
if config != nil { if config != nil {
service.Config = config details.Config = config
if config.Metrics != nil && config.Metrics.Enabled { if config.Metrics != nil && config.Metrics.Enabled {
// add links to service dashboards, making them clickable. // add links to service dashboards, making them clickable.
for i, d := range service.Assets.Dashboards { for i, d := range details.Assets.Dashboards {
dashboardUuid := c.dashboardUuid( dashboardUuid := c.dashboardUuid(
cloudProvider, serviceId, d.Id, cloudProvider, serviceId, d.Id,
) )
service.Assets.Dashboards[i].Url = fmt.Sprintf( details.Assets.Dashboards[i].Url = fmt.Sprintf(
"/dashboard/%s", dashboardUuid, "/dashboard/%s", dashboardUuid,
) )
} }
@@ -439,17 +435,17 @@ func (c *Controller) GetServiceDetails(
} }
} }
return service, nil return &details, nil
} }
type UpdateServiceConfigRequest struct { type UpdateServiceConfigRequest struct {
CloudAccountId string `json:"cloud_account_id"` CloudAccountId string `json:"cloud_account_id"`
Config CloudServiceConfig `json:"config"` Config ServiceConfig `json:"config"`
} }
type UpdateServiceConfigResponse struct { type UpdateServiceConfigResponse struct {
Id string `json:"id"` Id string `json:"id"`
Config CloudServiceConfig `json:"config"` Config ServiceConfig `json:"config"`
} }
func (c *Controller) UpdateServiceConfig( func (c *Controller) UpdateServiceConfig(
@@ -458,7 +454,6 @@ func (c *Controller) UpdateServiceConfig(
serviceId string, serviceId string,
req UpdateServiceConfigRequest, req UpdateServiceConfigRequest,
) (*UpdateServiceConfigResponse, *model.ApiError) { ) (*UpdateServiceConfigResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr return nil, apiErr
} }
@@ -472,7 +467,7 @@ func (c *Controller) UpdateServiceConfig(
} }
// can only update config for a valid service. // can only update config for a valid service.
_, apiErr = getCloudProviderService(cloudProvider, serviceId) _, apiErr = services.GetServiceDefinition(cloudProvider, serviceId)
if apiErr != nil { if apiErr != nil {
return nil, model.WrapApiError(apiErr, "unsupported service") return nil, model.WrapApiError(apiErr, "unsupported service")
} }
@@ -540,7 +535,7 @@ func (c *Controller) AvailableDashboardsForCloudProvider(
} }
} }
allServices, apiErr := listCloudProviderServices(cloudProvider) allServices, apiErr := services.List(cloudProvider)
if apiErr != nil { if apiErr != nil {
return nil, apiErr return nil, apiErr
} }

View File

@@ -182,8 +182,8 @@ func TestConfigureService(t *testing.T) {
require.NotNil(testConnectedAccount.CloudAccountId) require.NotNil(testConnectedAccount.CloudAccountId)
require.Equal(testCloudAccountId, *testConnectedAccount.CloudAccountId) require.Equal(testCloudAccountId, *testConnectedAccount.CloudAccountId)
testSvcConfig := CloudServiceConfig{ testSvcConfig := ServiceConfig{
Metrics: &CloudServiceMetricsConfig{ Metrics: &MetricsConfig{
Enabled: true, Enabled: true,
}, },
} }

View File

@@ -6,9 +6,22 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
) )
type ServiceSummary struct {
services.Metadata
Config *ServiceConfig `json:"config"`
}
type ServiceDetails struct {
services.Definition
Config *ServiceConfig `json:"config"`
ConnectionStatus *ServiceConnectionStatus `json:"status,omitempty"`
}
// Represents a cloud provider account for cloud integrations // Represents a cloud provider account for cloud integrations
type AccountRecord struct { type AccountRecord struct {
CloudProvider string `json:"cloud_provider" db:"cloud_provider"` CloudProvider string `json:"cloud_provider" db:"cloud_provider"`
@@ -118,39 +131,13 @@ func (a *AccountRecord) account() Account {
return ca return ca
} }
type CloudServiceSummary struct { type ServiceConfig struct {
Id string `json:"id"` Logs *LogsConfig `json:"logs,omitempty"`
Title string `json:"title"` Metrics *MetricsConfig `json:"metrics,omitempty"`
Icon string `json:"icon"`
// Present only if the service has been configured in the
// context of a cloud provider account.
Config *CloudServiceConfig `json:"config,omitempty"`
} }
type CloudServiceDetails struct { // Serialization from db
CloudServiceSummary func (c *ServiceConfig) Scan(src any) error {
Overview string `json:"overview"` // markdown
Assets CloudServiceAssets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollectedForService `json:"data_collected"`
ConnectionStatus *CloudServiceConnectionStatus `json:"status,omitempty"`
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry_collection_strategy"`
}
type CloudServiceConfig struct {
Logs *CloudServiceLogsConfig `json:"logs,omitempty"`
Metrics *CloudServiceMetricsConfig `json:"metrics,omitempty"`
}
// For serializing from db
func (c *CloudServiceConfig) Scan(src any) error {
data, ok := src.([]byte) data, ok := src.([]byte)
if !ok { if !ok {
return fmt.Errorf("tried to scan from %T instead of bytes", src) return fmt.Errorf("tried to scan from %T instead of bytes", src)
@@ -159,8 +146,8 @@ func (c *CloudServiceConfig) Scan(src any) error {
return json.Unmarshal(data, &c) return json.Unmarshal(data, &c)
} }
// For serializing to db // Serializing to db
func (c *CloudServiceConfig) Value() (driver.Value, error) { func (c *ServiceConfig) Value() (driver.Value, error) {
if c == nil { if c == nil {
return nil, nil return nil, nil
} }
@@ -174,51 +161,16 @@ func (c *CloudServiceConfig) Value() (driver.Value, error) {
return serialized, nil return serialized, nil
} }
type CloudServiceLogsConfig struct { type LogsConfig struct {
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type MetricsConfig struct {
Enabled bool `json:"enabled"` Enabled bool `json:"enabled"`
} }
type CloudServiceMetricsConfig struct { type ServiceConnectionStatus struct {
Enabled bool `json:"enabled"`
}
type CloudServiceAssets struct {
Dashboards []CloudServiceDashboard `json:"dashboards"`
}
type CloudServiceDashboard struct {
Id string `json:"id"`
Url string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Image string `json:"image"`
Definition *types.DashboardData `json:"definition,omitempty"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollectedForService struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type CloudServiceConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"` Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"` Metrics *SignalConnectionStatus `json:"metrics"`
} }
@@ -228,23 +180,14 @@ type SignalConnectionStatus struct {
LastReceivedFrom string `json:"last_received_from"` // resource identifier LastReceivedFrom string `json:"last_received_from"` // resource identifier
} }
type CloudTelemetryCollectionStrategy struct { type CompiledCollectionStrategy = services.CollectionStrategy
Provider string `json:"provider"`
AWSMetrics *AWSMetricsCollectionStrategy `json:"aws_metrics,omitempty"` func NewCompiledCollectionStrategy(provider string) (*CompiledCollectionStrategy, error) {
AWSLogs *AWSLogsCollectionStrategy `json:"aws_logs,omitempty"`
}
func NewCloudTelemetryCollectionStrategy(provider string) (*CloudTelemetryCollectionStrategy, error) {
if provider == "aws" { if provider == "aws" {
return &CloudTelemetryCollectionStrategy{ return &CompiledCollectionStrategy{
Provider: "aws", Provider: "aws",
AWSMetrics: &AWSMetricsCollectionStrategy{ AWSMetrics: &services.AWSMetricsStrategy{},
CloudwatchMetricsStreamFilters: []CloudwatchMetricStreamFilter{}, AWSLogs: &services.AWSLogsStrategy{},
},
AWSLogs: &AWSLogsCollectionStrategy{
CloudwatchLogsSubscriptions: []CloudwatchLogsSubscriptionConfig{},
},
}, nil }, nil
} }
@@ -252,24 +195,27 @@ func NewCloudTelemetryCollectionStrategy(provider string) (*CloudTelemetryCollec
} }
// Helper for accumulating strategies for enabled services. // Helper for accumulating strategies for enabled services.
func (cs *CloudTelemetryCollectionStrategy) AddServiceStrategy( func AddServiceStrategy(cs *CompiledCollectionStrategy,
svcStrategy *CloudTelemetryCollectionStrategy, definitionStrat *services.CollectionStrategy, config *ServiceConfig) error {
logsEnabled bool, if definitionStrat.Provider != cs.Provider {
metricsEnabled bool,
) error {
if svcStrategy.Provider != cs.Provider {
return fmt.Errorf( return fmt.Errorf(
"can't add %s service strategy to strategy for %s", "can't add %s service strategy to strategy for %s",
svcStrategy.Provider, cs.Provider, definitionStrat.Provider, cs.Provider,
) )
} }
if cs.Provider == "aws" { if cs.Provider == "aws" {
if logsEnabled { if config.Logs != nil && config.Logs.Enabled && definitionStrat.AWSLogs != nil {
cs.AWSLogs.AddServiceStrategy(svcStrategy.AWSLogs) cs.AWSLogs.Subscriptions = append(
cs.AWSLogs.Subscriptions,
definitionStrat.AWSLogs.Subscriptions...,
)
} }
if metricsEnabled { if config.Metrics != nil && config.Metrics.Enabled && definitionStrat.AWSMetrics != nil {
cs.AWSMetrics.AddServiceStrategy(svcStrategy.AWSMetrics) cs.AWSMetrics.StreamFilters = append(
cs.AWSMetrics.StreamFilters,
definitionStrat.AWSMetrics.StreamFilters...,
)
} }
return nil return nil
} }
@@ -277,57 +223,3 @@ func (cs *CloudTelemetryCollectionStrategy) AddServiceStrategy(
return fmt.Errorf("unsupported cloud provider: %s", cs.Provider) return fmt.Errorf("unsupported cloud provider: %s", cs.Provider)
} }
type AWSMetricsCollectionStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
CloudwatchMetricsStreamFilters []CloudwatchMetricStreamFilter `json:"cloudwatch_metric_stream_filters"`
}
type CloudwatchMetricStreamFilter struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
}
func (amc *AWSMetricsCollectionStrategy) AddServiceStrategy(
svcStrategy *AWSMetricsCollectionStrategy,
) error {
if svcStrategy == nil {
return nil
}
amc.CloudwatchMetricsStreamFilters = append(
amc.CloudwatchMetricsStreamFilters,
svcStrategy.CloudwatchMetricsStreamFilters...,
)
return nil
}
type AWSLogsCollectionStrategy struct {
CloudwatchLogsSubscriptions []CloudwatchLogsSubscriptionConfig `json:"cloudwatch_logs_subscriptions"`
}
type CloudwatchLogsSubscriptionConfig struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
}
func (alc *AWSLogsCollectionStrategy) AddServiceStrategy(
svcStrategy *AWSLogsCollectionStrategy,
) error {
if svcStrategy == nil {
return nil
}
alc.CloudwatchLogsSubscriptions = append(
alc.CloudwatchLogsSubscriptions,
svcStrategy.CloudwatchLogsSubscriptions...,
)
return nil
}

View File

@@ -9,28 +9,28 @@ import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
) )
type serviceConfigRepository interface { type ServiceConfigDatabase interface {
get( get(
ctx context.Context, ctx context.Context,
cloudProvider string, cloudProvider string,
cloudAccountId string, cloudAccountId string,
serviceId string, serviceId string,
) (*CloudServiceConfig, *model.ApiError) ) (*ServiceConfig, *model.ApiError)
upsert( upsert(
ctx context.Context, ctx context.Context,
cloudProvider string, cloudProvider string,
cloudAccountId string, cloudAccountId string,
serviceId string, serviceId string,
config CloudServiceConfig, config ServiceConfig,
) (*CloudServiceConfig, *model.ApiError) ) (*ServiceConfig, *model.ApiError)
getAllForAccount( getAllForAccount(
ctx context.Context, ctx context.Context,
cloudProvider string, cloudProvider string,
cloudAccountId string, cloudAccountId string,
) ( ) (
configsBySvcId map[string]*CloudServiceConfig, configsBySvcId map[string]*ServiceConfig,
apiErr *model.ApiError, apiErr *model.ApiError,
) )
} }
@@ -52,9 +52,9 @@ func (r *serviceConfigSQLRepository) get(
cloudProvider string, cloudProvider string,
cloudAccountId string, cloudAccountId string,
serviceId string, serviceId string,
) (*CloudServiceConfig, *model.ApiError) { ) (*ServiceConfig, *model.ApiError) {
var result CloudServiceConfig var result ServiceConfig
err := r.db.GetContext( err := r.db.GetContext(
ctx, &result, ` ctx, &result, `
@@ -68,17 +68,13 @@ func (r *serviceConfigSQLRepository) get(
`, `,
cloudProvider, cloudAccountId, serviceId, cloudProvider, cloudAccountId, serviceId,
) )
if err != nil {
if err == sql.ErrNoRows {
return nil, model.NotFoundError(
fmt.Errorf("couldn't find %s %s config for %s", cloudProvider, serviceId, cloudAccountId))
}
if err == sql.ErrNoRows { return nil, model.InternalError(fmt.Errorf("couldn't query cloud service config: %w", err))
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find %s %s config for %s",
cloudProvider, serviceId, cloudAccountId,
))
} else if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud service config: %w", err,
))
} }
return &result, nil return &result, nil
@@ -90,8 +86,8 @@ func (r *serviceConfigSQLRepository) upsert(
cloudProvider string, cloudProvider string,
cloudAccountId string, cloudAccountId string,
serviceId string, serviceId string,
config CloudServiceConfig, config ServiceConfig,
) (*CloudServiceConfig, *model.ApiError) { ) (*ServiceConfig, *model.ApiError) {
query := ` query := `
INSERT INTO cloud_integrations_service_configs ( INSERT INTO cloud_integrations_service_configs (
@@ -128,11 +124,11 @@ func (r *serviceConfigSQLRepository) getAllForAccount(
ctx context.Context, ctx context.Context,
cloudProvider string, cloudProvider string,
cloudAccountId string, cloudAccountId string,
) (map[string]*CloudServiceConfig, *model.ApiError) { ) (map[string]*ServiceConfig, *model.ApiError) {
type ScannedServiceConfigRecord struct { type ScannedServiceConfigRecord struct {
ServiceId string `db:"service_id"` ServiceId string `db:"service_id"`
Config CloudServiceConfig `db:"config_json"` Config ServiceConfig `db:"config_json"`
} }
records := []ScannedServiceConfigRecord{} records := []ScannedServiceConfigRecord{}
@@ -155,7 +151,7 @@ func (r *serviceConfigSQLRepository) getAllForAccount(
)) ))
} }
result := map[string]*CloudServiceConfig{} result := map[string]*ServiceConfig{}
for _, r := range records { for _, r := range records {
result[r.ServiceId] = &r.Config result[r.ServiceId] = &r.Config

View File

@@ -0,0 +1 @@
<svg width="256" height="256" viewBox="0 0 256 256" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" preserveAspectRatio="xMidYMid"><title>AWS Simple Storage Service (S3)</title><defs><linearGradient x1="0%" y1="100%" x2="100%" y2="0%" id="a"><stop stop-color="#1B660F" offset="0%"/><stop stop-color="#6CAE3E" offset="100%"/></linearGradient></defs><g><rect fill="url(#a)" width="256" height="256"/><path d="M194.67488,137.25632 L195.90368,128.60352 C207.23488,135.39072 207.38208,138.19392 207.378964,138.27072 C207.35968,138.28672 205.42688,139.89952 194.67488,137.25632 L194.67488,137.25632 Z M188.45728,135.52832 C168.87328,129.60192 141.59968,117.08992 130.56288,111.87392 C130.56288,111.82912 130.57568,111.78752 130.57568,111.74272 C130.57568,107.50272 127.12608,104.05312 122.88288,104.05312 C118.64608,104.05312 115.19648,107.50272 115.19648,111.74272 C115.19648,115.98272 118.64608,119.43232 122.88288,119.43232 C124.74528,119.43232 126.43488,118.73792 127.76928,117.63392 C140.75488,123.78112 167.81728,136.11072 187.54528,141.93472 L179.74368,196.99392 C179.72128,197.14432 179.71168,197.29472 179.71168,197.44512 C179.71168,202.29312 158.24928,211.19872 123.18048,211.19872 C87.74048,211.19872 66.05088,202.29312 66.05088,197.44512 C66.05088,197.29792 66.04128,197.15392 66.02208,197.00992 L49.72128,77.94752 C63.83008,87.65952 94.17568,92.79872 123.19968,92.79872 C152.17888,92.79872 182.47328,87.67872 196.61088,77.99552 L188.45728,135.52832 Z M47.99968,65.52832 C48.23008,61.31712 72.42848,44.79872 123.19968,44.79872 C173.96448,44.79872 198.16608,61.31392 198.39968,65.52832 L198.39968,66.96512 C195.61568,76.40832 164.25568,86.39872 123.19968,86.39872 C82.07328,86.39872 50.69728,76.37632 47.99968,66.92032 L47.99968,65.52832 Z M204.79968,65.59872 C204.79968,54.51072 173.01088,38.39872 123.19968,38.39872 C73.38848,38.39872 41.59968,54.51072 41.59968,65.59872 L41.90048,68.01152 L59.65408,197.68832 C60.07968,212.19072 98.75488,217.59872 123.18048,217.59872 C153.49088,217.59872 185.69248,210.62912 186.10848,197.69792 L193.77568,143.62752 C198.04128,144.64832 201.55168,145.16992 204.37088,145.16992 C208.15648,145.16992 210.71648,144.24512 212.26848,142.39552 C213.54208,140.87872 214.02848,139.04192 213.66368,137.08672 C212.83488,132.65792 207.57728,127.88352 196.87008,121.77472 L204.47328,68.13632 L204.79968,65.59872 Z" fill="#FFFFFF"/></g></svg>

After

Width:  |  Height:  |  Size: 2.3 KiB

View File

@@ -0,0 +1,49 @@
{
"id": "s3sync",
"title": "S3 Sync",
"icon": "file://icon.svg",
"overview": "file://overview.md",
"supported_signals": {
"metrics": false,
"logs": true
},
"data_collected": {
"logs": [
{
"name": "Account ID",
"path": "resources.aws.account.id",
"type": "string"
},
{
"name": "Account Region",
"path": "resources.aws.account.region",
"type": "string"
},
{
"name": "Bucket Name",
"path": "resources.aws.bucket.name",
"type": "string"
},
{
"name": "Object Key",
"path": "attributes.aws.object.key",
"type": "string"
},
{
"name": "Object S3 Event Time",
"path": "attributes.aws.object.event_time",
"type": "string"
},
{
"name": "Log line number in source file",
"path": "attributes.log.source.line",
"type": "number"
}
]
},
"telemetry_collection_strategy": {
},
"assets": {
"dashboards": []
}
}

View File

@@ -0,0 +1,3 @@
### Sync logs stored in AWS S3 with SigNoz
Collect logs stored by AWS Services in S3 and explore them in Signoz.

View File

@@ -0,0 +1,94 @@
package services
import (
"github.com/SigNoz/signoz/pkg/types"
)
type Metadata struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
// Present only if the service has been configured in the
// context of a cloud provider account.
// Config *CloudServiceConfig `json:"config,omitempty"`
}
type Definition struct {
Metadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"data_collected"`
Strategy *CollectionStrategy `json:"telemetry_collection_strategy"`
}
type Assets struct {
Dashboards []Dashboard `json:"dashboards"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollected struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type CollectionStrategy struct {
Provider string `json:"provider"`
AWSMetrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsStrategy `json:"aws_logs,omitempty"`
}
type AWSMetricsStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
StreamFilters []struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
} `json:"cloudwatch_metric_stream_filters"`
}
type AWSLogsStrategy struct {
Subscriptions []struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
} `json:"cloudwatch_logs_subscriptions"`
}
type Dashboard struct {
Id string `json:"id"`
Url string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Image string `json:"image"`
Definition *types.DashboardData `json:"definition,omitempty"`
}

View File

@@ -1,4 +1,4 @@
package cloudintegrations package services
import ( import (
"bytes" "bytes"
@@ -15,11 +15,11 @@ import (
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
) )
func listCloudProviderServices( func List(
cloudProvider string, cloudProvider string,
) ([]CloudServiceDetails, *model.ApiError) { ) ([]Definition, *model.ApiError) {
cloudServices := availableServices[cloudProvider] cloudServices, found := supportedServices[cloudProvider]
if cloudServices == nil { if !found || cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf( return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider, "unsupported cloud provider: %s", cloudProvider,
)) ))
@@ -33,10 +33,21 @@ func listCloudProviderServices(
return services, nil return services, nil
} }
func getCloudProviderService( func Map(cloudprovider string) (map[string]Definition, *model.ApiError) {
cloudServices, found := supportedServices[cloudprovider]
if !found || cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudprovider,
))
}
return cloudServices, nil
}
func GetServiceDefinition(
cloudProvider string, serviceId string, cloudProvider string, serviceId string,
) (*CloudServiceDetails, *model.ApiError) { ) (*Definition, *model.ApiError) {
cloudServices := availableServices[cloudProvider] cloudServices := supportedServices[cloudProvider]
if cloudServices == nil { if cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf( return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider, "unsupported cloud provider: %s", cloudProvider,
@@ -57,7 +68,7 @@ func getCloudProviderService(
// Service details read from ./serviceDefinitions // Service details read from ./serviceDefinitions
// { "providerName": { "service_id": {...}} } // { "providerName": { "service_id": {...}} }
var availableServices map[string]map[string]CloudServiceDetails var supportedServices map[string]map[string]Definition
func init() { func init() {
err := readAllServiceDefinitions() err := readAllServiceDefinitions()
@@ -68,15 +79,15 @@ func init() {
} }
} }
//go:embed serviceDefinitions/* //go:embed definitions/*
var serviceDefinitionFiles embed.FS var definitionFiles embed.FS
func readAllServiceDefinitions() error { func readAllServiceDefinitions() error {
availableServices = map[string]map[string]CloudServiceDetails{} supportedServices = map[string]map[string]Definition{}
rootDirName := "serviceDefinitions" rootDirName := "definitions"
cloudProviderDirs, err := fs.ReadDir(serviceDefinitionFiles, rootDirName) cloudProviderDirs, err := fs.ReadDir(definitionFiles, rootDirName)
if err != nil { if err != nil {
return fmt.Errorf("couldn't read dirs in %s: %w", rootDirName, err) return fmt.Errorf("couldn't read dirs in %s: %w", rootDirName, err)
} }
@@ -98,21 +109,21 @@ func readAllServiceDefinitions() error {
return fmt.Errorf("no %s services could be read", cloudProvider) return fmt.Errorf("no %s services could be read", cloudProvider)
} }
availableServices[cloudProvider] = cloudServices supportedServices[cloudProvider] = cloudServices
} }
return nil return nil
} }
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) ( func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
map[string]CloudServiceDetails, error, map[string]Definition, error,
) { ) {
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath) svcDefDirs, err := fs.ReadDir(definitionFiles, cloudProviderDirPath)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't list integrations dirs: %w", err) return nil, fmt.Errorf("couldn't list integrations dirs: %w", err)
} }
svcDefs := map[string]CloudServiceDetails{} svcDefs := map[string]Definition{}
for _, d := range svcDefDirs { for _, d := range svcDefDirs {
if !d.IsDir() { if !d.IsDir() {
@@ -137,10 +148,10 @@ func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath st
return svcDefs, nil return svcDefs, nil
} }
func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServiceDetails, error) { func readServiceDefinition(cloudProvider string, svcDirpath string) (*Definition, error) {
integrationJsonPath := path.Join(svcDirpath, "integration.json") integrationJsonPath := path.Join(svcDirpath, "integration.json")
serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath) serializedSpec, err := definitionFiles.ReadFile(integrationJsonPath)
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w", "couldn't find integration.json in %s: %w",
@@ -157,7 +168,7 @@ func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServi
} }
hydrated, err := integrations.HydrateFileUris( hydrated, err := integrations.HydrateFileUris(
integrationSpec, serviceDefinitionFiles, svcDirpath, integrationSpec, definitionFiles, svcDirpath,
) )
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
@@ -167,7 +178,7 @@ func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServi
} }
hydratedSpec := hydrated.(map[string]any) hydratedSpec := hydrated.(map[string]any)
serviceDef, err := ParseStructWithJsonTagsFromMap[CloudServiceDetails](hydratedSpec) serviceDef, err := ParseStructWithJsonTagsFromMap[Definition](hydratedSpec)
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w", "couldn't parse hydrated JSON spec read from %s: %w",
@@ -180,13 +191,13 @@ func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServi
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err) return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
} }
serviceDef.TelemetryCollectionStrategy.Provider = cloudProvider serviceDef.Strategy.Provider = cloudProvider
return serviceDef, nil return serviceDef, nil
} }
func validateServiceDefinition(s *CloudServiceDetails) error { func validateServiceDefinition(s *Definition) error {
// Validate dashboard data // Validate dashboard data
seenDashboardIds := map[string]interface{}{} seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards { for _, dd := range s.Assets.Dashboards {
@@ -196,7 +207,7 @@ func validateServiceDefinition(s *CloudServiceDetails) error {
seenDashboardIds[dd.Id] = nil seenDashboardIds[dd.Id] = nil
} }
if s.TelemetryCollectionStrategy == nil { if s.Strategy == nil {
return fmt.Errorf("telemetry_collection_strategy is required") return fmt.Errorf("telemetry_collection_strategy is required")
} }

View File

@@ -1,4 +1,4 @@
package cloudintegrations package services
import ( import (
"testing" "testing"
@@ -11,22 +11,22 @@ func TestAvailableServices(t *testing.T) {
require := require.New(t) require := require.New(t)
// should be able to list available services. // should be able to list available services.
_, apiErr := listCloudProviderServices("bad-cloud-provider") _, apiErr := List("bad-cloud-provider")
require.NotNil(apiErr) require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type()) require.Equal(model.ErrorNotFound, apiErr.Type())
awsSvcs, apiErr := listCloudProviderServices("aws") awsSvcs, apiErr := List("aws")
require.Nil(apiErr) require.Nil(apiErr)
require.Greater(len(awsSvcs), 0) require.Greater(len(awsSvcs), 0)
// should be able to get details of a service // should be able to get details of a service
_, apiErr = getCloudProviderService( _, apiErr = GetServiceDefinition(
"aws", "bad-service-id", "aws", "bad-service-id",
) )
require.NotNil(apiErr) require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type()) require.Equal(model.ErrorNotFound, apiErr.Type())
svc, apiErr := getCloudProviderService( svc, apiErr := GetServiceDefinition(
"aws", awsSvcs[0].Id, "aws", awsSvcs[0].Id,
) )
require.Nil(apiErr) require.Nil(apiErr)

View File

@@ -22,6 +22,7 @@ import (
errorsV2 "github.com/SigNoz/signoz/pkg/errors" errorsV2 "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render" "github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/preference" "github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer" "github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
"github.com/SigNoz/signoz/pkg/signoz" "github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/valuer" "github.com/SigNoz/signoz/pkg/valuer"
@@ -3998,8 +3999,8 @@ func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
ctx context.Context, ctx context.Context,
cloudProvider string, cloudProvider string,
cloudAccountId string, cloudAccountId string,
svcDetails *cloudintegrations.CloudServiceDetails, svcDetails *cloudintegrations.ServiceDetails,
) (*cloudintegrations.CloudServiceConnectionStatus, *model.ApiError) { ) (*cloudintegrations.ServiceConnectionStatus, *model.ApiError) {
if cloudProvider != "aws" { if cloudProvider != "aws" {
// TODO(Raj): Make connection check generic for all providers in a follow up change // TODO(Raj): Make connection check generic for all providers in a follow up change
return nil, model.BadRequest( return nil, model.BadRequest(
@@ -4007,14 +4008,14 @@ func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
) )
} }
telemetryCollectionStrategy := svcDetails.TelemetryCollectionStrategy telemetryCollectionStrategy := svcDetails.Strategy
if telemetryCollectionStrategy == nil { if telemetryCollectionStrategy == nil {
return nil, model.InternalError(fmt.Errorf( return nil, model.InternalError(fmt.Errorf(
"service doesn't have telemetry collection strategy: %s", svcDetails.Id, "service doesn't have telemetry collection strategy: %s", svcDetails.Id,
)) ))
} }
result := &cloudintegrations.CloudServiceConnectionStatus{} result := &cloudintegrations.ServiceConnectionStatus{}
errors := []*model.ApiError{} errors := []*model.ApiError{}
var resultLock sync.Mutex var resultLock sync.Mutex
@@ -4074,10 +4075,10 @@ func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus( func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx context.Context, ctx context.Context,
cloudAccountId string, cloudAccountId string,
strategy *cloudintegrations.AWSMetricsCollectionStrategy, strategy *services.AWSMetricsStrategy,
metricsCollectedBySvc []cloudintegrations.CollectedMetric, metricsCollectedBySvc []services.CollectedMetric,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) { ) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.CloudwatchMetricsStreamFilters) < 1 { if strategy == nil || len(strategy.StreamFilters) < 1 {
return nil, nil return nil, nil
} }
@@ -4086,7 +4087,7 @@ func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
"cloud_account_id": cloudAccountId, "cloud_account_id": cloudAccountId,
} }
metricsNamespace := strategy.CloudwatchMetricsStreamFilters[0].Namespace metricsNamespace := strategy.StreamFilters[0].Namespace
metricsNamespaceParts := strings.Split(metricsNamespace, "/") metricsNamespaceParts := strings.Split(metricsNamespace, "/")
if len(metricsNamespaceParts) >= 2 { if len(metricsNamespaceParts) >= 2 {
@@ -4123,13 +4124,13 @@ func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
func (aH *APIHandler) calculateAWSIntegrationSvcLogsConnectionStatus( func (aH *APIHandler) calculateAWSIntegrationSvcLogsConnectionStatus(
ctx context.Context, ctx context.Context,
cloudAccountId string, cloudAccountId string,
strategy *cloudintegrations.AWSLogsCollectionStrategy, strategy *services.AWSLogsStrategy,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) { ) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.CloudwatchLogsSubscriptions) < 1 { if strategy == nil || len(strategy.Subscriptions) < 1 {
return nil, nil return nil, nil
} }
logGroupNamePrefix := strategy.CloudwatchLogsSubscriptions[0].LogGroupNamePrefix logGroupNamePrefix := strategy.Subscriptions[0].LogGroupNamePrefix
if len(logGroupNamePrefix) < 1 { if len(logGroupNamePrefix) < 1 {
return nil, nil return nil, nil
} }