Compare commits

...

3 Commits

Author SHA1 Message Date
Swapnil Nakade
9d53ee5053 feat: adding more generic struct 2026-02-13 18:49:31 +05:30
Piyush Singariya
c62b4d9141 test: testing generics 2026-02-13 16:06:57 +05:30
Swapnil Nakade
5720fcb654 feat: adding azure cloud integration apis 2026-02-12 23:49:43 +05:30
19 changed files with 5491 additions and 439 deletions

View File

@@ -7,18 +7,19 @@ import (
"net/url"
"slices"
"sort"
"strings"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
integrationstore "github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/store"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"golang.org/x/exp/maps"
)
@@ -29,33 +30,30 @@ var (
)
type awsProvider struct {
logger *slog.Logger
querier interfaces.Querier
accountsRepo integrationstore.CloudProviderAccountsRepository
serviceConfigRepo integrationstore.ServiceConfigDatabase
awsServiceDefinitions *services.AWSServicesProvider
reader interfaces.Reader
logger *slog.Logger
querier querier.Querier
accountsRepo integrationstore.CloudProviderAccountsRepository
serviceConfigRepo integrationstore.ServiceConfigDatabase
serviceDefinitions *services.ServicesProvider[*integrationstypes.AWSDefinition]
}
func NewAWSCloudProvider(
logger *slog.Logger,
accountsRepo integrationstore.CloudProviderAccountsRepository,
serviceConfigRepo integrationstore.ServiceConfigDatabase,
reader interfaces.Reader,
querier interfaces.Querier,
querier querier.Querier,
) integrationstypes.CloudProvider {
awsServiceDefinitions, err := services.NewAWSCloudProviderServices()
serviceDefinitions, err := services.NewAWSCloudProviderServices()
if err != nil {
panic("failed to initialize AWS service definitions: " + err.Error())
}
return &awsProvider{
logger: logger,
reader: reader,
querier: querier,
accountsRepo: accountsRepo,
serviceConfigRepo: serviceConfigRepo,
awsServiceDefinitions: awsServiceDefinitions,
logger: logger,
querier: querier,
accountsRepo: accountsRepo,
serviceConfigRepo: serviceConfigRepo,
serviceDefinitions: serviceDefinitions,
}
}
@@ -96,16 +94,19 @@ func (a *awsProvider) AgentCheckIn(ctx context.Context, req *integrationstypes.P
}
if existingAccount != nil && existingAccount.AccountID != nil && *existingAccount.AccountID != req.AccountID {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "can't check in with new %s account id %s for account %s with existing %s id %s",
return nil, model.BadRequest(fmt.Errorf(
"can't check in with new %s account id %s for account %s with existing %s id %s",
a.GetName().String(), req.AccountID, existingAccount.ID.StringValue(), a.GetName().String(),
*existingAccount.AccountID)
*existingAccount.AccountID,
))
}
existingAccount, err = a.accountsRepo.GetConnectedCloudAccount(ctx, req.OrgID, a.GetName().String(), req.AccountID)
if existingAccount != nil && existingAccount.ID.StringValue() != req.ID {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
return nil, model.BadRequest(fmt.Errorf(
"can't check in to %s account %s with id %s. already connected with id %s",
a.GetName().String(), req.AccountID, req.ID, existingAccount.ID.StringValue())
a.GetName().String(), req.AccountID, req.ID, existingAccount.ID.StringValue(),
))
}
agentReport := integrationstypes.AgentReport{
@@ -138,20 +139,18 @@ func (a *awsProvider) getAWSAgentConfig(ctx context.Context, account *integratio
agentConfig := &integrationstypes.AWSAgentIntegrationConfig{
EnabledRegions: []string{},
TelemetryCollectionStrategy: &integrationstypes.AWSCollectionStrategy{
Provider: a.GetName(),
AWSMetrics: &integrationstypes.AWSMetricsStrategy{},
AWSLogs: &integrationstypes.AWSLogsStrategy{},
S3Buckets: map[string][]string{},
Metrics: &integrationstypes.AWSMetricsStrategy{},
Logs: &integrationstypes.AWSLogsStrategy{},
},
}
accountConfig := new(integrationstypes.AWSAccountConfig)
err := accountConfig.Unmarshal([]byte(account.Config))
err := integrationstypes.UnmarshalJSON([]byte(account.Config), accountConfig)
if err != nil {
return nil, err
}
if accountConfig != nil && accountConfig.EnabledRegions != nil {
if accountConfig.EnabledRegions != nil {
agentConfig.EnabledRegions = accountConfig.EnabledRegions
}
@@ -167,14 +166,14 @@ func (a *awsProvider) getAWSAgentConfig(ctx context.Context, account *integratio
slices.Sort(configuredServices)
for _, svcType := range configuredServices {
definition, err := a.awsServiceDefinitions.GetServiceDefinition(ctx, svcType)
definition, err := a.serviceDefinitions.GetServiceDefinition(ctx, svcType)
if err != nil {
continue
}
config := svcConfigs[svcType]
serviceConfig := new(integrationstypes.AWSCloudServiceConfig)
err = serviceConfig.Unmarshal(config)
err = integrationstypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
continue
}
@@ -184,18 +183,18 @@ func (a *awsProvider) getAWSAgentConfig(ctx context.Context, account *integratio
// S3 bucket sync; No cloudwatch logs are appended for this service type;
// Though definition is populated with a custom cloudwatch group that helps in calculating logs connection status
agentConfig.TelemetryCollectionStrategy.S3Buckets = serviceConfig.Logs.S3Buckets
} else if definition.Strategy.AWSLogs != nil { // services that includes a logs subscription
agentConfig.TelemetryCollectionStrategy.AWSLogs.Subscriptions = append(
agentConfig.TelemetryCollectionStrategy.AWSLogs.Subscriptions,
definition.Strategy.AWSLogs.Subscriptions...,
} else if definition.Strategy != nil && definition.Strategy.Logs != nil { // services that includes a logs subscription
agentConfig.TelemetryCollectionStrategy.Logs.Subscriptions = append(
agentConfig.TelemetryCollectionStrategy.Logs.Subscriptions,
definition.Strategy.Logs.Subscriptions...,
)
}
}
if serviceConfig.Metrics != nil && serviceConfig.Metrics.Enabled && definition.Strategy.AWSMetrics != nil {
agentConfig.TelemetryCollectionStrategy.AWSMetrics.StreamFilters = append(
agentConfig.TelemetryCollectionStrategy.AWSMetrics.StreamFilters,
definition.Strategy.AWSMetrics.StreamFilters...,
if serviceConfig.Metrics != nil && serviceConfig.Metrics.Enabled && definition.Strategy != nil && definition.Strategy.Metrics != nil {
agentConfig.TelemetryCollectionStrategy.Metrics.StreamFilters = append(
agentConfig.TelemetryCollectionStrategy.Metrics.StreamFilters,
definition.Strategy.Metrics.StreamFilters...,
)
}
}
@@ -222,7 +221,7 @@ func (a *awsProvider) ListServices(ctx context.Context, orgID string, cloudAccou
for svcType, config := range serviceConfigs {
serviceConfig := new(integrationstypes.AWSCloudServiceConfig)
err = serviceConfig.Unmarshal(config)
err = integrationstypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
return nil, err
}
@@ -232,9 +231,9 @@ func (a *awsProvider) ListServices(ctx context.Context, orgID string, cloudAccou
summaries := make([]integrationstypes.AWSServiceSummary, 0)
definitions, err := a.awsServiceDefinitions.ListServiceDefinitions(ctx)
definitions, err := a.serviceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, err
return nil, model.InternalError(fmt.Errorf("couldn't list aws service definitions: %w", err))
}
for _, def := range definitions {
@@ -260,18 +259,17 @@ func (a *awsProvider) ListServices(ctx context.Context, orgID string, cloudAccou
func (a *awsProvider) GetServiceDetails(ctx context.Context, req *integrationstypes.GetServiceDetailsReq) (any, error) {
details := new(integrationstypes.GettableAWSServiceDetails)
awsDefinition, err := a.awsServiceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
awsDefinition, err := a.serviceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
if err != nil {
return nil, err
return nil, model.InternalError(fmt.Errorf("couldn't get aws service definition: %w", err))
}
details.AWSServiceDefinition = *awsDefinition
details.Strategy.Provider = a.GetName()
details.AWSDefinition = *awsDefinition
if req.CloudAccountID == nil {
return details, nil
}
config, err := a.getServiceConfig(ctx, &details.AWSServiceDefinition, req.OrgID, a.GetName().String(), req.ServiceId, *req.CloudAccountID)
config, err := a.getServiceConfig(ctx, &details.AWSDefinition, req.OrgID, a.GetName().String(), req.ServiceId, *req.CloudAccountID)
if err != nil {
return nil, err
}
@@ -282,11 +280,12 @@ func (a *awsProvider) GetServiceDetails(ctx context.Context, req *integrationsty
details.Config = config
connectionStatus, err := a.calculateCloudIntegrationServiceConnectionStatus(
connectionStatus, err := a.getServiceConnectionStatus(
ctx,
req.OrgID,
*req.CloudAccountID,
&details.AWSServiceDefinition,
req.OrgID,
&details.AWSDefinition,
config,
)
if err != nil {
return nil, err
@@ -297,208 +296,249 @@ func (a *awsProvider) GetServiceDetails(ctx context.Context, req *integrationsty
return details, nil
}
func (a *awsProvider) calculateCloudIntegrationServiceConnectionStatus(
func (a *awsProvider) getServiceConnectionStatus(
ctx context.Context,
cloudAccountID string,
orgID valuer.UUID,
cloudAccountId string,
svcDetails *integrationstypes.AWSServiceDefinition,
def *integrationstypes.AWSDefinition,
serviceConfig *integrationstypes.AWSCloudServiceConfig,
) (*integrationstypes.ServiceConnectionStatus, error) {
telemetryCollectionStrategy := svcDetails.Strategy
result := &integrationstypes.ServiceConnectionStatus{}
if telemetryCollectionStrategy == nil {
return result, model.InternalError(fmt.Errorf(
"service doesn't have telemetry collection strategy: %s", svcDetails.Id,
))
if def.Strategy == nil {
return nil, nil
}
errors := make([]error, 0)
var resultLock sync.Mutex
resp := new(integrationstypes.ServiceConnectionStatus)
var wg sync.WaitGroup
wg := sync.WaitGroup{}
wg.Add(2)
// Calculate metrics connection status
if telemetryCollectionStrategy.AWSMetrics != nil {
wg.Add(1)
if def.Strategy.Metrics != nil && serviceConfig.Metrics != nil && serviceConfig.Metrics.Enabled {
go func() {
defer func() {
if r := recover(); r != nil {
a.logger.ErrorContext(
ctx, "panic while getting service metrics connection status",
"error", r,
"service", def.DefinitionMetadata.Id,
)
}
}()
defer wg.Done()
metricsConnStatus, apiErr := a.calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx, cloudAccountId, telemetryCollectionStrategy.AWSMetrics, svcDetails.DataCollected.Metrics,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Metrics = metricsConnStatus
}
status, _ := a.getServiceMetricsConnectionStatus(ctx, cloudAccountID, orgID, def)
resp.Metrics = status
}()
}
// Calculate logs connection status
if telemetryCollectionStrategy.AWSLogs != nil {
wg.Add(1)
if def.Strategy.Logs != nil && serviceConfig.Logs != nil && serviceConfig.Logs.Enabled {
go func() {
defer func() {
if r := recover(); r != nil {
a.logger.ErrorContext(
ctx, "panic while getting service logs connection status",
"error", r,
"service", def.DefinitionMetadata.Id,
)
}
}()
defer wg.Done()
logsConnStatus, apiErr := a.calculateAWSIntegrationSvcLogsConnectionStatus(
ctx, orgID, cloudAccountId, telemetryCollectionStrategy.AWSLogs,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Logs = logsConnStatus
}
status, _ := a.getServiceLogsConnectionStatus(ctx, cloudAccountID, orgID, def)
resp.Logs = status
}()
}
wg.Wait()
if len(errors) > 0 {
return result, errors[0]
}
return result, nil
}
func (a *awsProvider) calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx context.Context,
cloudAccountId string,
strategy *integrationstypes.AWSMetricsStrategy,
metricsCollectedBySvc []integrationstypes.CollectedMetric,
) (*integrationstypes.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.StreamFilters) < 1 {
return nil, nil
}
expectedLabelValues := map[string]string{
"cloud_provider": "aws",
"cloud_account_id": cloudAccountId,
}
metricsNamespace := strategy.StreamFilters[0].Namespace
metricsNamespaceParts := strings.Split(metricsNamespace, "/")
if len(metricsNamespaceParts) >= 2 {
expectedLabelValues["service_namespace"] = metricsNamespaceParts[0]
expectedLabelValues["service_name"] = metricsNamespaceParts[1]
} else {
// metrics for single word namespaces like "CWAgent" do not
// have the service_namespace label populated
expectedLabelValues["service_name"] = metricsNamespaceParts[0]
}
metricNamesCollectedBySvc := []string{}
for _, cm := range metricsCollectedBySvc {
metricNamesCollectedBySvc = append(metricNamesCollectedBySvc, cm.Name)
}
statusForLastReceivedMetric, apiErr := a.reader.GetLatestReceivedMetric(
ctx, metricNamesCollectedBySvc, expectedLabelValues,
)
if apiErr != nil {
return nil, apiErr
}
if statusForLastReceivedMetric != nil {
return &integrationstypes.SignalConnectionStatus{
LastReceivedTsMillis: statusForLastReceivedMetric.LastReceivedTsMillis,
LastReceivedFrom: "signoz-aws-integration",
}, nil
}
return nil, nil
return resp, nil
}
func (a *awsProvider) calculateAWSIntegrationSvcLogsConnectionStatus(
func (a *awsProvider) getServiceMetricsConnectionStatus(
ctx context.Context,
cloudAccountID string,
orgID valuer.UUID,
cloudAccountId string,
strategy *integrationstypes.AWSLogsStrategy,
) (*integrationstypes.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.Subscriptions) < 1 {
def *integrationstypes.AWSDefinition,
) ([]*integrationstypes.SignalConnectionStatus, error) {
if def.Strategy == nil ||
def.Strategy.Metrics == nil ||
len(def.Strategy.Metrics.StreamFilters) < 1 ||
len(def.DataCollected.Metrics) < 1 {
return nil, nil
}
logGroupNamePrefix := strategy.Subscriptions[0].LogGroupNamePrefix
if len(logGroupNamePrefix) < 1 {
return nil, nil
}
statusResp := make([]*integrationstypes.SignalConnectionStatus, 0)
logsConnTestFilter := &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "cloud.account.id",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
for _, category := range def.IngestionStatusCheck.Metrics {
queries := make([]qbtypes.QueryEnvelope, 0)
for _, check := range category.Checks {
filterExpression := fmt.Sprintf(`cloud.provider="aws" AND cloud.account.id="%s"`, cloudAccountID)
f := ""
for _, attribute := range check.Attributes {
f = fmt.Sprintf("%s %s", attribute.Name, attribute.Operator)
if attribute.Value != "" {
f = fmt.Sprintf("%s '%s'", f, attribute.Value)
}
filterExpression = fmt.Sprintf("%s AND %s", filterExpression, f)
}
queries = append(queries, qbtypes.QueryEnvelope{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: valuer.GenerateUUID().String(),
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{{
MetricName: check.Key,
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
}},
Filter: &qbtypes.Filter{
Expression: filterExpression,
},
},
Operator: "=",
Value: cloudAccountId,
},
{
Key: v3.AttributeKey{
Key: "aws.cloudwatch.log_group_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: "like",
Value: logGroupNamePrefix + "%",
},
},
}
})
}
// TODO(Raj): Receive this as a param from UI in the future.
lookbackSeconds := int64(30 * 60)
qrParams := &v3.QueryRangeParamsV3{
Start: time.Now().UnixMilli() - (lookbackSeconds * 1000),
End: time.Now().UnixMilli(),
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
PageSize: 1,
Filters: logsConnTestFilter,
QueryName: "A",
DataSource: v3.DataSourceLogs,
Expression: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
},
resp, err := a.querier.QueryRange(ctx, orgID, &qbtypes.QueryRangeRequest{
SchemaVersion: "v5",
Start: uint64(time.Now().Add(-time.Hour).UnixMilli()),
End: uint64(time.Now().UnixMilli()),
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
},
}
queryRes, _, err := a.querier.QueryRange(
ctx, orgID, qrParams,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query for integration connection status: %w", err,
))
}
if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 {
lastLog := queryRes[0].List[0]
})
if err != nil {
a.logger.DebugContext(ctx,
"error querying for service metrics connection status",
"error", err,
"service", def.DefinitionMetadata.Id,
)
continue
}
return &integrationstypes.SignalConnectionStatus{
LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(),
if resp != nil && len(resp.Data.Results) < 1 {
continue
}
queryResponse, ok := resp.Data.Results[0].(*qbtypes.TimeSeriesData)
if !ok {
continue
}
if queryResponse == nil ||
len(queryResponse.Aggregations) < 1 ||
len(queryResponse.Aggregations[0].Series) < 1 ||
len(queryResponse.Aggregations[0].Series[0].Values) < 1 {
continue
}
statusResp = append(statusResp, &integrationstypes.SignalConnectionStatus{
CategoryID: category.Category,
CategoryDisplayName: category.DisplayName,
LastReceivedTsMillis: queryResponse.Aggregations[0].Series[0].Values[0].Timestamp,
LastReceivedFrom: "signoz-aws-integration",
}, nil
})
}
return nil, nil
return statusResp, nil
}
func (a *awsProvider) getServiceConfig(ctx context.Context,
def *integrationstypes.AWSServiceDefinition, orgID valuer.UUID, cloudProvider, serviceId, cloudAccountId string,
func (a *awsProvider) getServiceLogsConnectionStatus(
ctx context.Context,
cloudAccountID string,
orgID valuer.UUID,
def *integrationstypes.AWSDefinition,
) ([]*integrationstypes.SignalConnectionStatus, error) {
if def.Strategy == nil ||
def.Strategy.Logs == nil ||
len(def.Strategy.Logs.Subscriptions) < 1 ||
len(def.DataCollected.Logs) < 1 {
return nil, nil
}
statusResp := make([]*integrationstypes.SignalConnectionStatus, 0)
for _, category := range def.IngestionStatusCheck.Logs {
queries := make([]qbtypes.QueryEnvelope, 0)
for _, check := range category.Checks {
filterExpression := fmt.Sprintf(`cloud.account.id="%s"`, cloudAccountID)
f := ""
for _, attribute := range check.Attributes {
f = fmt.Sprintf("%s %s", attribute.Name, attribute.Operator)
if attribute.Value != "" {
f = fmt.Sprintf("%s '%s'", f, attribute.Value)
}
filterExpression = fmt.Sprintf("%s AND %s", filterExpression, f)
}
queries = append(queries, qbtypes.QueryEnvelope{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Name: valuer.GenerateUUID().String(),
Signal: telemetrytypes.SignalLogs,
Aggregations: []qbtypes.LogAggregation{{
Expression: "count()",
}},
Filter: &qbtypes.Filter{
Expression: filterExpression,
},
Limit: 10,
Offset: 0,
},
})
}
resp, err := a.querier.QueryRange(ctx, orgID, &qbtypes.QueryRangeRequest{
SchemaVersion: "v1",
Start: uint64(time.Now().Add(-time.Hour * 1).UnixMilli()),
End: uint64(time.Now().UnixMilli()),
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
})
if err != nil {
a.logger.DebugContext(ctx,
"error querying for service logs connection status",
"error", err,
"service", def.DefinitionMetadata.Id,
)
continue
}
if resp != nil && len(resp.Data.Results) < 1 {
continue
}
queryResponse, ok := resp.Data.Results[0].(*qbtypes.TimeSeriesData)
if !ok {
continue
}
if queryResponse == nil ||
len(queryResponse.Aggregations) < 1 ||
len(queryResponse.Aggregations[0].Series) < 1 ||
len(queryResponse.Aggregations[0].Series[0].Values) < 1 {
continue
}
statusResp = append(statusResp, &integrationstypes.SignalConnectionStatus{
CategoryID: category.Category,
CategoryDisplayName: category.DisplayName,
LastReceivedTsMillis: queryResponse.Aggregations[0].Series[0].Values[0].Timestamp,
LastReceivedFrom: "signoz-aws-integration",
})
}
return statusResp, nil
}
func (a *awsProvider) getServiceConfig(
ctx context.Context,
def *integrationstypes.AWSDefinition,
orgID valuer.UUID, cloudProvider, serviceId, cloudAccountId string,
) (*integrationstypes.AWSCloudServiceConfig, error) {
activeAccount, err := a.accountsRepo.GetConnectedCloudAccount(ctx, orgID.String(), cloudProvider, cloudAccountId)
if err != nil {
@@ -515,13 +555,13 @@ func (a *awsProvider) getServiceConfig(ctx context.Context,
}
serviceConfig := new(integrationstypes.AWSCloudServiceConfig)
err = serviceConfig.Unmarshal(config)
err = integrationstypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
return nil, err
}
if config != nil && serviceConfig.Metrics != nil && serviceConfig.Metrics.Enabled {
def.PopulateDashboardURLs(serviceId)
def.PopulateDashboardURLs(a.GetName(), serviceId)
}
return serviceConfig, nil
@@ -545,7 +585,7 @@ func (a *awsProvider) GetAvailableDashboards(ctx context.Context, orgID valuer.U
for svcId, config := range configsBySvcId {
serviceConfig := new(integrationstypes.AWSCloudServiceConfig)
err = serviceConfig.Unmarshal(config)
err = integrationstypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
return nil, err
}
@@ -559,7 +599,7 @@ func (a *awsProvider) GetAvailableDashboards(ctx context.Context, orgID valuer.U
svcDashboards := make([]*dashboardtypes.Dashboard, 0)
allServices, err := a.awsServiceDefinitions.ListServiceDefinitions(ctx)
allServices, err := a.serviceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to list aws service definitions")
}
@@ -593,7 +633,7 @@ func (a *awsProvider) GetDashboard(ctx context.Context, req *integrationstypes.G
func (a *awsProvider) GenerateConnectionArtifact(ctx context.Context, req *integrationstypes.PostableConnectionArtifact) (any, error) {
connection := new(integrationstypes.PostableAWSConnectionUrl)
err := connection.Unmarshal(req.Data)
err := integrationstypes.UnmarshalJSON(req.Data, connection)
if err != nil {
return nil, err
}
@@ -608,7 +648,7 @@ func (a *awsProvider) GenerateConnectionArtifact(ctx context.Context, req *integ
}
}
config, err := connection.AccountConfig.Marshal()
config, err := integrationstypes.MarshalJSON(connection.AccountConfig)
if err != nil {
return nil, err
}
@@ -653,13 +693,13 @@ func (a *awsProvider) GenerateConnectionArtifact(ctx context.Context, req *integ
}
func (a *awsProvider) UpdateServiceConfig(ctx context.Context, req *integrationstypes.PatchableServiceConfig) (any, error) {
definition, err := a.awsServiceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
definition, err := a.serviceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
if err != nil {
return nil, err
return nil, model.InternalError(fmt.Errorf("couldn't get aws service definition: %w", err))
}
serviceConfig := new(integrationstypes.PatchableAWSCloudServiceConfig)
err = serviceConfig.Unmarshal(req.Config)
err = integrationstypes.UnmarshalJSON(req.Config, serviceConfig)
if err != nil {
return nil, err
}
@@ -676,7 +716,7 @@ func (a *awsProvider) UpdateServiceConfig(ctx context.Context, req *integrations
return nil, err
}
serviceConfigBytes, err := serviceConfig.Config.Marshal()
serviceConfigBytes, err := integrationstypes.MarshalJSON(serviceConfig.Config)
if err != nil {
return nil, err
}
@@ -688,20 +728,20 @@ func (a *awsProvider) UpdateServiceConfig(ctx context.Context, req *integrations
return nil, err
}
if err = serviceConfig.Config.Unmarshal(updatedConfig); err != nil {
if err = integrationstypes.UnmarshalJSON(updatedConfig, serviceConfig); err != nil {
return nil, err
}
return &integrationstypes.PatchServiceConfigResponse{
ServiceId: req.ServiceId,
Config: serviceConfig.Config,
Config: serviceConfig,
}, nil
}
func (a *awsProvider) UpdateAccountConfig(ctx context.Context, req *integrationstypes.PatchableAccountConfig) (any, error) {
config := new(integrationstypes.PatchableAWSAccountConfig)
err := config.Unmarshal(req.Data)
err := integrationstypes.UnmarshalJSON(req.Data, config)
if err != nil {
return nil, err
}
@@ -718,7 +758,7 @@ func (a *awsProvider) UpdateAccountConfig(ctx context.Context, req *integrations
return nil, errors.NewInvalidInputf(CodeInvalidAWSRegion, "invalid aws region: %s", region)
}
configBytes, err := config.Config.Marshal()
configBytes, err := integrationstypes.MarshalJSON(config.Config)
if err != nil {
return nil, err
}

View File

@@ -0,0 +1,592 @@
package implazureprovider
import (
"context"
"fmt"
"log/slog"
"slices"
"sort"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/store"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/valuer"
"golang.org/x/exp/maps"
)
var (
CodeInvalidAzureRegion = errors.MustNewCode("invalid_azure_region")
CodeDashboardNotFound = errors.MustNewCode("dashboard_not_found")
)
type azureProvider struct {
logger *slog.Logger
accountsRepo store.CloudProviderAccountsRepository
serviceConfigRepo store.ServiceConfigDatabase
azureServiceDefinitions *services.ServicesProvider[*integrationstypes.AzureDefinition]
querier querier.Querier
}
func NewAzureCloudProvider(
logger *slog.Logger,
accountsRepo store.CloudProviderAccountsRepository,
serviceConfigRepo store.ServiceConfigDatabase,
querier querier.Querier,
) integrationstypes.CloudProvider {
azureServiceDefinitions, err := services.NewAzureCloudProviderServices()
if err != nil {
panic("failed to initialize Azure service definitions: " + err.Error())
}
return &azureProvider{
logger: logger,
accountsRepo: accountsRepo,
serviceConfigRepo: serviceConfigRepo,
azureServiceDefinitions: azureServiceDefinitions,
querier: querier,
}
}
func (a *azureProvider) GetAccountStatus(ctx context.Context, orgID, accountID string) (*integrationstypes.GettableAccountStatus, error) {
account, err := a.accountsRepo.Get(ctx, orgID, a.GetName().String(), accountID)
if err != nil {
return nil, err
}
return &integrationstypes.GettableAccountStatus{
Id: account.ID.String(),
CloudAccountId: account.AccountID,
Status: account.Status(),
}, nil
}
func (a *azureProvider) ListConnectedAccounts(ctx context.Context, orgID string) (*integrationstypes.GettableConnectedAccountsList, error) {
accountRecords, err := a.accountsRepo.ListConnected(ctx, orgID, a.GetName().String())
if err != nil {
return nil, err
}
connectedAccounts := make([]*integrationstypes.Account, 0, len(accountRecords))
for _, r := range accountRecords {
connectedAccounts = append(connectedAccounts, r.Account(a.GetName()))
}
return &integrationstypes.GettableConnectedAccountsList{
Accounts: connectedAccounts,
}, nil
}
func (a *azureProvider) AgentCheckIn(ctx context.Context, req *integrationstypes.PostableAgentCheckInPayload) (any, error) {
existingAccount, err := a.accountsRepo.Get(ctx, req.OrgID, a.GetName().String(), req.ID)
if err != nil {
return nil, err
}
if existingAccount != nil && existingAccount.AccountID != nil && *existingAccount.AccountID != req.AccountID {
return nil, model.BadRequest(fmt.Errorf(
"can't check in with new %s account id %s for account %s with existing %s id %s",
a.GetName().String(), req.AccountID, existingAccount.ID.StringValue(), a.GetName().String(),
*existingAccount.AccountID,
))
}
existingAccount, err = a.accountsRepo.GetConnectedCloudAccount(ctx, req.OrgID, a.GetName().String(), req.AccountID)
if existingAccount != nil && existingAccount.ID.StringValue() != req.ID {
return nil, model.BadRequest(fmt.Errorf(
"can't check in to %s account %s with id %s. already connected with id %s",
a.GetName().String(), req.AccountID, req.ID, existingAccount.ID.StringValue(),
))
}
agentReport := integrationstypes.AgentReport{
TimestampMillis: time.Now().UnixMilli(),
Data: req.Data,
}
account, err := a.accountsRepo.Upsert(
ctx, req.OrgID, a.GetName().String(), &req.ID, nil, &req.AccountID, &agentReport, nil,
)
if err != nil {
return nil, err
}
agentConfig, err := a.getAzureAgentConfig(ctx, account)
if err != nil {
return nil, err
}
return &integrationstypes.GettableAzureAgentCheckIn{
AccountId: account.ID.StringValue(),
CloudAccountId: *account.AccountID,
RemovedAt: account.RemovedAt,
IntegrationConfig: *agentConfig,
}, nil
}
func (a *azureProvider) getAzureAgentConfig(ctx context.Context, account *integrationstypes.CloudIntegration) (*integrationstypes.AzureAgentIntegrationConfig, error) {
// prepare and return integration config to be consumed by agent
agentConfig := &integrationstypes.AzureAgentIntegrationConfig{
TelemetryCollectionStrategy: make(map[string]*integrationstypes.AzureCollectionStrategy),
}
accountConfig := new(integrationstypes.AzureAccountConfig)
err := integrationstypes.UnmarshalJSON([]byte(account.Config), accountConfig)
if err != nil {
return nil, err
}
if account.Config != "" {
agentConfig.DeploymentRegion = accountConfig.DeploymentRegion
agentConfig.EnabledResourceGroups = accountConfig.EnabledResourceGroups
}
svcConfigs, err := a.serviceConfigRepo.GetAllForAccount(
ctx, account.OrgID, account.ID.StringValue(),
)
if err != nil {
return nil, err
}
// accumulate config in a fixed order to ensure same config generated across runs
configuredServices := maps.Keys(svcConfigs)
slices.Sort(configuredServices)
metrics := make([]*integrationstypes.AzureMetricsStrategy, 0)
logs := make([]*integrationstypes.AzureLogsStrategy, 0)
for _, svcType := range configuredServices {
definition, err := a.azureServiceDefinitions.GetServiceDefinition(ctx, svcType)
if err != nil {
continue
}
config := svcConfigs[svcType]
serviceConfig := new(integrationstypes.AzureCloudServiceConfig)
err = integrationstypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
continue
}
metricsStrategyMap := make(map[string]*integrationstypes.AzureMetricsStrategy)
logsStrategyMap := make(map[string]*integrationstypes.AzureLogsStrategy)
if definition.Strategy != nil && definition.Strategy.Metrics != nil {
for _, metric := range definition.Strategy.Metrics {
metricsStrategyMap[metric.Name] = metric
}
}
if definition.Strategy != nil && definition.Strategy.Logs != nil {
for _, log := range definition.Strategy.Logs {
logsStrategyMap[log.Name] = log
}
}
if serviceConfig.Metrics != nil {
for _, metric := range serviceConfig.Metrics {
if metric.Enabled {
metrics = append(metrics, &integrationstypes.AzureMetricsStrategy{
CategoryType: metricsStrategyMap[metric.Name].CategoryType,
Name: metric.Name,
})
}
}
}
if serviceConfig.Logs != nil {
for _, log := range serviceConfig.Logs {
if log.Enabled {
logs = append(logs, &integrationstypes.AzureLogsStrategy{
CategoryType: logsStrategyMap[log.Name].CategoryType,
Name: log.Name,
})
}
}
}
strategy := &integrationstypes.AzureCollectionStrategy{
Metrics: metrics,
Logs: logs,
}
agentConfig.TelemetryCollectionStrategy[svcType] = strategy
}
return agentConfig, nil
}
func (a *azureProvider) GetName() valuer.String {
return integrationstypes.CloudProviderAzure
}
func (a *azureProvider) ListServices(ctx context.Context, orgID string, cloudAccountID *string) (any, error) {
svcConfigs := make(map[string]*integrationstypes.AzureCloudServiceConfig)
if cloudAccountID != nil {
activeAccount, err := a.accountsRepo.GetConnectedCloudAccount(ctx, orgID, a.GetName().String(), *cloudAccountID)
if err != nil {
return nil, err
}
serviceConfigs, err := a.serviceConfigRepo.GetAllForAccount(ctx, orgID, activeAccount.ID.StringValue())
if err != nil {
return nil, err
}
for svcType, config := range serviceConfigs {
serviceConfig := new(integrationstypes.AzureCloudServiceConfig)
err = integrationstypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
return nil, err
}
svcConfigs[svcType] = serviceConfig
}
}
summaries := make([]integrationstypes.AzureServiceSummary, 0)
definitions, err := a.azureServiceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, model.InternalError(fmt.Errorf("couldn't list aws service definitions: %w", err))
}
for _, def := range definitions {
summary := integrationstypes.AzureServiceSummary{
DefinitionMetadata: def.DefinitionMetadata,
Config: nil,
}
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
}
sort.Slice(summaries, func(i, j int) bool {
return summaries[i].DefinitionMetadata.Title < summaries[j].DefinitionMetadata.Title
})
return &integrationstypes.GettableAzureServices{
Services: summaries,
}, nil
}
func (a *azureProvider) GetServiceDetails(ctx context.Context, req *integrationstypes.GetServiceDetailsReq) (any, error) {
details := new(integrationstypes.GettableAzureServiceDetails)
azureDefinition, err := a.azureServiceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
if err != nil {
return nil, model.InternalError(fmt.Errorf("couldn't get aws service definition: %w", err))
}
details.AzureDefinition = *azureDefinition
if req.CloudAccountID == nil {
return details, nil
}
config, err := a.getServiceConfig(ctx, azureDefinition, req.OrgID.String(), req.ServiceId, *req.CloudAccountID)
if err != nil {
return nil, err
}
details.Config = config
// fill default values for config
if details.Config == nil {
cfg := new(integrationstypes.AzureCloudServiceConfig)
logs := make([]*integrationstypes.AzureCloudServiceLogsConfig, 0)
if azureDefinition.Strategy != nil && azureDefinition.Strategy.Logs != nil {
for _, log := range azureDefinition.Strategy.Logs {
logs = append(logs, &integrationstypes.AzureCloudServiceLogsConfig{
Enabled: false,
Name: log.Name,
})
}
}
metrics := make([]*integrationstypes.AzureCloudServiceMetricsConfig, 0)
if azureDefinition.Strategy != nil && azureDefinition.Strategy.Metrics != nil {
for _, metric := range azureDefinition.Strategy.Metrics {
metrics = append(metrics, &integrationstypes.AzureCloudServiceMetricsConfig{
Enabled: false,
Name: metric.Name,
})
}
}
cfg.Logs = logs
cfg.Metrics = metrics
details.Config = cfg
}
// TODO: write logic for getting connection status
return details, nil
}
func (a *azureProvider) getServiceConfig(
ctx context.Context,
definition *integrationstypes.AzureDefinition,
orgID string,
serviceId string,
cloudAccountId string,
) (*integrationstypes.AzureCloudServiceConfig, error) {
activeAccount, err := a.accountsRepo.GetConnectedCloudAccount(ctx, orgID, a.GetName().String(), cloudAccountId)
if err != nil {
return nil, err
}
configBytes, err := a.serviceConfigRepo.Get(ctx, orgID, activeAccount.ID.String(), serviceId)
if err != nil {
if errors.Ast(err, errors.TypeNotFound) {
return nil, nil
}
return nil, err
}
config := new(integrationstypes.AzureCloudServiceConfig)
err = integrationstypes.UnmarshalJSON(configBytes, config)
if err != nil {
return nil, err
}
for _, metric := range config.Metrics {
if metric.Enabled {
definition.PopulateDashboardURLs(a.GetName(), serviceId)
break
}
}
return config, nil
}
func (a *azureProvider) GenerateConnectionArtifact(ctx context.Context, req *integrationstypes.PostableConnectionArtifact) (any, error) {
connection := new(integrationstypes.PostableAzureConnectionCommand)
err := integrationstypes.UnmarshalJSON(req.Data, connection)
if err != nil {
return nil, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed unmarshal request data into AWS connection config")
}
// validate connection config
if connection.AccountConfig != nil {
if !integrationstypes.ValidAzureRegions[connection.AccountConfig.DeploymentRegion] {
return nil, errors.NewInvalidInputf(CodeInvalidAzureRegion, "invalid azure region: %s",
connection.AccountConfig.DeploymentRegion,
)
}
}
config, err := integrationstypes.MarshalJSON(connection.AccountConfig)
if err != nil {
return nil, err
}
account, err := a.accountsRepo.Upsert(
ctx, req.OrgID, a.GetName().String(), nil, config,
nil, nil, nil,
)
if err != nil {
return nil, err
}
agentVersion := "v0.0.1"
if connection.AgentConfig.Version != "" {
agentVersion = connection.AgentConfig.Version
}
// TODO: improve the command and set url
cliCommand := []string{"az", "stack", "sub", "create", "--name", "SigNozIntegration", "--location",
connection.AccountConfig.DeploymentRegion, "--template-uri", fmt.Sprintf("<url>%s", agentVersion),
"--action-on-unmanage", "deleteAll", "--deny-settings-mode", "denyDelete", "--parameters", fmt.Sprintf("rgName=%s", "signoz-integration-rg"),
fmt.Sprintf("rgLocation=%s", connection.AccountConfig.DeploymentRegion)}
return &integrationstypes.GettableAzureConnectionCommand{
AccountId: account.ID.String(),
AzureShellConnectionCommand: "az create",
AzureCliConnectionCommand: strings.Join(cliCommand, " "),
}, nil
}
func (a *azureProvider) UpdateServiceConfig(ctx context.Context, req *integrationstypes.PatchableServiceConfig) (any, error) {
definition, err := a.azureServiceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
if err != nil {
return nil, err
}
serviceConfig := new(integrationstypes.PatchableAzureCloudServiceConfig)
err = integrationstypes.UnmarshalJSON(req.Config, serviceConfig)
if err != nil {
return nil, err
}
if err = serviceConfig.Config.Validate(definition); err != nil {
return nil, err
}
// can only update config for a connected cloud account id
_, err = a.accountsRepo.GetConnectedCloudAccount(
ctx, req.OrgID, a.GetName().String(), serviceConfig.CloudAccountId,
)
if err != nil {
return nil, err
}
serviceConfigBytes, err := integrationstypes.MarshalJSON(serviceConfig.Config)
if err != nil {
return nil, err
}
updatedConfig, err := a.serviceConfigRepo.Upsert(
ctx, req.OrgID, a.GetName().String(), serviceConfig.CloudAccountId, req.ServiceId, serviceConfigBytes,
)
if err != nil {
return nil, err
}
if err = integrationstypes.UnmarshalJSON(updatedConfig, serviceConfig); err != nil {
return nil, err
}
return &integrationstypes.PatchServiceConfigResponse{
ServiceId: req.ServiceId,
Config: serviceConfig,
}, nil
}
func (a *azureProvider) GetAvailableDashboards(ctx context.Context, orgID valuer.UUID) ([]*dashboardtypes.Dashboard, error) {
accountRecords, err := a.accountsRepo.ListConnected(ctx, orgID.StringValue(), a.GetName().String())
if err != nil {
return nil, err
}
// for now service dashboards are only available when metrics are enabled.
servicesWithAvailableMetrics := map[string]*time.Time{}
for _, ar := range accountRecords {
if ar.AccountID == nil {
continue
}
configsBySvcId, err := a.serviceConfigRepo.GetAllForAccount(ctx, orgID.StringValue(), ar.ID.StringValue())
if err != nil {
return nil, err
}
for svcId, config := range configsBySvcId {
serviceConfig := new(integrationstypes.AzureCloudServiceConfig)
err = integrationstypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
return nil, err
}
if serviceConfig.Metrics != nil {
for _, metric := range serviceConfig.Metrics {
if metric.Enabled {
servicesWithAvailableMetrics[svcId] = &ar.CreatedAt
break
}
}
}
}
}
svcDashboards := make([]*dashboardtypes.Dashboard, 0)
allServices, err := a.azureServiceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to list azure service definitions")
}
for _, svc := range allServices {
serviceDashboardsCreatedAt := servicesWithAvailableMetrics[svc.Id]
if serviceDashboardsCreatedAt != nil {
svcDashboards = integrationstypes.GetDashboardsFromAssets(svc.Id, a.GetName(), serviceDashboardsCreatedAt, svc.Assets)
servicesWithAvailableMetrics[svc.Id] = nil
}
}
return svcDashboards, nil
}
func (a *azureProvider) GetDashboard(ctx context.Context, req *integrationstypes.GettableDashboard) (*dashboardtypes.Dashboard, error) {
allDashboards, err := a.GetAvailableDashboards(ctx, req.OrgID)
if err != nil {
return nil, err
}
for _, dashboard := range allDashboards {
if dashboard.ID == req.ID {
return dashboard, nil
}
}
return nil, errors.NewNotFoundf(CodeDashboardNotFound, "dashboard with id %s not found", req.ID)
}
func (a *azureProvider) UpdateAccountConfig(ctx context.Context, req *integrationstypes.PatchableAccountConfig) (any, error) {
config := new(integrationstypes.PatchableAzureAccountConfig)
err := integrationstypes.UnmarshalJSON(req.Data, config)
if err != nil {
return nil, err
}
if config.Config == nil && len(config.Config.EnabledResourceGroups) < 1 {
return nil, errors.NewInvalidInputf(CodeInvalidAzureRegion, "azure region and resource groups must be provided")
}
//for azure, preserve deployment region if already set
account, err := a.accountsRepo.Get(ctx, req.OrgID, a.GetName().String(), req.AccountId)
if err != nil {
return nil, err
}
storedConfig := new(integrationstypes.AzureAccountConfig)
err = integrationstypes.UnmarshalJSON([]byte(account.Config), storedConfig)
if err != nil {
return nil, err
}
if account.Config != "" {
config.Config.DeploymentRegion = storedConfig.DeploymentRegion
}
configBytes, err := integrationstypes.MarshalJSON(config.Config)
if err != nil {
return nil, err
}
accountRecord, err := a.accountsRepo.Upsert(
ctx, req.OrgID, a.GetName().String(), &req.AccountId, configBytes, nil, nil, nil,
)
if err != nil {
return nil, err
}
return accountRecord.Account(a.GetName()), nil
}
func (a *azureProvider) DisconnectAccount(ctx context.Context, orgID, accountID string) (*integrationstypes.CloudIntegration, error) {
account, err := a.accountsRepo.Get(ctx, orgID, a.GetName().String(), accountID)
if err != nil {
return nil, err
}
tsNow := time.Now()
account, err = a.accountsRepo.Upsert(
ctx, orgID, a.GetName().String(), &accountID, nil, nil, nil, &tsNow,
)
if err != nil {
return nil, err
}
return account, nil
}

View File

@@ -3,9 +3,10 @@ package cloudintegrations
import (
"log/slog"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/implawsprovider"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/implazureprovider"
integrationstore "github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/store"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
)
@@ -13,16 +14,17 @@ import (
func NewCloudProviderRegistry(
logger *slog.Logger,
store sqlstore.SQLStore,
reader interfaces.Reader,
querier interfaces.Querier,
querier querier.Querier,
) map[integrationstypes.CloudProviderType]integrationstypes.CloudProvider {
registry := make(map[integrationstypes.CloudProviderType]integrationstypes.CloudProvider)
accountsRepo := integrationstore.NewCloudProviderAccountsRepository(store)
serviceConfigRepo := integrationstore.NewServiceConfigRepository(store)
awsProviderImpl := implawsprovider.NewAWSCloudProvider(logger, accountsRepo, serviceConfigRepo, reader, querier)
awsProviderImpl := implawsprovider.NewAWSCloudProvider(logger, accountsRepo, serviceConfigRepo, querier)
registry[integrationstypes.CloudProviderAWS] = awsProviderImpl
azureProviderImpl := implazureprovider.NewAzureCloudProvider(logger, accountsRepo, serviceConfigRepo, querier)
registry[integrationstypes.CloudProviderAzure] = azureProviderImpl
return registry
}

View File

@@ -0,0 +1 @@
<svg id="f2f04349-8aee-4413-84c9-a9053611b319" xmlns="http://www.w3.org/2000/svg" width="18" height="18" viewBox="0 0 18 18"><defs><linearGradient id="ad4c4f96-09aa-4f91-ba10-5cb8ad530f74" x1="9" y1="15.83" x2="9" y2="5.79" gradientUnits="userSpaceOnUse"><stop offset="0" stop-color="#b3b3b3" /><stop offset="0.26" stop-color="#c1c1c1" /><stop offset="1" stop-color="#e6e6e6" /></linearGradient></defs><title>Icon-storage-86</title><path d="M.5,5.79h17a0,0,0,0,1,0,0v9.48a.57.57,0,0,1-.57.57H1.07a.57.57,0,0,1-.57-.57V5.79A0,0,0,0,1,.5,5.79Z" fill="url(#ad4c4f96-09aa-4f91-ba10-5cb8ad530f74)" /><path d="M1.07,2.17H16.93a.57.57,0,0,1,.57.57V5.79a0,0,0,0,1,0,0H.5a0,0,0,0,1,0,0V2.73A.57.57,0,0,1,1.07,2.17Z" fill="#37c2b1" /><path d="M2.81,6.89H15.18a.27.27,0,0,1,.26.27v1.4a.27.27,0,0,1-.26.27H2.81a.27.27,0,0,1-.26-.27V7.16A.27.27,0,0,1,2.81,6.89Z" fill="#fff" /><path d="M2.82,9.68H15.19a.27.27,0,0,1,.26.27v1.41a.27.27,0,0,1-.26.27H2.82a.27.27,0,0,1-.26-.27V10A.27.27,0,0,1,2.82,9.68Z" fill="#37c2b1" /><path d="M2.82,12.5H15.19a.27.27,0,0,1,.26.27v1.41a.27.27,0,0,1-.26.27H2.82a.27.27,0,0,1-.26-.27V12.77A.27.27,0,0,1,2.82,12.5Z" fill="#258277" /></svg>

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -0,0 +1,252 @@
{
"id": "blobstorage",
"title": "Blob Storage",
"icon": "file://icon.svg",
"overview": "file://overview.md",
"supported_signals": {
"metrics": true,
"logs": true
},
"data_collected": {
"metrics": [
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
}
],
"logs": [
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
}
]
},
"telemetry_collection_strategy": {
"azure_metrics": [
{
"category_type": "metrics",
"name": "Capacity"
},
{
"category_type": "metrics",
"name": "Transaction"
}
],
"azure_logs": [
{
"category_type": "logs",
"name": "StorageRead"
},
{
"category_type": "logs",
"name": "StorageWrite"
},
{
"category_type": "logs",
"name": "StorageDelete"
}
]
},
"assets": {
"dashboards": [
{
"id": "overview",
"title": "Blob Storage Overview",
"description": "Overview of Blob Storage",
"definition": "file://assets/dashboards/overview.json"
}
]
}
}

View File

@@ -0,0 +1,2 @@
Monitor Azure Blob Storage with SigNoz
Collect key Blob Storage metrics and view them with an out of the box dashboard.

View File

@@ -0,0 +1 @@
<svg id="f2f04349-8aee-4413-84c9-a9053611b319" xmlns="http://www.w3.org/2000/svg" width="18" height="18" viewBox="0 0 18 18"><defs><linearGradient id="ad4c4f96-09aa-4f91-ba10-5cb8ad530f74" x1="9" y1="15.83" x2="9" y2="5.79" gradientUnits="userSpaceOnUse"><stop offset="0" stop-color="#b3b3b3" /><stop offset="0.26" stop-color="#c1c1c1" /><stop offset="1" stop-color="#e6e6e6" /></linearGradient></defs><title>Icon-storage-86</title><path d="M.5,5.79h17a0,0,0,0,1,0,0v9.48a.57.57,0,0,1-.57.57H1.07a.57.57,0,0,1-.57-.57V5.79A0,0,0,0,1,.5,5.79Z" fill="url(#ad4c4f96-09aa-4f91-ba10-5cb8ad530f74)" /><path d="M1.07,2.17H16.93a.57.57,0,0,1,.57.57V5.79a0,0,0,0,1,0,0H.5a0,0,0,0,1,0,0V2.73A.57.57,0,0,1,1.07,2.17Z" fill="#37c2b1" /><path d="M2.81,6.89H15.18a.27.27,0,0,1,.26.27v1.4a.27.27,0,0,1-.26.27H2.81a.27.27,0,0,1-.26-.27V7.16A.27.27,0,0,1,2.81,6.89Z" fill="#fff" /><path d="M2.82,9.68H15.19a.27.27,0,0,1,.26.27v1.41a.27.27,0,0,1-.26.27H2.82a.27.27,0,0,1-.26-.27V10A.27.27,0,0,1,2.82,9.68Z" fill="#37c2b1" /><path d="M2.82,12.5H15.19a.27.27,0,0,1,.26.27v1.41a.27.27,0,0,1-.26.27H2.82a.27.27,0,0,1-.26-.27V12.77A.27.27,0,0,1,2.82,12.5Z" fill="#258277" /></svg>

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -0,0 +1,247 @@
{
"id": "frontdoor",
"title": "Front Door",
"icon": "file://icon.svg",
"overview": "file://overview.md",
"supported_signals": {
"metrics": true,
"logs": true
},
"data_collected": {
"metrics": [
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
}
],
"logs": [
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
}
]
},
"telemetry_collection_strategy": {
"azure_metrics": [
{
"category_type": "metrics",
"name": "Capacity"
},
{
"category_type": "metrics",
"name": "Transaction"
}
],
"azure_logs": [
{
"category_type": "logs",
"name": "StorageRead"
},
{
"category_type": "logs",
"name": "StorageWrite"
},
{
"category_type": "logs",
"name": "StorageDelete"
}
]
},
"assets": {
"dashboards": [
{
"id": "overview",
"title": "Front Door Overview",
"description": "Overview of Blob Storage",
"definition": "file://assets/dashboards/overview.json"
}
]
}
}

View File

@@ -0,0 +1,2 @@
Monitor Azure Front Door with SigNoz
Collect key Front Door metrics and view them with an out of the box dashboard.

View File

@@ -20,45 +20,65 @@ var (
CodeServiceDefinitionNotFound = errors.MustNewCode("service_definition_not_dound")
)
type (
AWSServicesProvider struct {
definitions map[string]*integrationstypes.AWSServiceDefinition
}
)
type ServicesProvider[T integrationstypes.Definition] struct {
definitions map[string]T
}
func (a *AWSServicesProvider) ListServiceDefinitions(ctx context.Context) (map[string]*integrationstypes.AWSServiceDefinition, error) {
func (a *ServicesProvider[T]) ListServiceDefinitions(ctx context.Context) (map[string]T, error) {
return a.definitions, nil
}
func (a *AWSServicesProvider) GetServiceDefinition(ctx context.Context, serviceName string) (*integrationstypes.AWSServiceDefinition, error) {
func (a *ServicesProvider[T]) GetServiceDefinition(ctx context.Context, serviceName string) (T, error) {
def, ok := a.definitions[serviceName]
if !ok {
return nil, errors.NewNotFoundf(CodeServiceDefinitionNotFound, "aws service definition not found: %s", serviceName)
return *new(T), errors.NewNotFoundf(CodeServiceDefinitionNotFound, "azure service definition not found: %s", serviceName)
}
return def, nil
}
func NewAWSCloudProviderServices() (*AWSServicesProvider, error) {
func NewAWSCloudProviderServices() (*ServicesProvider[*integrationstypes.AWSDefinition], error) {
definitions, err := readAllServiceDefinitions(integrationstypes.CloudProviderAWS)
if err != nil {
return nil, err
}
serviceDefinitions := make(map[string]*integrationstypes.AWSServiceDefinition)
serviceDefinitions := make(map[string]*integrationstypes.AWSDefinition)
for id, def := range definitions {
typedDef, ok := def.(*integrationstypes.AWSServiceDefinition)
typedDef, ok := def.(*integrationstypes.AWSDefinition)
if !ok {
return nil, fmt.Errorf("invalid type for AWS service definition %s", id)
}
serviceDefinitions[id] = typedDef
}
return &AWSServicesProvider{
return &ServicesProvider[*integrationstypes.AWSDefinition]{
definitions: serviceDefinitions,
}, nil
}
func NewAzureCloudProviderServices() (*ServicesProvider[*integrationstypes.AzureDefinition], error) {
definitions, err := readAllServiceDefinitions(integrationstypes.CloudProviderAzure)
if err != nil {
return nil, err
}
serviceDefinitions := make(map[string]*integrationstypes.AzureDefinition)
for id, def := range definitions {
typedDef, ok := def.(*integrationstypes.AzureDefinition)
if !ok {
return nil, fmt.Errorf("invalid type for Azure service definition %s", id)
}
serviceDefinitions[id] = typedDef
}
return &ServicesProvider[*integrationstypes.AzureDefinition]{
definitions: serviceDefinitions,
}, nil
}
// End of API. Logic for reading service definition files follows
//go:embed definitions/*
var definitionFiles embed.FS
@@ -131,7 +151,9 @@ func readServiceDefinition(cloudProvider valuer.String, svcDirpath string) (inte
switch cloudProvider {
case integrationstypes.CloudProviderAWS:
serviceDef = &integrationstypes.AWSServiceDefinition{}
serviceDef = &integrationstypes.AWSDefinition{}
case integrationstypes.CloudProviderAzure:
serviceDef = &integrationstypes.AzureDefinition{}
default:
// ideally this shouldn't happen hence throwing internal error
return nil, errors.NewInternalf(errors.CodeInternal, "unsupported cloud provider: %s", cloudProvider)

View File

@@ -217,8 +217,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
cloudIntegrationsRegistry := cloudintegrations.NewCloudProviderRegistry(
opts.Logger,
opts.Signoz.SQLStore,
opts.Reader,
querier,
opts.Signoz.Querier,
)
aH := &APIHandler{

View File

@@ -13,6 +13,34 @@ import (
"github.com/uptrace/bun"
)
// Generic utility functions for JSON serialization/deserialization
// UnmarshalJSON is a generic function to unmarshal JSON data into any type
func UnmarshalJSON[T any](src []byte, target *T) error {
err := json.Unmarshal(src, target)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize JSON",
)
}
return nil
}
// MarshalJSON is a generic function to marshal any type to JSON
func MarshalJSON[T any](source *T) ([]byte, error) {
if source == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "source is nil")
}
serialized, err := json.Marshal(source)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize to JSON",
)
}
return serialized, nil
}
// CloudProvider defines the interface to be implemented by different cloud providers.
// This is generic interface so it will be accepting and returning generic types instead of concrete.
// It's the cloud provider's responsibility to cast them to appropriate types and validate
@@ -23,13 +51,13 @@ type CloudProvider interface {
GenerateConnectionArtifact(ctx context.Context, req *PostableConnectionArtifact) (any, error)
GetAccountStatus(ctx context.Context, orgID, accountID string) (*GettableAccountStatus, error)
ListServices(ctx context.Context, orgID string, accountID *string) (any, error) // returns either GettableAWSServices
ListServices(ctx context.Context, orgID string, accountID *string) (any, error) // returns either GettableAWSServices or GettableAzureServices
GetServiceDetails(ctx context.Context, req *GetServiceDetailsReq) (any, error)
ListConnectedAccounts(ctx context.Context, orgID string) (*GettableConnectedAccountsList, error)
GetDashboard(ctx context.Context, req *GettableDashboard) (*dashboardtypes.Dashboard, error)
GetAvailableDashboards(ctx context.Context, orgID valuer.UUID) ([]*dashboardtypes.Dashboard, error)
UpdateAccountConfig(ctx context.Context, req *PatchableAccountConfig) (any, error) // req can be either PatchableAWSAccountConfig
UpdateAccountConfig(ctx context.Context, req *PatchableAccountConfig) (any, error) // req can be either PatchableAWSAccountConfig or PatchableAzureAccountConfig
UpdateServiceConfig(ctx context.Context, req *PatchableServiceConfig) (any, error)
DisconnectAccount(ctx context.Context, orgID, accountID string) (*CloudIntegration, error)
@@ -93,36 +121,40 @@ type SigNozAWSAgentConfig struct {
type PostableConnectionArtifact struct {
OrgID string
Data []byte // either PostableAWSConnectionUrl
Data []byte // either PostableAWSConnectionUrl or PostableAzureConnectionCommand
}
type PostableAWSConnectionUrl struct {
// Optional. To be specified for updates.
// TODO: evaluate and remove if not needed.
AccountId *string `json:"account_id,omitempty"`
AccountConfig *AWSAccountConfig `json:"account_config"`
AgentConfig *SigNozAWSAgentConfig `json:"agent_config"`
type PostableConnectionArtifactTyped[AgentConfigT any, AccountConfigT any] struct {
AccountId *string `json:"account_id,omitempty"` // Optional. To be specified for updates.
AgentConfig *AgentConfigT `json:"agent_config"`
AccountConfig *AccountConfigT `json:"account_config"`
}
func (p *PostableAWSConnectionUrl) Unmarshal(src any) error {
var data []byte
switch src := src.(type) {
case []byte:
data = src
case string:
data = []byte(src)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
type PostableAWSConnectionUrl = PostableConnectionArtifactTyped[SigNozAWSAgentConfig, AWSAccountConfig]
type PostableAzureConnectionCommand = PostableConnectionArtifactTyped[SigNozAzureAgentConfig, AzureAccountConfig]
err := json.Unmarshal(data, p)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize aws connection url request from JSON",
)
}
type SigNozAzureAgentConfig struct {
IngestionUrl string `json:"ingestion_url"`
IngestionKey string `json:"ingestion_key"`
SigNozAPIUrl string `json:"signoz_api_url"`
SigNozAPIKey string `json:"signoz_api_key"`
return nil
Version string `json:"version,omitempty"`
}
// GettableConnectionArtifact represents base structure for connection artifacts
type GettableConnectionArtifact[T any] struct {
AccountId string `json:"account_id"`
Artifact T `json:",inline"`
}
type GettableAWSConnectionArtifact struct {
ConnectionUrl string `json:"connection_url"`
}
type GettableAzureConnectionArtifact struct {
AzureShellConnectionCommand string `json:"az_shell_connection_command"`
AzureCliConnectionCommand string `json:"az_cli_connection_command"`
}
type GettableAWSConnectionUrl struct {
@@ -130,6 +162,12 @@ type GettableAWSConnectionUrl struct {
ConnectionUrl string `json:"connection_url"`
}
type GettableAzureConnectionCommand struct {
AccountId string `json:"account_id"`
AzureShellConnectionCommand string `json:"az_shell_connection_command"`
AzureCliConnectionCommand string `json:"az_cli_connection_command"`
}
type GettableAccountStatus struct {
Id string `json:"id"`
CloudAccountId *string `json:"cloud_account_id,omitempty"`
@@ -144,82 +182,53 @@ type PostableAgentCheckInPayload struct {
OrgID string `json:"-"`
}
type GettableAWSAgentCheckIn struct {
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
RemovedAt *time.Time `json:"removed_at"`
IntegrationConfig AWSAgentIntegrationConfig `json:"integration_config"`
}
type AWSAgentIntegrationConfig struct {
EnabledRegions []string `json:"enabled_regions"`
TelemetryCollectionStrategy *AWSCollectionStrategy `json:"telemetry,omitempty"`
}
type AzureAgentIntegrationConfig struct {
DeploymentRegion string `json:"deployment_region"` // will not be changed once set
EnabledResourceGroups []string `json:"resource_groups"`
// TelemetryCollectionStrategy is map of service to telemetry config
TelemetryCollectionStrategy map[string]*AzureCollectionStrategy `json:"telemetry,omitempty"`
}
type GettableAgentCheckIn[T any] struct {
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
RemovedAt *time.Time `json:"removed_at"`
IntegrationConfig T `json:"integration_config"`
}
type GettableAWSAgentCheckIn = GettableAgentCheckIn[AWSAgentIntegrationConfig]
type GettableAzureAgentCheckIn = GettableAgentCheckIn[AzureAgentIntegrationConfig]
type PatchableServiceConfig struct {
OrgID string `json:"org_id"`
ServiceId string `json:"service_id"`
Config []byte `json:"config"` // json serialized config
}
type PatchableAWSCloudServiceConfig struct {
CloudAccountId string `json:"cloud_account_id"`
Config *AWSCloudServiceConfig `json:"config"`
type PatchableCloudServiceConfig[T any] struct {
CloudAccountId string `json:"cloud_account_id"`
Config *T `json:"config"`
}
type PatchableAWSCloudServiceConfig = PatchableCloudServiceConfig[AWSCloudServiceConfig]
type PatchableAzureCloudServiceConfig = PatchableCloudServiceConfig[AzureCloudServiceConfig]
type AWSCloudServiceConfig struct {
Logs *AWSCloudServiceLogsConfig `json:"logs,omitempty"`
Metrics *AWSCloudServiceMetricsConfig `json:"metrics,omitempty"`
}
// Unmarshal unmarshalls data from src
func (c *PatchableAWSCloudServiceConfig) Unmarshal(src []byte) error {
err := json.Unmarshal(src, c)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize aws service config req from JSON",
)
}
return nil
type AzureCloudServiceConfig struct {
Logs []*AzureCloudServiceLogsConfig `json:"logs,omitempty"`
Metrics []*AzureCloudServiceMetricsConfig `json:"metrics,omitempty"`
}
// Marshal serializes data to bytes
func (c *PatchableAWSCloudServiceConfig) Marshal() ([]byte, error) {
serialized, err := json.Marshal(c)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize aws service config req to JSON",
)
}
return serialized, nil
}
// Unmarshal unmarshalls data from src
func (a *AWSCloudServiceConfig) Unmarshal(src []byte) error {
err := json.Unmarshal(src, a)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize cloud service config from JSON",
)
}
return nil
}
// Marshal serializes data to bytes
func (a *AWSCloudServiceConfig) Marshal() ([]byte, error) {
serialized, err := json.Marshal(a)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize cloud service config to JSON",
)
}
return serialized, nil
}
func (a *AWSCloudServiceConfig) Validate(def *AWSServiceDefinition) error {
func (a *AWSCloudServiceConfig) Validate(def *AWSDefinition) error {
if def.Id != S3Sync && a.Logs != nil && a.Logs.S3Buckets != nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "s3 buckets can only be added to service-type[%s]", S3Sync)
} else if def.Id == S3Sync && a.Logs != nil && a.Logs.S3Buckets != nil {
@@ -233,6 +242,47 @@ func (a *AWSCloudServiceConfig) Validate(def *AWSServiceDefinition) error {
return nil
}
func (a *AzureCloudServiceConfig) Validate(def *AzureDefinition) error {
logsMap := make(map[string]bool)
metricsMap := make(map[string]bool)
if def.Strategy != nil && def.Strategy.Logs != nil {
for _, log := range def.Strategy.Logs {
logsMap[log.Name] = true
}
}
if def.Strategy != nil && def.Strategy.Metrics != nil {
for _, metric := range def.Strategy.Metrics {
metricsMap[metric.Name] = true
}
}
for _, log := range a.Logs {
if _, found := logsMap[log.Name]; !found {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid log name: %s", log.Name)
}
}
for _, metric := range a.Metrics {
if _, found := metricsMap[metric.Name]; !found {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid metric name: %s", metric.Name)
}
}
return nil
}
type AzureCloudServiceLogsConfig struct {
Enabled bool `json:"enabled"`
Name string `json:"name"`
}
type AzureCloudServiceMetricsConfig struct {
Enabled bool `json:"enabled"`
Name string `json:"name"`
}
type PatchServiceConfigResponse struct {
ServiceId string `json:"id"`
Config any `json:"config"`
@@ -241,70 +291,31 @@ type PatchServiceConfigResponse struct {
type PatchableAccountConfig struct {
OrgID string
AccountId string
Data []byte // can be either AWSAccountConfig
Data []byte // can be either AWSAccountConfig or AzureAccountConfig
}
type PatchableAWSAccountConfig struct {
Config *AWSAccountConfig `json:"config"`
type PatchableAccountConfigTyped[T any] struct {
Config *T `json:"config"`
}
func (p *PatchableAWSAccountConfig) Unmarshal(src []byte) error {
err := json.Unmarshal(src, p)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize patchable account config from JSON",
)
}
return nil
}
func (p *PatchableAWSAccountConfig) Marshal() ([]byte, error) {
if p == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "patchable account config is nil")
}
serialized, err := json.Marshal(p)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize patchable account config to JSON",
)
}
return serialized, nil
}
type PatchableAWSAccountConfig = PatchableAccountConfigTyped[AWSAccountConfig]
type PatchableAzureAccountConfig = PatchableAccountConfigTyped[AzureAccountConfig]
type AWSAccountConfig struct {
EnabledRegions []string `json:"regions"`
}
// Unmarshal unmarshalls data from src
func (c *AWSAccountConfig) Unmarshal(src []byte) error {
err := json.Unmarshal(src, c)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize AWS account config from JSON",
)
}
return nil
type AzureAccountConfig struct {
DeploymentRegion string `json:"deployment_region,omitempty"`
EnabledResourceGroups []string `json:"resource_groups,omitempty"`
}
// Marshal serializes data to bytes
func (c *AWSAccountConfig) Marshal() ([]byte, error) {
if c == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "cloud account config is nil")
}
serialized, err := json.Marshal(c)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't serialize cloud account config to JSON")
}
return serialized, nil
type GettableServices[T any] struct {
Services []T `json:"services"`
}
type GettableAWSServices struct {
Services []AWSServiceSummary `json:"services"`
}
type GettableAWSServices = GettableServices[AWSServiceSummary]
type GettableAzureServices = GettableServices[AzureServiceSummary]
type GetServiceDetailsReq struct {
OrgID valuer.UUID
@@ -359,7 +370,11 @@ func (a *CloudIntegration) Account(cloudProvider CloudProviderType) *Account {
switch cloudProvider {
case CloudProviderAWS:
config := new(AWSAccountConfig)
_ = config.Unmarshal([]byte(a.Config))
_ = UnmarshalJSON([]byte(a.Config), config)
ca.Config = config
case CloudProviderAzure:
config := new(AzureAccountConfig)
_ = UnmarshalJSON([]byte(a.Config), config)
ca.Config = config
default:
}
@@ -370,7 +385,7 @@ func (a *CloudIntegration) Account(cloudProvider CloudProviderType) *Account {
type Account struct {
Id string `json:"id"`
CloudAccountId string `json:"cloud_account_id"`
Config any `json:"config"` // AWSAccountConfig
Config any `json:"config"` // AWSAccountConfig or AzureAccountConfig
Status AccountStatus `json:"status"`
}
@@ -388,23 +403,41 @@ func DefaultAWSAccountConfig() AWSAccountConfig {
}
}
type AWSServiceSummary struct {
DefinitionMetadata
Config *AWSCloudServiceConfig `json:"config"`
func DefaultAzureAccountConfig() AzureAccountConfig {
return AzureAccountConfig{
DeploymentRegion: "",
EnabledResourceGroups: []string{},
}
}
type ServiceSummary[T any] struct {
DefinitionMetadata
Config *T `json:"config"`
}
type AWSServiceSummary = ServiceSummary[AWSCloudServiceConfig]
type AzureServiceSummary = ServiceSummary[AzureCloudServiceConfig]
type GettableAWSServiceDetails struct {
AWSServiceDefinition
AWSDefinition
Config *AWSCloudServiceConfig `json:"config"`
ConnectionStatus *ServiceConnectionStatus `json:"status,omitempty"`
}
type GettableAzureServiceDetails struct {
AzureDefinition
Config *AzureCloudServiceConfig `json:"config"`
ConnectionStatus *ServiceConnectionStatus `json:"status,omitempty"`
}
type ServiceConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"`
Logs []*SignalConnectionStatus `json:"logs"`
Metrics []*SignalConnectionStatus `json:"metrics"`
}
type SignalConnectionStatus struct {
CategoryID string `json:"category"`
CategoryDisplayName string `json:"category_display_name"`
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}
@@ -441,7 +474,7 @@ func (r *AgentReport) Value() (driver.Value, error) {
err, errors.CodeInternal, "couldn't serialize agent report to JSON",
)
}
return string(serialized), nil
return serialized, nil
}
type CloudIntegrationService struct {

View File

@@ -14,6 +14,51 @@ const (
S3Sync = "s3sync"
)
type AWSDefinition = ServiceDefinition[AWSCollectionStrategy]
type AzureDefinition = ServiceDefinition[AzureCollectionStrategy]
var _ Definition = &AWSDefinition{}
var _ Definition = &AzureDefinition{}
type ServiceDefinition[T any] struct {
DefinitionMetadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"data_collected"`
IngestionStatusCheck *IngestionStatusCheck `json:"ingestion_status_check,omitempty"`
Strategy *T `json:"telemetry_collection_strategy"`
}
func (def *ServiceDefinition[T]) PopulateDashboardURLs(cloudProvider CloudProviderType, svcId string) {
for i := range def.Assets.Dashboards {
dashboardId := def.Assets.Dashboards[i].Id
url := "/dashboard/" + GetCloudIntegrationDashboardID(cloudProvider, svcId, dashboardId)
def.Assets.Dashboards[i].Url = url
}
}
func (def *ServiceDefinition[T]) GetId() string {
return def.Id
}
func (def *ServiceDefinition[T]) Validate() error {
seenDashboardIds := map[string]interface{}{}
if def.Strategy == nil {
return errors.NewInternalf(errors.CodeInternal, "telemetry_collection_strategy is required")
}
for _, dd := range def.Assets.Dashboards {
if _, seen := seenDashboardIds[dd.Id]; seen {
return errors.NewInternalf(errors.CodeInternal, "multiple dashboards found with id %s for Azure Integration", dd.Id)
}
seenDashboardIds[dd.Id] = nil
}
return nil
}
type DefinitionMetadata struct {
Id string `json:"id"`
Title string `json:"title"`
@@ -23,18 +68,7 @@ type DefinitionMetadata struct {
type Definition interface {
GetId() string
Validate() error
PopulateDashboardURLs(svcId string)
}
var _ Definition = &AWSServiceDefinition{}
type AWSServiceDefinition struct {
DefinitionMetadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"data_collected"`
Strategy *AWSCollectionStrategy `json:"telemetry_collection_strategy"`
PopulateDashboardURLs(cloudProvider CloudProviderType, svcId string)
}
type IngestionStatusCheck struct {
@@ -58,36 +92,6 @@ type IngestionStatusCheckAttributeFilter struct {
Operator string `json:"operator"`
Value string `json:"value"`
}
func (def *AWSServiceDefinition) GetId() string {
return def.Id
}
func (def *AWSServiceDefinition) Validate() error {
seenDashboardIds := map[string]interface{}{}
if def.Strategy == nil {
return errors.NewInternalf(errors.CodeInternal, "telemetry_collection_strategy is required")
}
for _, dd := range def.Assets.Dashboards {
if _, seen := seenDashboardIds[dd.Id]; seen {
return errors.NewInternalf(errors.CodeInternal, "multiple dashboards found with id %s for AWS Integration", dd.Id)
}
seenDashboardIds[dd.Id] = nil
}
return nil
}
func (def *AWSServiceDefinition) PopulateDashboardURLs(serviceId string) {
for i := range def.Assets.Dashboards {
dashboardId := def.Assets.Dashboards[i].Id
url := "/dashboard/" + GetCloudIntegrationDashboardID(CloudProviderAWS, serviceId, dashboardId)
def.Assets.Dashboards[i].Url = url
}
}
type Assets struct {
Dashboards []Dashboard `json:"dashboards"`
}
@@ -116,11 +120,14 @@ type CollectedMetric struct {
}
type AWSCollectionStrategy struct {
Provider valuer.String `json:"provider"`
Metrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
Logs *AWSLogsStrategy `json:"aws_logs,omitempty"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type in AWS
}
AWSMetrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsStrategy `json:"aws_logs,omitempty"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type
type AzureCollectionStrategy struct {
Metrics []*AzureMetricsStrategy `json:"azure_metrics,omitempty"`
Logs []*AzureLogsStrategy `json:"azure_logs,omitempty"`
}
type AWSMetricsStrategy struct {
@@ -145,6 +152,16 @@ type AWSLogsStrategy struct {
} `json:"cloudwatch_logs_subscriptions"`
}
type AzureMetricsStrategy struct {
CategoryType string `json:"category_type"`
Name string `json:"name"`
}
type AzureLogsStrategy struct {
CategoryType string `json:"category_type"`
Name string `json:"name"`
}
type Dashboard struct {
Id string `json:"id"`
Url string `json:"url"`

View File

@@ -41,3 +41,62 @@ var ValidAWSRegions = map[string]bool{
"us-west-1": true, // US West (N. California).
"us-west-2": true, // US West (Oregon).
}
var ValidAzureRegions = map[string]bool{
"australiacentral": true,
"australiacentral2": true,
"australiaeast": true,
"australiasoutheast": true,
"austriaeast": true,
"belgiumcentral": true,
"brazilsouth": true,
"brazilsoutheast": true,
"canadacentral": true,
"canadaeast": true,
"centralindia": true,
"centralus": true,
"chilecentral": true,
"denmarkeast": true,
"eastasia": true,
"eastus": true,
"eastus2": true,
"francecentral": true,
"francesouth": true,
"germanynorth": true,
"germanywestcentral": true,
"indonesiacentral": true,
"israelcentral": true,
"italynorth": true,
"japaneast": true,
"japanwest": true,
"koreacentral": true,
"koreasouth": true,
"malaysiawest": true,
"mexicocentral": true,
"newzealandnorth": true,
"northcentralus": true,
"northeurope": true,
"norwayeast": true,
"norwaywest": true,
"polandcentral": true,
"qatarcentral": true,
"southafricanorth": true,
"southafricawest": true,
"southcentralus": true,
"southindia": true,
"southeastasia": true,
"spaincentral": true,
"swedencentral": true,
"switzerlandnorth": true,
"switzerlandwest": true,
"uaecentral": true,
"uaenorth": true,
"uksouth": true,
"ukwest": true,
"westcentralus": true,
"westeurope": true,
"westindia": true,
"westus": true,
"westus2": true,
"westus3": true,
}

View File

@@ -16,7 +16,8 @@ import (
type CloudProviderType = valuer.String
var (
CloudProviderAWS = valuer.NewString("aws")
CloudProviderAWS = valuer.NewString("aws")
CloudProviderAzure = valuer.NewString("azure")
)
var (
@@ -25,7 +26,7 @@ var (
func NewCloudProvider(provider string) (CloudProviderType, error) {
switch provider {
case CloudProviderAWS.String():
case CloudProviderAWS.String(), CloudProviderAzure.String():
return valuer.NewString(provider), nil
default:
return CloudProviderType{}, errors.NewInvalidInputf(CodeCloudProviderInvalidInput, "invalid cloud provider: %s", provider)
@@ -33,11 +34,13 @@ func NewCloudProvider(provider string) (CloudProviderType, error) {
}
var (
AWSIntegrationUserEmail = valuer.MustNewEmail("aws-integration@signoz.io")
AWSIntegrationUserEmail = valuer.MustNewEmail("aws-integration@signoz.io")
AzureIntegrationUserEmail = valuer.MustNewEmail("azure-integration@signoz.io")
)
var IntegrationUserEmails = []valuer.Email{
AWSIntegrationUserEmail,
AzureIntegrationUserEmail,
}
func IsCloudIntegrationDashboardUuid(dashboardUuid string) bool {