Compare commits

..

6 Commits

Author SHA1 Message Date
Nikhil Mantri
dba06754ce Merge branch 'main' into feat/infra_disk_usage 2026-02-12 13:39:32 +05:30
Nikhil Mantri
7f693afce2 Merge branch 'main' into feat/infra_disk_usage 2026-02-11 14:40:39 +05:30
Nikhil Mantri
89dcb5da97 Merge branch 'main' into feat/infra_disk_usage 2026-02-10 14:17:41 +05:30
nikhilmantri0902
feef6515bf chore: rearrange widgets 2026-02-10 14:14:25 +05:30
nikhilmantri0902
e20eca9b62 chore: removed log 2026-02-10 12:29:29 +05:30
nikhilmantri0902
1332098b7d chore: added disk usage 2026-02-08 17:55:22 +05:30
49 changed files with 5016 additions and 6118 deletions

View File

@@ -1,7 +1,6 @@
package api
import (
"log/slog"
"net/http"
"net/http/httputil"
"time"
@@ -14,6 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/http/middleware"
querierAPI "github.com/SigNoz/signoz/pkg/querier"
baseapp "github.com/SigNoz/signoz/pkg/query-service/app"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
@@ -30,13 +30,13 @@ type APIHandlerOptions struct {
RulesManager *rules.Manager
UsageManager *usage.Manager
IntegrationsController *integrations.Controller
CloudIntegrationsController *cloudintegrations.Controller
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
Gateway *httputil.ReverseProxy
GatewayUrl string
// Querier Influx Interval
FluxInterval time.Duration
GlobalConfig global.Config
Logger *slog.Logger // this is present in Signoz.Instrumentation but adding for quick access
}
type APIHandler struct {
@@ -50,6 +50,7 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
Reader: opts.DataConnector,
RuleManager: opts.RulesManager,
IntegrationsController: opts.IntegrationsController,
CloudIntegrationsController: opts.CloudIntegrationsController,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
FluxInterval: opts.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
@@ -57,7 +58,6 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
QueryParserAPI: queryparser.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.QueryParser),
Logger: opts.Logger,
})
if err != nil {
@@ -118,12 +118,14 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
}
func (ah *APIHandler) RegisterCloudIntegrationsRoutes(router *mux.Router, am *middleware.AuthZ) {
ah.APIHandler.RegisterCloudIntegrationsRoutes(router, am)
router.HandleFunc(
"/api/v1/cloud-integrations/{cloudProvider}/accounts/generate-connection-params",
am.EditAccess(ah.CloudIntegrationsGenerateConnectionParams),
).Methods(http.MethodGet)
}
func (ah *APIHandler) getVersion(w http.ResponseWriter, r *http.Request) {

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
@@ -14,14 +13,20 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/user"
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
"go.uber.org/zap"
)
// TODO: move this file with other cloud integration related code
type CloudIntegrationConnectionParamsResponse struct {
IngestionUrl string `json:"ingestion_url,omitempty"`
IngestionKey string `json:"ingestion_key,omitempty"`
SigNozAPIUrl string `json:"signoz_api_url,omitempty"`
SigNozAPIKey string `json:"signoz_api_key,omitempty"`
}
func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
@@ -36,21 +41,23 @@ func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseW
return
}
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
cloudProvider := mux.Vars(r)["cloudProvider"]
if cloudProvider != "aws" {
RespondError(w, basemodel.BadRequest(fmt.Errorf(
"cloud provider not supported: %s", cloudProvider,
)), nil)
return
}
apiKey, err := ah.getOrCreateCloudIntegrationPAT(r.Context(), claims.OrgID, cloudProvider)
if err != nil {
render.Error(w, err)
apiKey, apiErr := ah.getOrCreateCloudIntegrationPAT(r.Context(), claims.OrgID, cloudProvider)
if apiErr != nil {
RespondError(w, basemodel.WrapApiError(
apiErr, "couldn't provision PAT for cloud integration:",
), nil)
return
}
result := integrationstypes.GettableCloudIntegrationConnectionParams{
result := CloudIntegrationConnectionParamsResponse{
SigNozAPIKey: apiKey,
}
@@ -64,17 +71,16 @@ func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseW
// Return the API Key (PAT) even if the rest of the params can not be deduced.
// Params not returned from here will be requested from the user via form inputs.
// This enables gracefully degraded but working experience even for non-cloud deployments.
ah.opts.Logger.InfoContext(
r.Context(),
"ingestion params and signoz api url can not be deduced since no license was found",
)
render.Success(w, http.StatusOK, result)
zap.L().Info("ingestion params and signoz api url can not be deduced since no license was found")
ah.Respond(w, result)
return
}
signozApiUrl, err := ah.getIngestionUrlAndSigNozAPIUrl(r.Context(), license.Key)
if err != nil {
render.Error(w, err)
signozApiUrl, apiErr := ah.getIngestionUrlAndSigNozAPIUrl(r.Context(), license.Key)
if apiErr != nil {
RespondError(w, basemodel.WrapApiError(
apiErr, "couldn't deduce ingestion url and signoz api url",
), nil)
return
}
@@ -83,41 +89,48 @@ func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseW
gatewayUrl := ah.opts.GatewayUrl
if len(gatewayUrl) > 0 {
ingestionKeyString, err := ah.getOrCreateCloudProviderIngestionKey(
ingestionKey, apiErr := getOrCreateCloudProviderIngestionKey(
r.Context(), gatewayUrl, license.Key, cloudProvider,
)
if err != nil {
render.Error(w, err)
if apiErr != nil {
RespondError(w, basemodel.WrapApiError(
apiErr, "couldn't get or create ingestion key",
), nil)
return
}
result.IngestionKey = ingestionKeyString
result.IngestionKey = ingestionKey
} else {
ah.opts.Logger.InfoContext(
r.Context(),
"ingestion key can't be deduced since no gateway url has been configured",
)
zap.L().Info("ingestion key can't be deduced since no gateway url has been configured")
}
render.Success(w, http.StatusOK, result)
ah.Respond(w, result)
}
func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId string, cloudProvider valuer.String) (string, error) {
func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId string, cloudProvider string) (
string, *basemodel.ApiError,
) {
integrationPATName := fmt.Sprintf("%s integration", cloudProvider)
integrationUser, err := ah.getOrCreateCloudIntegrationUser(ctx, orgId, cloudProvider)
if err != nil {
return "", err
integrationUser, apiErr := ah.getOrCreateCloudIntegrationUser(ctx, orgId, cloudProvider)
if apiErr != nil {
return "", apiErr
}
orgIdUUID, err := valuer.NewUUID(orgId)
if err != nil {
return "", err
return "", basemodel.InternalError(fmt.Errorf(
"couldn't parse orgId: %w", err,
))
}
allPats, err := ah.Signoz.Modules.User.ListAPIKeys(ctx, orgIdUUID)
if err != nil {
return "", err
return "", basemodel.InternalError(fmt.Errorf(
"couldn't list PATs: %w", err,
))
}
for _, p := range allPats {
if p.UserID == integrationUser.ID && p.Name == integrationPATName {
@@ -125,10 +138,9 @@ func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId
}
}
ah.opts.Logger.InfoContext(
ctx,
zap.L().Info(
"no PAT found for cloud integration, creating a new one",
slog.String("cloudProvider", cloudProvider.String()),
zap.String("cloudProvider", cloudProvider),
)
newPAT, err := types.NewStorableAPIKey(
@@ -138,48 +150,68 @@ func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId
0,
)
if err != nil {
return "", err
return "", basemodel.InternalError(fmt.Errorf(
"couldn't create cloud integration PAT: %w", err,
))
}
err = ah.Signoz.Modules.User.CreateAPIKey(ctx, newPAT)
if err != nil {
return "", err
return "", basemodel.InternalError(fmt.Errorf(
"couldn't create cloud integration PAT: %w", err,
))
}
return newPAT.Token, nil
}
// TODO: move this function out of handler and use proper module structure
func (ah *APIHandler) getOrCreateCloudIntegrationUser(ctx context.Context, orgId string, cloudProvider valuer.String) (*types.User, error) {
cloudIntegrationUserName := fmt.Sprintf("%s-integration", cloudProvider.String())
func (ah *APIHandler) getOrCreateCloudIntegrationUser(
ctx context.Context, orgId string, cloudProvider string,
) (*types.User, *basemodel.ApiError) {
cloudIntegrationUserName := fmt.Sprintf("%s-integration", cloudProvider)
email := valuer.MustNewEmail(fmt.Sprintf("%s@signoz.io", cloudIntegrationUserName))
cloudIntegrationUser, err := types.NewUser(cloudIntegrationUserName, email, types.RoleViewer, valuer.MustNewUUID(orgId))
if err != nil {
return nil, err
return nil, basemodel.InternalError(fmt.Errorf("couldn't create cloud integration user: %w", err))
}
password := types.MustGenerateFactorPassword(cloudIntegrationUser.ID.StringValue())
cloudIntegrationUser, err = ah.Signoz.Modules.User.GetOrCreateUser(ctx, cloudIntegrationUser, user.WithFactorPassword(password))
if err != nil {
return nil, err
return nil, basemodel.InternalError(fmt.Errorf("couldn't look for integration user: %w", err))
}
return cloudIntegrationUser, nil
}
// TODO: move this function out of handler and use proper module structure
func (ah *APIHandler) getIngestionUrlAndSigNozAPIUrl(ctx context.Context, licenseKey string) (string, error) {
respBytes, err := ah.Signoz.Zeus.GetDeployment(ctx, licenseKey)
if err != nil {
return "", errors.WrapInternalf(err, errors.CodeInternal, "couldn't query for deployment info: error")
func (ah *APIHandler) getIngestionUrlAndSigNozAPIUrl(ctx context.Context, licenseKey string) (
string, *basemodel.ApiError,
) {
// TODO: remove this struct from here
type deploymentResponse struct {
Name string `json:"name"`
ClusterInfo struct {
Region struct {
DNS string `json:"dns"`
} `json:"region"`
} `json:"cluster"`
}
resp := new(integrationstypes.GettableDeployment)
respBytes, err := ah.Signoz.Zeus.GetDeployment(ctx, licenseKey)
if err != nil {
return "", basemodel.InternalError(fmt.Errorf(
"couldn't query for deployment info: error: %w", err,
))
}
resp := new(deploymentResponse)
err = json.Unmarshal(respBytes, resp)
if err != nil {
return "", errors.WrapInternalf(err, errors.CodeInternal, "couldn't unmarshal deployment info response")
return "", basemodel.InternalError(fmt.Errorf(
"couldn't unmarshal deployment info response: error: %w", err,
))
}
regionDns := resp.ClusterInfo.Region.DNS
@@ -187,11 +219,9 @@ func (ah *APIHandler) getIngestionUrlAndSigNozAPIUrl(ctx context.Context, licens
if len(regionDns) < 1 || len(deploymentName) < 1 {
// Fail early if actual response structure and expectation here ever diverge
return "", errors.WrapInternalf(
err,
errors.CodeInternal,
return "", basemodel.InternalError(fmt.Errorf(
"deployment info response not in expected shape. couldn't determine region dns and deployment name",
)
))
}
signozApiUrl := fmt.Sprintf("https://%s.%s", deploymentName, regionDns)
@@ -199,85 +229,102 @@ func (ah *APIHandler) getIngestionUrlAndSigNozAPIUrl(ctx context.Context, licens
return signozApiUrl, nil
}
func (ah *APIHandler) getOrCreateCloudProviderIngestionKey(
ctx context.Context, gatewayUrl string, licenseKey string, cloudProvider valuer.String,
) (string, error) {
type ingestionKey struct {
Name string `json:"name"`
Value string `json:"value"`
// other attributes from gateway response not included here since they are not being used.
}
type ingestionKeysSearchResponse struct {
Status string `json:"status"`
Data []ingestionKey `json:"data"`
Error string `json:"error"`
}
type createIngestionKeyResponse struct {
Status string `json:"status"`
Data ingestionKey `json:"data"`
Error string `json:"error"`
}
func getOrCreateCloudProviderIngestionKey(
ctx context.Context, gatewayUrl string, licenseKey string, cloudProvider string,
) (string, *basemodel.ApiError) {
cloudProviderKeyName := fmt.Sprintf("%s-integration", cloudProvider)
// see if the key already exists
searchResult, err := requestGateway[integrationstypes.GettableIngestionKeysSearch](
searchResult, apiErr := requestGateway[ingestionKeysSearchResponse](
ctx,
gatewayUrl,
licenseKey,
fmt.Sprintf("/v1/workspaces/me/keys/search?name=%s", cloudProviderKeyName),
nil,
ah.opts.Logger,
)
if err != nil {
return "", err
if apiErr != nil {
return "", basemodel.WrapApiError(
apiErr, "couldn't search for cloudprovider ingestion key",
)
}
if searchResult.Status != "success" {
return "", errors.NewInternalf(
errors.CodeInternal,
"couldn't search for cloud provider ingestion key: status: %s, error: %s",
return "", basemodel.InternalError(fmt.Errorf(
"couldn't search for cloudprovider ingestion key: status: %s, error: %s",
searchResult.Status, searchResult.Error,
)
))
}
for _, k := range searchResult.Data {
if k.Name != cloudProviderKeyName {
continue
}
if k.Name == cloudProviderKeyName {
if len(k.Value) < 1 {
// Fail early if actual response structure and expectation here ever diverge
return "", basemodel.InternalError(fmt.Errorf(
"ingestion keys search response not as expected",
))
}
if len(k.Value) < 1 {
// Fail early if actual response structure and expectation here ever diverge
return "", errors.NewInternalf(errors.CodeInternal, "ingestion keys search response not as expected")
return k.Value, nil
}
return k.Value, nil
}
ah.opts.Logger.InfoContext(
ctx,
zap.L().Info(
"no existing ingestion key found for cloud integration, creating a new one",
slog.String("cloudProvider", cloudProvider.String()),
zap.String("cloudProvider", cloudProvider),
)
createKeyResult, err := requestGateway[integrationstypes.GettableCreateIngestionKey](
createKeyResult, apiErr := requestGateway[createIngestionKeyResponse](
ctx, gatewayUrl, licenseKey, "/v1/workspaces/me/keys",
map[string]any{
"name": cloudProviderKeyName,
"tags": []string{"integration", cloudProvider.String()},
"tags": []string{"integration", cloudProvider},
},
ah.opts.Logger,
)
if err != nil {
return "", err
if apiErr != nil {
return "", basemodel.WrapApiError(
apiErr, "couldn't create cloudprovider ingestion key",
)
}
if createKeyResult.Status != "success" {
return "", errors.NewInternalf(
errors.CodeInternal,
"couldn't create cloud provider ingestion key: status: %s, error: %s",
return "", basemodel.InternalError(fmt.Errorf(
"couldn't create cloudprovider ingestion key: status: %s, error: %s",
createKeyResult.Status, createKeyResult.Error,
)
))
}
ingestionKeyString := createKeyResult.Data.Value
if len(ingestionKeyString) < 1 {
ingestionKey := createKeyResult.Data.Value
if len(ingestionKey) < 1 {
// Fail early if actual response structure and expectation here ever diverge
return "", errors.NewInternalf(errors.CodeInternal,
return "", basemodel.InternalError(fmt.Errorf(
"ingestion key creation response not as expected",
)
))
}
return ingestionKeyString, nil
return ingestionKey, nil
}
func requestGateway[ResponseType any](
ctx context.Context, gatewayUrl, licenseKey, path string, payload any, logger *slog.Logger,
) (*ResponseType, error) {
ctx context.Context, gatewayUrl string, licenseKey string, path string, payload any,
) (*ResponseType, *basemodel.ApiError) {
baseUrl := strings.TrimSuffix(gatewayUrl, "/")
reqUrl := fmt.Sprintf("%s%s", baseUrl, path)
@@ -288,12 +335,13 @@ func requestGateway[ResponseType any](
"X-Consumer-Groups": "ns:default",
}
return requestAndParseResponse[ResponseType](ctx, reqUrl, headers, payload, logger)
return requestAndParseResponse[ResponseType](ctx, reqUrl, headers, payload)
}
func requestAndParseResponse[ResponseType any](
ctx context.Context, url string, headers map[string]string, payload any, logger *slog.Logger,
) (*ResponseType, error) {
ctx context.Context, url string, headers map[string]string, payload any,
) (*ResponseType, *basemodel.ApiError) {
reqMethod := http.MethodGet
var reqBody io.Reader
if payload != nil {
@@ -301,14 +349,18 @@ func requestAndParseResponse[ResponseType any](
bodyJson, err := json.Marshal(payload)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't marshal payload")
return nil, basemodel.InternalError(fmt.Errorf(
"couldn't serialize request payload to JSON: %w", err,
))
}
reqBody = bytes.NewBuffer(bodyJson)
reqBody = bytes.NewBuffer([]byte(bodyJson))
}
req, err := http.NewRequestWithContext(ctx, reqMethod, url, reqBody)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't create req")
return nil, basemodel.InternalError(fmt.Errorf(
"couldn't prepare request: %w", err,
))
}
for k, v := range headers {
@@ -321,26 +373,23 @@ func requestAndParseResponse[ResponseType any](
response, err := client.Do(req)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't make req")
return nil, basemodel.InternalError(fmt.Errorf("couldn't make request: %w", err))
}
defer func() {
err = response.Body.Close()
if err != nil {
logger.ErrorContext(ctx, "couldn't close response body", "error", err)
}
}()
defer response.Body.Close()
respBody, err := io.ReadAll(response.Body)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't read response body")
return nil, basemodel.InternalError(fmt.Errorf("couldn't read response: %w", err))
}
var resp ResponseType
err = json.Unmarshal(respBody, &resp)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't unmarshal response body")
return nil, basemodel.InternalError(fmt.Errorf(
"couldn't unmarshal gateway response into %T", resp,
))
}
return &resp, nil

View File

@@ -38,6 +38,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
baseapp "github.com/SigNoz/signoz/pkg/query-service/app"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp"
@@ -126,6 +127,13 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
)
}
cloudIntegrationsController, err := cloudintegrations.NewController(signoz.SQLStore)
if err != nil {
return nil, fmt.Errorf(
"couldn't create cloud provider integrations controller: %w", err,
)
}
// ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
signoz.SQLStore,
@@ -159,12 +167,12 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
RulesManager: rm,
UsageManager: usageManager,
IntegrationsController: integrationsController,
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
FluxInterval: config.Querier.FluxInterval,
Gateway: gatewayProxy,
GatewayUrl: config.Gateway.URL.String(),
GlobalConfig: config.Global,
Logger: signoz.Instrumentation.Logger(),
}
apiHandler, err := api.NewAPIHandler(apiOpts, signoz)

View File

@@ -39,6 +39,7 @@ export interface HostData {
waitTimeSeries: TimeSeries;
load15: number;
load15TimeSeries: TimeSeries;
filesystem: number;
}
export interface HostListResponse {

View File

@@ -48,7 +48,7 @@
.labels-row,
.values-row {
display: grid;
grid-template-columns: 1fr 1.5fr 1.5fr 1.5fr;
grid-template-columns: 1fr 1.5fr 1.5fr 1.5fr 1.5fr;
gap: 30px;
align-items: center;
}

View File

@@ -426,6 +426,12 @@ function HostMetricsDetails({
>
MEMORY USAGE
</Typography.Text>
<Typography.Text
type="secondary"
className="host-details-metadata-label"
>
DISK USAGE
</Typography.Text>
</div>
<div className="values-row">
@@ -478,6 +484,23 @@ function HostMetricsDetails({
className="progress-bar"
/>
</div>
<div className="progress-container">
<Progress
percent={Number((host.filesystem * 100).toFixed(1))}
size="small"
strokeColor={((): string => {
const filesystemPercent = Number((host.filesystem * 100).toFixed(1));
if (filesystemPercent >= 90) {
return Color.BG_CHERRY_500;
}
if (filesystemPercent >= 60) {
return Color.BG_AMBER_500;
}
return Color.BG_FOREST_500;
})()}
className="progress-bar"
/>
</div>
</div>
</div>
</div>

View File

@@ -27,6 +27,7 @@ export interface HostRowData {
hostName: string;
cpu: React.ReactNode;
memory: React.ReactNode;
filesystem: React.ReactNode;
wait: string;
load15: number;
active: React.ReactNode;
@@ -108,6 +109,14 @@ export const getHostsListColumns = (): ColumnType<HostRowData>[] => [
sorter: true,
align: 'right',
},
{
title: <div className="column-header-right">Disk Usage</div>,
dataIndex: 'filesystem',
key: 'filesystem',
width: 100,
sorter: true,
align: 'right',
},
{
title: <div className="column-header-right">IOWait</div>,
dataIndex: 'wait',
@@ -178,6 +187,26 @@ export const formatDataForTable = (data: HostData[]): HostRowData[] =>
/>
</div>
),
filesystem: (
<div className="progress-container">
<Progress
percent={Number((host.filesystem * 100).toFixed(1))}
strokeLinecap="butt"
size="small"
strokeColor={((): string => {
const filesystemPercent = Number((host.filesystem * 100).toFixed(1));
if (filesystemPercent >= 90) {
return Color.BG_CHERRY_500;
}
if (filesystemPercent >= 60) {
return Color.BG_AMBER_500;
}
return Color.BG_FOREST_500;
})()}
className="progress-bar"
/>
</div>
),
wait: `${Number((host.wait * 100).toFixed(1))}%`,
load15: host.load15,
}));

View File

@@ -1623,6 +1623,9 @@ export const getHostQueryPayload = (
const diskPendingKey = dotMetricsEnabled
? 'system.disk.pending_operations'
: 'system_disk_pending_operations';
const fsUsageKey = dotMetricsEnabled
? 'system.filesystem.usage'
: 'system_filesystem_usage';
return [
{
@@ -2483,6 +2486,143 @@ export const getHostQueryPayload = (
start,
end,
},
{
selectedTime: 'GLOBAL_TIME',
graphType: PANEL_TYPES.TIME_SERIES,
query: {
builder: {
queryData: [
{
aggregateAttribute: {
dataType: DataTypes.Float64,
id: 'system_filesystem_usage--float64--Gauge--true',
key: fsUsageKey,
type: 'Gauge',
},
aggregateOperator: 'avg',
dataSource: DataSource.METRICS,
disabled: true,
expression: 'A',
filters: {
items: [
{
id: 'fs_f1',
key: {
dataType: DataTypes.String,
id: 'host_name--string--tag--false',
key: hostNameKey,
type: 'tag',
},
op: '=',
value: hostName,
},
{
id: 'fs_f2',
key: {
dataType: DataTypes.String,
id: 'state--string--tag--false',
key: 'state',
type: 'tag',
},
op: '=',
value: 'used',
},
],
op: 'AND',
},
functions: [],
groupBy: [
{
dataType: DataTypes.String,
id: 'mountpoint--string--tag--false',
key: 'mountpoint',
type: 'tag',
},
],
having: [],
legend: '{{mountpoint}}',
limit: null,
orderBy: [],
queryName: 'A',
reduceTo: ReduceOperators.AVG,
spaceAggregation: 'sum',
stepInterval: 60,
timeAggregation: 'avg',
},
{
aggregateAttribute: {
dataType: DataTypes.Float64,
id: 'system_filesystem_usage--float64--Gauge--true',
key: fsUsageKey,
type: 'Gauge',
},
aggregateOperator: 'avg',
dataSource: DataSource.METRICS,
disabled: true,
expression: 'B',
filters: {
items: [
{
id: 'fs_f3',
key: {
dataType: DataTypes.String,
id: 'host_name--string--tag--false',
key: hostNameKey,
type: 'tag',
},
op: '=',
value: hostName,
},
],
op: 'AND',
},
functions: [],
groupBy: [
{
dataType: DataTypes.String,
id: 'mountpoint--string--tag--false',
key: 'mountpoint',
type: 'tag',
},
],
having: [],
legend: '{{mountpoint}}',
limit: null,
orderBy: [],
queryName: 'B',
reduceTo: ReduceOperators.AVG,
spaceAggregation: 'sum',
stepInterval: 60,
timeAggregation: 'avg',
},
],
queryFormulas: [
{
disabled: false,
expression: 'A/B',
legend: '{{mountpoint}}',
queryName: 'F1',
},
],
queryTraceOperator: [],
},
clickhouse_sql: [{ disabled: false, legend: '', name: 'A', query: '' }],
id: 'a1b2c3d4-e5f6-7890-abcd-ef1234567890',
promql: [{ disabled: false, legend: '', name: 'A', query: '' }],
queryType: EQueryType.QUERY_BUILDER,
},
variables: {},
formatForWeb: false,
start,
end,
},
{
selectedTime: 'GLOBAL_TIME',
graphType: PANEL_TYPES.TIME_SERIES,
@@ -2631,6 +2771,6 @@ export const hostWidgetInfo = [
{ title: 'Network connections', yAxisUnit: 'short' },
{ title: 'System disk io (bytes transferred)', yAxisUnit: 'bytes' },
{ title: 'System disk operations/s', yAxisUnit: 'short' },
{ title: 'Disk Usage (%) by mountpoint', yAxisUnit: 'percentunit' },
{ title: 'Queue size', yAxisUnit: 'short' },
{ title: 'Disk operations time', yAxisUnit: 's' },
];

View File

@@ -1,44 +0,0 @@
package middleware
import (
"log/slog"
"net/http"
"runtime/debug"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
)
// Recovery is a middleware that recovers from panics, logs the panic,
// and returns a 500 Internal Server Error.
type Recovery struct {
logger *slog.Logger
}
// NewRecovery creates a new Recovery middleware.
func NewRecovery(logger *slog.Logger) Wrapper {
return &Recovery{
logger: logger.With("pkg", "http-middleware-recovery"),
}
}
// Wrap is the middleware handler.
func (m *Recovery) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
m.logger.ErrorContext(
r.Context(),
"panic recovered",
"err", err, "stack", string(debug.Stack()),
)
render.Error(w, errors.NewInternalf(
errors.CodeInternal, "internal server error",
))
}
}()
next.ServeHTTP(w, r)
})
}

View File

@@ -13,7 +13,6 @@ import (
root "github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
@@ -463,7 +462,7 @@ func (h *handler) UpdateAPIKey(w http.ResponseWriter, r *http.Request) {
return
}
if slices.Contains(integrationstypes.IntegrationUserEmails, createdByUser.Email) {
if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(createdByUser.Email.String())) {
render.Error(w, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "API Keys for integration users cannot be revoked"))
return
}
@@ -508,7 +507,7 @@ func (h *handler) RevokeAPIKey(w http.ResponseWriter, r *http.Request) {
return
}
if slices.Contains(integrationstypes.IntegrationUserEmails, createdByUser.Email) {
if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(createdByUser.Email.String())) {
render.Error(w, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "API Keys for integration users cannot be revoked"))
return
}

View File

@@ -19,7 +19,6 @@ import (
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/emailtypes"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/types/roletypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/dustin/go-humanize"
@@ -172,7 +171,7 @@ func (m *Module) DeleteInvite(ctx context.Context, orgID string, id valuer.UUID)
func (module *Module) CreateUser(ctx context.Context, input *types.User, opts ...root.CreateUserOption) error {
createUserOpts := root.NewCreateUserOptions(opts...)
// since assign is idempotent multiple calls to assign won't cause issues in case of retries.
// since assign is idempotant multiple calls to assign won't cause issues in case of retries.
err := module.authz.Grant(ctx, input.OrgID, roletypes.MustGetSigNozManagedRoleFromExistingRole(input.Role), authtypes.MustNewSubject(authtypes.TypeableUser, input.ID.StringValue(), input.OrgID, nil))
if err != nil {
return err
@@ -287,7 +286,7 @@ func (module *Module) DeleteUser(ctx context.Context, orgID valuer.UUID, id stri
return err
}
if slices.Contains(integrationstypes.IntegrationUserEmails, user.Email) {
if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(user.Email.String())) {
return errors.New(errors.TypeForbidden, errors.CodeForbidden, "integration user cannot be deleted")
}
@@ -301,7 +300,7 @@ func (module *Module) DeleteUser(ctx context.Context, orgID valuer.UUID, id stri
return errors.New(errors.TypeForbidden, errors.CodeForbidden, "cannot delete the last admin")
}
// since revoke is idempotent multiple calls to revoke won't cause issues in case of retries
// since revoke is idempotant multiple calls to revoke won't cause issues in case of retries
err = module.authz.Revoke(ctx, orgID, roletypes.MustGetSigNozManagedRoleFromExistingRole(user.Role), authtypes.MustNewSubject(authtypes.TypeableUser, id, orgID, nil))
if err != nil {
return err

View File

@@ -1,57 +1,55 @@
package store
package cloudintegrations
import (
"context"
"database/sql"
"fmt"
"log/slog"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
CodeCloudIntegrationAccountNotFound errors.Code = errors.MustNewCode("cloud_integration_account_not_found")
)
type cloudProviderAccountsRepository interface {
listConnected(ctx context.Context, orgId string, provider string) ([]types.CloudIntegration, *model.ApiError)
type CloudProviderAccountsRepository interface {
ListConnected(ctx context.Context, orgId string, provider string) ([]integrationstypes.CloudIntegration, error)
get(ctx context.Context, orgId string, provider string, id string) (*types.CloudIntegration, *model.ApiError)
Get(ctx context.Context, orgId string, provider string, id string) (*integrationstypes.CloudIntegration, error)
GetConnectedCloudAccount(ctx context.Context, orgId, provider string, accountID string) (*integrationstypes.CloudIntegration, error)
getConnectedCloudAccount(ctx context.Context, orgId string, provider string, accountID string) (*types.CloudIntegration, *model.ApiError)
// Insert an account or update it by (cloudProvider, id)
// for specified non-empty fields
Upsert(
upsert(
ctx context.Context,
orgId string,
provider string,
id *string,
config []byte,
config *types.AccountConfig,
accountId *string,
agentReport *integrationstypes.AgentReport,
agentReport *types.AgentReport,
removedAt *time.Time,
) (*integrationstypes.CloudIntegration, error)
) (*types.CloudIntegration, *model.ApiError)
}
func NewCloudProviderAccountsRepository(store sqlstore.SQLStore) CloudProviderAccountsRepository {
return &cloudProviderAccountsSQLRepository{store: store}
func newCloudProviderAccountsRepository(store sqlstore.SQLStore) (
*cloudProviderAccountsSQLRepository, error,
) {
return &cloudProviderAccountsSQLRepository{
store: store,
}, nil
}
type cloudProviderAccountsSQLRepository struct {
store sqlstore.SQLStore
}
func (r *cloudProviderAccountsSQLRepository) ListConnected(
func (r *cloudProviderAccountsSQLRepository) listConnected(
ctx context.Context, orgId string, cloudProvider string,
) ([]integrationstypes.CloudIntegration, error) {
accounts := []integrationstypes.CloudIntegration{}
) ([]types.CloudIntegration, *model.ApiError) {
accounts := []types.CloudIntegration{}
err := r.store.BunDB().NewSelect().
Model(&accounts).
@@ -64,17 +62,18 @@ func (r *cloudProviderAccountsSQLRepository) ListConnected(
Scan(ctx)
if err != nil {
slog.ErrorContext(ctx, "error querying connected cloud accounts", "error", err)
return nil, errors.WrapInternalf(err, errors.CodeInternal, "could not query connected cloud accounts")
return nil, model.InternalError(fmt.Errorf(
"could not query connected cloud accounts: %w", err,
))
}
return accounts, nil
}
func (r *cloudProviderAccountsSQLRepository) Get(
func (r *cloudProviderAccountsSQLRepository) get(
ctx context.Context, orgId string, provider string, id string,
) (*integrationstypes.CloudIntegration, error) {
var result integrationstypes.CloudIntegration
) (*types.CloudIntegration, *model.ApiError) {
var result types.CloudIntegration
err := r.store.BunDB().NewSelect().
Model(&result).
@@ -83,25 +82,23 @@ func (r *cloudProviderAccountsSQLRepository) Get(
Where("id = ?", id).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errors.WrapNotFoundf(
err,
CodeCloudIntegrationAccountNotFound,
"couldn't find account with Id %s", id,
)
}
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud provider account")
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find account with Id %s", id,
))
} else if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud provider accounts: %w", err,
))
}
return &result, nil
}
func (r *cloudProviderAccountsSQLRepository) GetConnectedCloudAccount(
func (r *cloudProviderAccountsSQLRepository) getConnectedCloudAccount(
ctx context.Context, orgId string, provider string, accountId string,
) (*integrationstypes.CloudIntegration, error) {
var result integrationstypes.CloudIntegration
) (*types.CloudIntegration, *model.ApiError) {
var result types.CloudIntegration
err := r.store.BunDB().NewSelect().
Model(&result).
@@ -112,25 +109,29 @@ func (r *cloudProviderAccountsSQLRepository) GetConnectedCloudAccount(
Where("removed_at is NULL").
Scan(ctx)
if errors.Is(err, sql.ErrNoRows) {
return nil, errors.WrapNotFoundf(err, CodeCloudIntegrationAccountNotFound, "couldn't find connected cloud account %s", accountId)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find connected cloud account %s", accountId,
))
} else if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud provider account")
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud provider accounts: %w", err,
))
}
return &result, nil
}
func (r *cloudProviderAccountsSQLRepository) Upsert(
func (r *cloudProviderAccountsSQLRepository) upsert(
ctx context.Context,
orgId string,
provider string,
id *string,
config []byte,
config *types.AccountConfig,
accountId *string,
agentReport *integrationstypes.AgentReport,
agentReport *types.AgentReport,
removedAt *time.Time,
) (*integrationstypes.CloudIntegration, error) {
) (*types.CloudIntegration, *model.ApiError) {
// Insert
if id == nil {
temp := valuer.GenerateUUID().StringValue()
@@ -180,7 +181,7 @@ func (r *cloudProviderAccountsSQLRepository) Upsert(
)
}
integration := integrationstypes.CloudIntegration{
integration := types.CloudIntegration{
OrgID: orgId,
Provider: provider,
Identifiable: types.Identifiable{ID: valuer.MustNewUUID(*id)},
@@ -188,25 +189,28 @@ func (r *cloudProviderAccountsSQLRepository) Upsert(
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
Config: string(config),
Config: config,
AccountID: accountId,
LastAgentReport: agentReport,
RemovedAt: removedAt,
}
_, err := r.store.BunDB().NewInsert().
_, dbErr := r.store.BunDB().NewInsert().
Model(&integration).
On(onConflictClause).
Exec(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't upsert cloud integration account")
if dbErr != nil {
return nil, model.InternalError(fmt.Errorf(
"could not upsert cloud account record: %w", dbErr,
))
}
upsertedAccount, err := r.Get(ctx, orgId, provider, *id)
if err != nil {
slog.ErrorContext(ctx, "error upserting cloud integration account", "error", err)
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't get upserted cloud integration account")
upsertedAccount, apiErr := r.get(ctx, orgId, provider, *id)
if apiErr != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't fetch upserted account by id: %w", apiErr.ToError(),
))
}
return upsertedAccount, nil

View File

@@ -1,4 +1,4 @@
package integrationstypes
package cloudintegrations
import (
"github.com/SigNoz/signoz/pkg/errors"

View File

@@ -0,0 +1,624 @@
package cloudintegrations
import (
"context"
"fmt"
"net/url"
"slices"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"golang.org/x/exp/maps"
)
var SupportedCloudProviders = []string{
"aws",
}
func validateCloudProviderName(name string) *model.ApiError {
if !slices.Contains(SupportedCloudProviders, name) {
return model.BadRequest(fmt.Errorf("invalid cloud provider: %s", name))
}
return nil
}
type Controller struct {
accountsRepo cloudProviderAccountsRepository
serviceConfigRepo ServiceConfigDatabase
}
func NewController(sqlStore sqlstore.SQLStore) (*Controller, error) {
accountsRepo, err := newCloudProviderAccountsRepository(sqlStore)
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider accounts repo: %w", err)
}
serviceConfigRepo, err := newServiceConfigRepository(sqlStore)
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider service config repo: %w", err)
}
return &Controller{
accountsRepo: accountsRepo,
serviceConfigRepo: serviceConfigRepo,
}, nil
}
type ConnectedAccountsListResponse struct {
Accounts []types.Account `json:"accounts"`
}
func (c *Controller) ListConnectedAccounts(ctx context.Context, orgId string, cloudProvider string) (
*ConnectedAccountsListResponse, *model.ApiError,
) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
accountRecords, apiErr := c.accountsRepo.listConnected(ctx, orgId, cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud accounts")
}
connectedAccounts := []types.Account{}
for _, a := range accountRecords {
connectedAccounts = append(connectedAccounts, a.Account())
}
return &ConnectedAccountsListResponse{
Accounts: connectedAccounts,
}, nil
}
type GenerateConnectionUrlRequest struct {
// Optional. To be specified for updates.
AccountId *string `json:"account_id,omitempty"`
AccountConfig types.AccountConfig `json:"account_config"`
AgentConfig SigNozAgentConfig `json:"agent_config"`
}
type SigNozAgentConfig struct {
// The region in which SigNoz agent should be installed.
Region string `json:"region"`
IngestionUrl string `json:"ingestion_url"`
IngestionKey string `json:"ingestion_key"`
SigNozAPIUrl string `json:"signoz_api_url"`
SigNozAPIKey string `json:"signoz_api_key"`
Version string `json:"version,omitempty"`
}
type GenerateConnectionUrlResponse struct {
AccountId string `json:"account_id"`
ConnectionUrl string `json:"connection_url"`
}
func (c *Controller) GenerateConnectionUrl(ctx context.Context, orgId string, cloudProvider string, req GenerateConnectionUrlRequest) (*GenerateConnectionUrlResponse, *model.ApiError) {
// Account connection with a simple connection URL may not be available for all providers.
if cloudProvider != "aws" {
return nil, model.BadRequest(fmt.Errorf("unsupported cloud provider: %s", cloudProvider))
}
account, apiErr := c.accountsRepo.upsert(
ctx, orgId, cloudProvider, req.AccountId, &req.AccountConfig, nil, nil, nil,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
agentVersion := "v0.0.8"
if req.AgentConfig.Version != "" {
agentVersion = req.AgentConfig.Version
}
connectionUrl := fmt.Sprintf(
"https://%s.console.aws.amazon.com/cloudformation/home?region=%s#/stacks/quickcreate?",
req.AgentConfig.Region, req.AgentConfig.Region,
)
for qp, value := range map[string]string{
"param_SigNozIntegrationAgentVersion": agentVersion,
"param_SigNozApiUrl": req.AgentConfig.SigNozAPIUrl,
"param_SigNozApiKey": req.AgentConfig.SigNozAPIKey,
"param_SigNozAccountId": account.ID.StringValue(),
"param_IngestionUrl": req.AgentConfig.IngestionUrl,
"param_IngestionKey": req.AgentConfig.IngestionKey,
"stackName": "signoz-integration",
"templateURL": fmt.Sprintf(
"https://signoz-integrations.s3.us-east-1.amazonaws.com/aws-quickcreate-template-%s.json",
agentVersion,
),
} {
connectionUrl += fmt.Sprintf("&%s=%s", qp, url.QueryEscape(value))
}
return &GenerateConnectionUrlResponse{
AccountId: account.ID.StringValue(),
ConnectionUrl: connectionUrl,
}, nil
}
type AccountStatusResponse struct {
Id string `json:"id"`
CloudAccountId *string `json:"cloud_account_id,omitempty"`
Status types.AccountStatus `json:"status"`
}
func (c *Controller) GetAccountStatus(ctx context.Context, orgId string, cloudProvider string, accountId string) (
*AccountStatusResponse, *model.ApiError,
) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
account, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, accountId)
if apiErr != nil {
return nil, apiErr
}
resp := AccountStatusResponse{
Id: account.ID.StringValue(),
CloudAccountId: account.AccountID,
Status: account.Status(),
}
return &resp, nil
}
type AgentCheckInRequest struct {
ID string `json:"account_id"`
AccountID string `json:"cloud_account_id"`
// Arbitrary cloud specific Agent data
Data map[string]any `json:"data,omitempty"`
}
type AgentCheckInResponse struct {
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
RemovedAt *time.Time `json:"removed_at"`
IntegrationConfig IntegrationConfigForAgent `json:"integration_config"`
}
type IntegrationConfigForAgent struct {
EnabledRegions []string `json:"enabled_regions"`
TelemetryCollectionStrategy *CompiledCollectionStrategy `json:"telemetry,omitempty"`
}
func (c *Controller) CheckInAsAgent(ctx context.Context, orgId string, cloudProvider string, req AgentCheckInRequest) (*AgentCheckInResponse, error) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
existingAccount, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, req.ID)
if existingAccount != nil && existingAccount.AccountID != nil && *existingAccount.AccountID != req.AccountID {
return nil, model.BadRequest(fmt.Errorf(
"can't check in with new %s account id %s for account %s with existing %s id %s",
cloudProvider, req.AccountID, existingAccount.ID.StringValue(), cloudProvider, *existingAccount.AccountID,
))
}
existingAccount, apiErr = c.accountsRepo.getConnectedCloudAccount(ctx, orgId, cloudProvider, req.AccountID)
if existingAccount != nil && existingAccount.ID.StringValue() != req.ID {
return nil, model.BadRequest(fmt.Errorf(
"can't check in to %s account %s with id %s. already connected with id %s",
cloudProvider, req.AccountID, req.ID, existingAccount.ID.StringValue(),
))
}
agentReport := types.AgentReport{
TimestampMillis: time.Now().UnixMilli(),
Data: req.Data,
}
account, apiErr := c.accountsRepo.upsert(
ctx, orgId, cloudProvider, &req.ID, nil, &req.AccountID, &agentReport, nil,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
// prepare and return integration config to be consumed by agent
compiledStrategy, err := NewCompiledCollectionStrategy(cloudProvider)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't init telemetry collection strategy: %w", err,
))
}
agentConfig := IntegrationConfigForAgent{
EnabledRegions: []string{},
TelemetryCollectionStrategy: compiledStrategy,
}
if account.Config != nil && account.Config.EnabledRegions != nil {
agentConfig.EnabledRegions = account.Config.EnabledRegions
}
services, err := services.Map(cloudProvider)
if err != nil {
return nil, err
}
svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, orgId, account.ID.StringValue(),
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "couldn't get service configs for cloud account",
)
}
// accumulate config in a fixed order to ensure same config generated across runs
configuredServices := maps.Keys(svcConfigs)
slices.Sort(configuredServices)
for _, svcType := range configuredServices {
definition, ok := services[svcType]
if !ok {
continue
}
config := svcConfigs[svcType]
err := AddServiceStrategy(svcType, compiledStrategy, definition.Strategy, config)
if err != nil {
return nil, err
}
}
return &AgentCheckInResponse{
AccountId: account.ID.StringValue(),
CloudAccountId: *account.AccountID,
RemovedAt: account.RemovedAt,
IntegrationConfig: agentConfig,
}, nil
}
type UpdateAccountConfigRequest struct {
Config types.AccountConfig `json:"config"`
}
func (c *Controller) UpdateAccountConfig(ctx context.Context, orgId string, cloudProvider string, accountId string, req UpdateAccountConfigRequest) (*types.Account, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
accountRecord, apiErr := c.accountsRepo.upsert(
ctx, orgId, cloudProvider, &accountId, &req.Config, nil, nil, nil,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
account := accountRecord.Account()
return &account, nil
}
func (c *Controller) DisconnectAccount(ctx context.Context, orgId string, cloudProvider string, accountId string) (*types.CloudIntegration, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
account, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, accountId)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't disconnect account")
}
tsNow := time.Now()
account, apiErr = c.accountsRepo.upsert(
ctx, orgId, cloudProvider, &accountId, nil, nil, nil, &tsNow,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't disconnect account")
}
return account, nil
}
type ListServicesResponse struct {
Services []ServiceSummary `json:"services"`
}
func (c *Controller) ListServices(
ctx context.Context,
orgID string,
cloudProvider string,
cloudAccountId *string,
) (*ListServicesResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
definitions, apiErr := services.List(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcConfigs := map[string]*types.CloudServiceConfig{}
if cloudAccountId != nil {
activeAccount, apiErr := c.accountsRepo.getConnectedCloudAccount(
ctx, orgID, cloudProvider, *cloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't get active account")
}
svcConfigs, apiErr = c.serviceConfigRepo.getAllForAccount(
ctx, orgID, activeAccount.ID.StringValue(),
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "couldn't get service configs for cloud account",
)
}
}
summaries := []ServiceSummary{}
for _, def := range definitions {
summary := ServiceSummary{
Metadata: def.Metadata,
}
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
}
return &ListServicesResponse{
Services: summaries,
}, nil
}
func (c *Controller) GetServiceDetails(
ctx context.Context,
orgID string,
cloudProvider string,
serviceId string,
cloudAccountId *string,
) (*ServiceDetails, error) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
definition, err := services.GetServiceDefinition(cloudProvider, serviceId)
if err != nil {
return nil, err
}
details := ServiceDetails{
Definition: *definition,
}
if cloudAccountId != nil {
activeAccount, apiErr := c.accountsRepo.getConnectedCloudAccount(
ctx, orgID, cloudProvider, *cloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't get active account")
}
config, apiErr := c.serviceConfigRepo.get(
ctx, orgID, activeAccount.ID.StringValue(), serviceId,
)
if apiErr != nil && apiErr.Type() != model.ErrorNotFound {
return nil, model.WrapApiError(apiErr, "couldn't fetch service config")
}
if config != nil {
details.Config = config
enabled := false
if config.Metrics != nil && config.Metrics.Enabled {
enabled = true
}
// add links to service dashboards, making them clickable.
for i, d := range definition.Assets.Dashboards {
dashboardUuid := c.dashboardUuid(
cloudProvider, serviceId, d.Id,
)
if enabled {
definition.Assets.Dashboards[i].Url = fmt.Sprintf("/dashboard/%s", dashboardUuid)
} else {
definition.Assets.Dashboards[i].Url = "" // to unset the in-memory URL if enabled once and disabled afterwards
}
}
}
}
return &details, nil
}
type UpdateServiceConfigRequest struct {
CloudAccountId string `json:"cloud_account_id"`
Config types.CloudServiceConfig `json:"config"`
}
func (u *UpdateServiceConfigRequest) Validate(def *services.Definition) error {
if def.Id != services.S3Sync && u.Config.Logs != nil && u.Config.Logs.S3Buckets != nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "s3 buckets can only be added to service-type[%s]", services.S3Sync)
} else if def.Id == services.S3Sync && u.Config.Logs != nil && u.Config.Logs.S3Buckets != nil {
for region := range u.Config.Logs.S3Buckets {
if _, found := ValidAWSRegions[region]; !found {
return errors.NewInvalidInputf(CodeInvalidCloudRegion, "invalid cloud region: %s", region)
}
}
}
return nil
}
type UpdateServiceConfigResponse struct {
Id string `json:"id"`
Config types.CloudServiceConfig `json:"config"`
}
func (c *Controller) UpdateServiceConfig(
ctx context.Context,
orgID string,
cloudProvider string,
serviceType string,
req *UpdateServiceConfigRequest,
) (*UpdateServiceConfigResponse, error) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
// can only update config for a valid service.
definition, err := services.GetServiceDefinition(cloudProvider, serviceType)
if err != nil {
return nil, err
}
if err := req.Validate(definition); err != nil {
return nil, err
}
// can only update config for a connected cloud account id
_, apiErr := c.accountsRepo.getConnectedCloudAccount(
ctx, orgID, cloudProvider, req.CloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't find connected cloud account")
}
updatedConfig, apiErr := c.serviceConfigRepo.upsert(
ctx, orgID, cloudProvider, req.CloudAccountId, serviceType, req.Config,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't update service config")
}
return &UpdateServiceConfigResponse{
Id: serviceType,
Config: *updatedConfig,
}, nil
}
// All dashboards that are available based on cloud integrations configuration
// across all cloud providers
func (c *Controller) AvailableDashboards(ctx context.Context, orgId valuer.UUID) ([]*dashboardtypes.Dashboard, *model.ApiError) {
allDashboards := []*dashboardtypes.Dashboard{}
for _, provider := range []string{"aws"} {
providerDashboards, apiErr := c.AvailableDashboardsForCloudProvider(ctx, orgId, provider)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, fmt.Sprintf("couldn't get available dashboards for %s", provider),
)
}
allDashboards = append(allDashboards, providerDashboards...)
}
return allDashboards, nil
}
func (c *Controller) AvailableDashboardsForCloudProvider(ctx context.Context, orgID valuer.UUID, cloudProvider string) ([]*dashboardtypes.Dashboard, *model.ApiError) {
accountRecords, apiErr := c.accountsRepo.listConnected(ctx, orgID.StringValue(), cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list connected cloud accounts")
}
// for v0, service dashboards are only available when metrics are enabled.
servicesWithAvailableMetrics := map[string]*time.Time{}
for _, ar := range accountRecords {
if ar.AccountID != nil {
configsBySvcId, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, orgID.StringValue(), ar.ID.StringValue(),
)
if apiErr != nil {
return nil, apiErr
}
for svcId, config := range configsBySvcId {
if config.Metrics != nil && config.Metrics.Enabled {
servicesWithAvailableMetrics[svcId] = &ar.CreatedAt
}
}
}
}
allServices, apiErr := services.List(cloudProvider)
if apiErr != nil {
return nil, apiErr
}
svcDashboards := []*dashboardtypes.Dashboard{}
for _, svc := range allServices {
serviceDashboardsCreatedAt := servicesWithAvailableMetrics[svc.Id]
if serviceDashboardsCreatedAt != nil {
for _, d := range svc.Assets.Dashboards {
author := fmt.Sprintf("%s-integration", cloudProvider)
svcDashboards = append(svcDashboards, &dashboardtypes.Dashboard{
ID: c.dashboardUuid(cloudProvider, svc.Id, d.Id),
Locked: true,
Data: *d.Definition,
TimeAuditable: types.TimeAuditable{
CreatedAt: *serviceDashboardsCreatedAt,
UpdatedAt: *serviceDashboardsCreatedAt,
},
UserAuditable: types.UserAuditable{
CreatedBy: author,
UpdatedBy: author,
},
OrgID: orgID,
})
}
servicesWithAvailableMetrics[svc.Id] = nil
}
}
return svcDashboards, nil
}
func (c *Controller) GetDashboardById(ctx context.Context, orgId valuer.UUID, dashboardUuid string) (*dashboardtypes.Dashboard, *model.ApiError) {
cloudProvider, _, _, apiErr := c.parseDashboardUuid(dashboardUuid)
if apiErr != nil {
return nil, apiErr
}
allDashboards, apiErr := c.AvailableDashboardsForCloudProvider(ctx, orgId, cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list available dashboards")
}
for _, d := range allDashboards {
if d.ID == dashboardUuid {
return d, nil
}
}
return nil, model.NotFoundError(fmt.Errorf("couldn't find dashboard with uuid: %s", dashboardUuid))
}
func (c *Controller) dashboardUuid(
cloudProvider string, svcId string, dashboardId string,
) string {
return fmt.Sprintf("cloud-integration--%s--%s--%s", cloudProvider, svcId, dashboardId)
}
func (c *Controller) parseDashboardUuid(dashboardUuid string) (cloudProvider string, svcId string, dashboardId string, apiErr *model.ApiError) {
parts := strings.SplitN(dashboardUuid, "--", 4)
if len(parts) != 4 || parts[0] != "cloud-integration" {
return "", "", "", model.BadRequest(fmt.Errorf("invalid cloud integration dashboard id"))
}
return parts[1], parts[2], parts[3], nil
}
func (c *Controller) IsCloudIntegrationDashboardUuid(dashboardUuid string) bool {
_, _, _, apiErr := c.parseDashboardUuid(dashboardUuid)
return apiErr == nil
}

View File

@@ -1,757 +0,0 @@
package implawsprovider
import (
"context"
"fmt"
"log/slog"
"net/url"
"slices"
"sort"
"strings"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
integrationstore "github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/store"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/valuer"
"golang.org/x/exp/maps"
)
var (
CodeInvalidAWSRegion = errors.MustNewCode("invalid_aws_region")
CodeDashboardNotFound = errors.MustNewCode("dashboard_not_found")
)
type awsProvider struct {
logger *slog.Logger
querier interfaces.Querier
accountsRepo integrationstore.CloudProviderAccountsRepository
serviceConfigRepo integrationstore.ServiceConfigDatabase
awsServiceDefinitions *services.AWSServicesProvider
reader interfaces.Reader
}
func NewAWSCloudProvider(
logger *slog.Logger,
accountsRepo integrationstore.CloudProviderAccountsRepository,
serviceConfigRepo integrationstore.ServiceConfigDatabase,
reader interfaces.Reader,
querier interfaces.Querier,
) integrationstypes.CloudProvider {
awsServiceDefinitions, err := services.NewAWSCloudProviderServices()
if err != nil {
panic("failed to initialize AWS service definitions: " + err.Error())
}
return &awsProvider{
logger: logger,
reader: reader,
querier: querier,
accountsRepo: accountsRepo,
serviceConfigRepo: serviceConfigRepo,
awsServiceDefinitions: awsServiceDefinitions,
}
}
func (a *awsProvider) GetAccountStatus(ctx context.Context, orgID, accountID string) (*integrationstypes.GettableAccountStatus, error) {
accountRecord, err := a.accountsRepo.Get(ctx, orgID, a.GetName().String(), accountID)
if err != nil {
return nil, err
}
return &integrationstypes.GettableAccountStatus{
Id: accountRecord.ID.String(),
CloudAccountId: accountRecord.AccountID,
Status: accountRecord.Status(),
}, nil
}
func (a *awsProvider) ListConnectedAccounts(ctx context.Context, orgID string) (*integrationstypes.GettableConnectedAccountsList, error) {
accountRecords, err := a.accountsRepo.ListConnected(ctx, orgID, a.GetName().String())
if err != nil {
return nil, err
}
connectedAccounts := make([]*integrationstypes.Account, 0, len(accountRecords))
for _, r := range accountRecords {
connectedAccounts = append(connectedAccounts, r.Account(a.GetName()))
}
return &integrationstypes.GettableConnectedAccountsList{
Accounts: connectedAccounts,
}, nil
}
func (a *awsProvider) AgentCheckIn(ctx context.Context, req *integrationstypes.PostableAgentCheckInPayload) (any, error) {
// agent can't check in unless the account is already created
existingAccount, err := a.accountsRepo.Get(ctx, req.OrgID, a.GetName().String(), req.ID)
if err != nil {
return nil, err
}
if existingAccount != nil && existingAccount.AccountID != nil && *existingAccount.AccountID != req.AccountID {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "can't check in with new %s account id %s for account %s with existing %s id %s",
a.GetName().String(), req.AccountID, existingAccount.ID.StringValue(), a.GetName().String(),
*existingAccount.AccountID)
}
existingAccount, err = a.accountsRepo.GetConnectedCloudAccount(ctx, req.OrgID, a.GetName().String(), req.AccountID)
if existingAccount != nil && existingAccount.ID.StringValue() != req.ID {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
"can't check in to %s account %s with id %s. already connected with id %s",
a.GetName().String(), req.AccountID, req.ID, existingAccount.ID.StringValue())
}
agentReport := integrationstypes.AgentReport{
TimestampMillis: time.Now().UnixMilli(),
Data: req.Data,
}
account, err := a.accountsRepo.Upsert(
ctx, req.OrgID, a.GetName().String(), &req.ID, nil, &req.AccountID, &agentReport, nil,
)
if err != nil {
return nil, err
}
agentConfig, err := a.getAWSAgentConfig(ctx, account)
if err != nil {
return nil, err
}
return &integrationstypes.GettableAWSAgentCheckIn{
AccountId: account.ID.StringValue(),
CloudAccountId: *account.AccountID,
RemovedAt: account.RemovedAt,
IntegrationConfig: *agentConfig,
}, nil
}
func (a *awsProvider) getAWSAgentConfig(ctx context.Context, account *integrationstypes.CloudIntegration) (*integrationstypes.AWSAgentIntegrationConfig, error) {
// prepare and return integration config to be consumed by agent
agentConfig := &integrationstypes.AWSAgentIntegrationConfig{
EnabledRegions: []string{},
TelemetryCollectionStrategy: &integrationstypes.AWSCollectionStrategy{
Provider: a.GetName(),
AWSMetrics: &integrationstypes.AWSMetricsStrategy{},
AWSLogs: &integrationstypes.AWSLogsStrategy{},
S3Buckets: map[string][]string{},
},
}
accountConfig := new(integrationstypes.AWSAccountConfig)
err := accountConfig.Unmarshal([]byte(account.Config))
if err != nil {
return nil, err
}
if accountConfig != nil && accountConfig.EnabledRegions != nil {
agentConfig.EnabledRegions = accountConfig.EnabledRegions
}
svcConfigs, err := a.serviceConfigRepo.GetAllForAccount(
ctx, account.OrgID, account.ID.StringValue(),
)
if err != nil {
return nil, err
}
// accumulate config in a fixed order to ensure same config generated across runs
configuredServices := maps.Keys(svcConfigs)
slices.Sort(configuredServices)
for _, svcType := range configuredServices {
definition, err := a.awsServiceDefinitions.GetServiceDefinition(ctx, svcType)
if err != nil {
continue
}
config := svcConfigs[svcType]
serviceConfig := new(integrationstypes.AWSCloudServiceConfig)
err = serviceConfig.Unmarshal(config)
if err != nil {
continue
}
if serviceConfig.Logs != nil && serviceConfig.Logs.Enabled {
if svcType == integrationstypes.S3Sync {
// S3 bucket sync; No cloudwatch logs are appended for this service type;
// Though definition is populated with a custom cloudwatch group that helps in calculating logs connection status
agentConfig.TelemetryCollectionStrategy.S3Buckets = serviceConfig.Logs.S3Buckets
} else if definition.Strategy.AWSLogs != nil { // services that includes a logs subscription
agentConfig.TelemetryCollectionStrategy.AWSLogs.Subscriptions = append(
agentConfig.TelemetryCollectionStrategy.AWSLogs.Subscriptions,
definition.Strategy.AWSLogs.Subscriptions...,
)
}
}
if serviceConfig.Metrics != nil && serviceConfig.Metrics.Enabled && definition.Strategy.AWSMetrics != nil {
agentConfig.TelemetryCollectionStrategy.AWSMetrics.StreamFilters = append(
agentConfig.TelemetryCollectionStrategy.AWSMetrics.StreamFilters,
definition.Strategy.AWSMetrics.StreamFilters...,
)
}
}
return agentConfig, nil
}
func (a *awsProvider) GetName() integrationstypes.CloudProviderType {
return integrationstypes.CloudProviderAWS
}
func (a *awsProvider) ListServices(ctx context.Context, orgID string, cloudAccountID *string) (any, error) {
svcConfigs := make(map[string]*integrationstypes.AWSCloudServiceConfig)
if cloudAccountID != nil {
activeAccount, err := a.accountsRepo.GetConnectedCloudAccount(ctx, orgID, a.GetName().String(), *cloudAccountID)
if err != nil {
return nil, err
}
serviceConfigs, err := a.serviceConfigRepo.GetAllForAccount(ctx, orgID, activeAccount.ID.String())
if err != nil {
return nil, err
}
for svcType, config := range serviceConfigs {
serviceConfig := new(integrationstypes.AWSCloudServiceConfig)
err = serviceConfig.Unmarshal(config)
if err != nil {
return nil, err
}
svcConfigs[svcType] = serviceConfig
}
}
summaries := make([]integrationstypes.AWSServiceSummary, 0)
definitions, err := a.awsServiceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, err
}
for _, def := range definitions {
summary := integrationstypes.AWSServiceSummary{
DefinitionMetadata: def.DefinitionMetadata,
Config: nil,
}
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
}
sort.Slice(summaries, func(i, j int) bool {
return summaries[i].DefinitionMetadata.Title < summaries[j].DefinitionMetadata.Title
})
return &integrationstypes.GettableAWSServices{
Services: summaries,
}, nil
}
func (a *awsProvider) GetServiceDetails(ctx context.Context, req *integrationstypes.GetServiceDetailsReq) (any, error) {
details := new(integrationstypes.GettableAWSServiceDetails)
awsDefinition, err := a.awsServiceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
if err != nil {
return nil, err
}
details.AWSServiceDefinition = *awsDefinition
details.Strategy.Provider = a.GetName()
if req.CloudAccountID == nil {
return details, nil
}
config, err := a.getServiceConfig(ctx, &details.AWSServiceDefinition, req.OrgID, a.GetName().String(), req.ServiceId, *req.CloudAccountID)
if err != nil {
return nil, err
}
if config == nil {
return details, nil
}
details.Config = config
connectionStatus, err := a.calculateCloudIntegrationServiceConnectionStatus(
ctx,
req.OrgID,
*req.CloudAccountID,
&details.AWSServiceDefinition,
)
if err != nil {
return nil, err
}
details.ConnectionStatus = connectionStatus
return details, nil
}
func (a *awsProvider) calculateCloudIntegrationServiceConnectionStatus(
ctx context.Context,
orgID valuer.UUID,
cloudAccountId string,
svcDetails *integrationstypes.AWSServiceDefinition,
) (*integrationstypes.ServiceConnectionStatus, error) {
telemetryCollectionStrategy := svcDetails.Strategy
result := &integrationstypes.ServiceConnectionStatus{}
if telemetryCollectionStrategy == nil {
return result, model.InternalError(fmt.Errorf(
"service doesn't have telemetry collection strategy: %s", svcDetails.Id,
))
}
errors := make([]error, 0)
var resultLock sync.Mutex
var wg sync.WaitGroup
// Calculate metrics connection status
if telemetryCollectionStrategy.AWSMetrics != nil {
wg.Add(1)
go func() {
defer wg.Done()
metricsConnStatus, apiErr := a.calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx, cloudAccountId, telemetryCollectionStrategy.AWSMetrics, svcDetails.DataCollected.Metrics,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Metrics = metricsConnStatus
}
}()
}
// Calculate logs connection status
if telemetryCollectionStrategy.AWSLogs != nil {
wg.Add(1)
go func() {
defer wg.Done()
logsConnStatus, apiErr := a.calculateAWSIntegrationSvcLogsConnectionStatus(
ctx, orgID, cloudAccountId, telemetryCollectionStrategy.AWSLogs,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Logs = logsConnStatus
}
}()
}
wg.Wait()
if len(errors) > 0 {
return result, errors[0]
}
return result, nil
}
func (a *awsProvider) calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx context.Context,
cloudAccountId string,
strategy *integrationstypes.AWSMetricsStrategy,
metricsCollectedBySvc []integrationstypes.CollectedMetric,
) (*integrationstypes.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.StreamFilters) < 1 {
return nil, nil
}
expectedLabelValues := map[string]string{
"cloud_provider": "aws",
"cloud_account_id": cloudAccountId,
}
metricsNamespace := strategy.StreamFilters[0].Namespace
metricsNamespaceParts := strings.Split(metricsNamespace, "/")
if len(metricsNamespaceParts) >= 2 {
expectedLabelValues["service_namespace"] = metricsNamespaceParts[0]
expectedLabelValues["service_name"] = metricsNamespaceParts[1]
} else {
// metrics for single word namespaces like "CWAgent" do not
// have the service_namespace label populated
expectedLabelValues["service_name"] = metricsNamespaceParts[0]
}
metricNamesCollectedBySvc := []string{}
for _, cm := range metricsCollectedBySvc {
metricNamesCollectedBySvc = append(metricNamesCollectedBySvc, cm.Name)
}
statusForLastReceivedMetric, apiErr := a.reader.GetLatestReceivedMetric(
ctx, metricNamesCollectedBySvc, expectedLabelValues,
)
if apiErr != nil {
return nil, apiErr
}
if statusForLastReceivedMetric != nil {
return &integrationstypes.SignalConnectionStatus{
LastReceivedTsMillis: statusForLastReceivedMetric.LastReceivedTsMillis,
LastReceivedFrom: "signoz-aws-integration",
}, nil
}
return nil, nil
}
func (a *awsProvider) calculateAWSIntegrationSvcLogsConnectionStatus(
ctx context.Context,
orgID valuer.UUID,
cloudAccountId string,
strategy *integrationstypes.AWSLogsStrategy,
) (*integrationstypes.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.Subscriptions) < 1 {
return nil, nil
}
logGroupNamePrefix := strategy.Subscriptions[0].LogGroupNamePrefix
if len(logGroupNamePrefix) < 1 {
return nil, nil
}
logsConnTestFilter := &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "cloud.account.id",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: "=",
Value: cloudAccountId,
},
{
Key: v3.AttributeKey{
Key: "aws.cloudwatch.log_group_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: "like",
Value: logGroupNamePrefix + "%",
},
},
}
// TODO(Raj): Receive this as a param from UI in the future.
lookbackSeconds := int64(30 * 60)
qrParams := &v3.QueryRangeParamsV3{
Start: time.Now().UnixMilli() - (lookbackSeconds * 1000),
End: time.Now().UnixMilli(),
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
PageSize: 1,
Filters: logsConnTestFilter,
QueryName: "A",
DataSource: v3.DataSourceLogs,
Expression: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
},
},
},
}
queryRes, _, err := a.querier.QueryRange(
ctx, orgID, qrParams,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query for integration connection status: %w", err,
))
}
if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 {
lastLog := queryRes[0].List[0]
return &integrationstypes.SignalConnectionStatus{
LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(),
LastReceivedFrom: "signoz-aws-integration",
}, nil
}
return nil, nil
}
func (a *awsProvider) getServiceConfig(ctx context.Context,
def *integrationstypes.AWSServiceDefinition, orgID valuer.UUID, cloudProvider, serviceId, cloudAccountId string,
) (*integrationstypes.AWSCloudServiceConfig, error) {
activeAccount, err := a.accountsRepo.GetConnectedCloudAccount(ctx, orgID.String(), cloudProvider, cloudAccountId)
if err != nil {
return nil, err
}
config, err := a.serviceConfigRepo.Get(ctx, orgID.String(), activeAccount.ID.StringValue(), serviceId)
if err != nil {
if errors.Ast(err, errors.TypeNotFound) {
return nil, nil
}
return nil, err
}
serviceConfig := new(integrationstypes.AWSCloudServiceConfig)
err = serviceConfig.Unmarshal(config)
if err != nil {
return nil, err
}
if config != nil && serviceConfig.Metrics != nil && serviceConfig.Metrics.Enabled {
def.PopulateDashboardURLs(serviceId)
}
return serviceConfig, nil
}
func (a *awsProvider) GetAvailableDashboards(ctx context.Context, orgID valuer.UUID) ([]*dashboardtypes.Dashboard, error) {
accountRecords, err := a.accountsRepo.ListConnected(ctx, orgID.StringValue(), a.GetName().String())
if err != nil {
return nil, err
}
// for now service dashboards are only available when metrics are enabled.
servicesWithAvailableMetrics := map[string]*time.Time{}
for _, ar := range accountRecords {
if ar.AccountID != nil {
configsBySvcId, err := a.serviceConfigRepo.GetAllForAccount(ctx, orgID.StringValue(), ar.ID.StringValue())
if err != nil {
return nil, err
}
for svcId, config := range configsBySvcId {
serviceConfig := new(integrationstypes.AWSCloudServiceConfig)
err = serviceConfig.Unmarshal(config)
if err != nil {
return nil, err
}
if serviceConfig.Metrics != nil && serviceConfig.Metrics.Enabled {
servicesWithAvailableMetrics[svcId] = &ar.CreatedAt
}
}
}
}
svcDashboards := make([]*dashboardtypes.Dashboard, 0)
allServices, err := a.awsServiceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to list aws service definitions")
}
for _, svc := range allServices {
serviceDashboardsCreatedAt, ok := servicesWithAvailableMetrics[svc.Id]
if ok {
svcDashboards = integrationstypes.GetDashboardsFromAssets(svc.Id, a.GetName(), serviceDashboardsCreatedAt, svc.Assets)
servicesWithAvailableMetrics[svc.Id] = nil
}
}
return svcDashboards, nil
}
func (a *awsProvider) GetDashboard(ctx context.Context, req *integrationstypes.GettableDashboard) (*dashboardtypes.Dashboard, error) {
allDashboards, err := a.GetAvailableDashboards(ctx, req.OrgID)
if err != nil {
return nil, err
}
for _, d := range allDashboards {
if d.ID == req.ID {
return d, nil
}
}
return nil, errors.NewNotFoundf(CodeDashboardNotFound, "dashboard with id %s not found", req.ID)
}
func (a *awsProvider) GenerateConnectionArtifact(ctx context.Context, req *integrationstypes.PostableConnectionArtifact) (any, error) {
connection := new(integrationstypes.PostableAWSConnectionUrl)
err := connection.Unmarshal(req.Data)
if err != nil {
return nil, err
}
if connection.AccountConfig != nil {
for _, region := range connection.AccountConfig.EnabledRegions {
if integrationstypes.ValidAWSRegions[region] {
continue
}
return nil, errors.NewInvalidInputf(CodeInvalidAWSRegion, "invalid aws region: %s", region)
}
}
config, err := connection.AccountConfig.Marshal()
if err != nil {
return nil, err
}
account, err := a.accountsRepo.Upsert(
ctx, req.OrgID, integrationstypes.CloudProviderAWS.String(), nil, config,
nil, nil, nil,
)
if err != nil {
return nil, err
}
agentVersion := "v0.0.8"
if connection.AgentConfig.Version != "" {
agentVersion = connection.AgentConfig.Version
}
baseURL := fmt.Sprintf("https://%s.console.aws.amazon.com/cloudformation/home",
connection.AgentConfig.Region)
u, _ := url.Parse(baseURL)
q := u.Query()
q.Set("region", connection.AgentConfig.Region)
u.Fragment = "/stacks/quickcreate"
u.RawQuery = q.Encode()
q = u.Query()
q.Set("stackName", "signoz-integration")
q.Set("templateURL", fmt.Sprintf("https://signoz-integrations.s3.us-east-1.amazonaws.com/aws-quickcreate-template-%s.json", agentVersion))
q.Set("param_SigNozIntegrationAgentVersion", agentVersion)
q.Set("param_SigNozApiUrl", connection.AgentConfig.SigNozAPIUrl)
q.Set("param_SigNozApiKey", connection.AgentConfig.SigNozAPIKey)
q.Set("param_SigNozAccountId", account.ID.StringValue())
q.Set("param_IngestionUrl", connection.AgentConfig.IngestionUrl)
q.Set("param_IngestionKey", connection.AgentConfig.IngestionKey)
return &integrationstypes.GettableAWSConnectionUrl{
AccountId: account.ID.StringValue(),
ConnectionUrl: u.String() + "?&" + q.Encode(), // this format is required by AWS
}, nil
}
func (a *awsProvider) UpdateServiceConfig(ctx context.Context, req *integrationstypes.PatchableServiceConfig) (any, error) {
definition, err := a.awsServiceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
if err != nil {
return nil, err
}
serviceConfig := new(integrationstypes.PatchableAWSCloudServiceConfig)
err = serviceConfig.Unmarshal(req.Config)
if err != nil {
return nil, err
}
if err = serviceConfig.Config.Validate(definition); err != nil {
return nil, err
}
// can only update config for a connected cloud account id
_, err = a.accountsRepo.GetConnectedCloudAccount(
ctx, req.OrgID, a.GetName().String(), serviceConfig.CloudAccountId,
)
if err != nil {
return nil, err
}
serviceConfigBytes, err := serviceConfig.Config.Marshal()
if err != nil {
return nil, err
}
updatedConfig, err := a.serviceConfigRepo.Upsert(
ctx, req.OrgID, a.GetName().String(), serviceConfig.CloudAccountId, req.ServiceId, serviceConfigBytes,
)
if err != nil {
return nil, err
}
if err = serviceConfig.Config.Unmarshal(updatedConfig); err != nil {
return nil, err
}
return &integrationstypes.PatchServiceConfigResponse{
ServiceId: req.ServiceId,
Config: serviceConfig.Config,
}, nil
}
func (a *awsProvider) UpdateAccountConfig(ctx context.Context, req *integrationstypes.PatchableAccountConfig) (any, error) {
config := new(integrationstypes.PatchableAWSAccountConfig)
err := config.Unmarshal(req.Data)
if err != nil {
return nil, err
}
if config.Config == nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "account config can't be null")
}
for _, region := range config.Config.EnabledRegions {
if integrationstypes.ValidAWSRegions[region] {
continue
}
return nil, errors.NewInvalidInputf(CodeInvalidAWSRegion, "invalid aws region: %s", region)
}
configBytes, err := config.Config.Marshal()
if err != nil {
return nil, err
}
// account must exist to update config, but it doesn't need to be connected
_, err = a.accountsRepo.Get(ctx, req.OrgID, a.GetName().String(), req.AccountId)
if err != nil {
return nil, err
}
accountRecord, err := a.accountsRepo.Upsert(
ctx, req.OrgID, a.GetName().String(), &req.AccountId, configBytes, nil, nil, nil,
)
if err != nil {
return nil, err
}
return accountRecord.Account(a.GetName()), nil
}
func (a *awsProvider) DisconnectAccount(ctx context.Context, orgID, accountID string) (*integrationstypes.CloudIntegration, error) {
account, err := a.accountsRepo.Get(ctx, orgID, a.GetName().String(), accountID)
if err != nil {
return nil, err
}
tsNow := time.Now()
account, err = a.accountsRepo.Upsert(
ctx, orgID, a.GetName().String(), &accountID, nil, nil, nil, &tsNow,
)
if err != nil {
return nil, err
}
return account, nil
}

View File

@@ -1 +1,94 @@
package cloudintegrations
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/types"
)
type ServiceSummary struct {
services.Metadata
Config *types.CloudServiceConfig `json:"config"`
}
type ServiceDetails struct {
services.Definition
Config *types.CloudServiceConfig `json:"config"`
ConnectionStatus *ServiceConnectionStatus `json:"status,omitempty"`
}
type AccountStatus struct {
Integration AccountIntegrationStatus `json:"integration"`
}
type AccountIntegrationStatus struct {
LastHeartbeatTsMillis *int64 `json:"last_heartbeat_ts_ms"`
}
type LogsConfig struct {
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type MetricsConfig struct {
Enabled bool `json:"enabled"`
}
type ServiceConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"`
}
type SignalConnectionStatus struct {
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}
type CompiledCollectionStrategy = services.CollectionStrategy
func NewCompiledCollectionStrategy(provider string) (*CompiledCollectionStrategy, error) {
if provider == "aws" {
return &CompiledCollectionStrategy{
Provider: "aws",
AWSMetrics: &services.AWSMetricsStrategy{},
AWSLogs: &services.AWSLogsStrategy{},
}, nil
}
return nil, errors.NewNotFoundf(services.CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", provider)
}
// Helper for accumulating strategies for enabled services.
func AddServiceStrategy(serviceType string, cs *CompiledCollectionStrategy,
definitionStrat *services.CollectionStrategy, config *types.CloudServiceConfig) error {
if definitionStrat.Provider != cs.Provider {
return errors.NewInternalf(CodeMismatchCloudProvider, "can't add %s service strategy to compiled strategy for %s",
definitionStrat.Provider, cs.Provider)
}
if cs.Provider == "aws" {
if config.Logs != nil && config.Logs.Enabled {
if serviceType == services.S3Sync {
// S3 bucket sync; No cloudwatch logs are appended for this service type;
// Though definition is populated with a custom cloudwatch group that helps in calculating logs connection status
cs.S3Buckets = config.Logs.S3Buckets
} else if definitionStrat.AWSLogs != nil { // services that includes a logs subscription
cs.AWSLogs.Subscriptions = append(
cs.AWSLogs.Subscriptions,
definitionStrat.AWSLogs.Subscriptions...,
)
}
}
if config.Metrics != nil && config.Metrics.Enabled && definitionStrat.AWSMetrics != nil {
cs.AWSMetrics.StreamFilters = append(
cs.AWSMetrics.StreamFilters,
definitionStrat.AWSMetrics.StreamFilters...,
)
}
return nil
}
return errors.NewNotFoundf(services.CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", cs.Provider)
}

View File

@@ -1,28 +0,0 @@
package cloudintegrations
import (
"log/slog"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/implawsprovider"
integrationstore "github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/store"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
)
func NewCloudProviderRegistry(
logger *slog.Logger,
store sqlstore.SQLStore,
reader interfaces.Reader,
querier interfaces.Querier,
) map[integrationstypes.CloudProviderType]integrationstypes.CloudProvider {
registry := make(map[integrationstypes.CloudProviderType]integrationstypes.CloudProvider)
accountsRepo := integrationstore.NewCloudProviderAccountsRepository(store)
serviceConfigRepo := integrationstore.NewServiceConfigRepository(store)
awsProviderImpl := implawsprovider.NewAWSCloudProvider(logger, accountsRepo, serviceConfigRepo, reader, querier)
registry[integrationstypes.CloudProviderAWS] = awsProviderImpl
return registry
}

View File

@@ -1,63 +1,64 @@
package store
package cloudintegrations
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
CodeServiceConfigNotFound = errors.MustNewCode("service_config_not_found")
)
type ServiceConfigDatabase interface {
Get(
get(
ctx context.Context,
orgID string,
cloudAccountId string,
serviceType string,
) ([]byte, error)
) (*types.CloudServiceConfig, *model.ApiError)
Upsert(
upsert(
ctx context.Context,
orgID string,
cloudProvider string,
cloudAccountId string,
serviceId string,
config []byte,
) ([]byte, error)
config types.CloudServiceConfig,
) (*types.CloudServiceConfig, *model.ApiError)
GetAllForAccount(
getAllForAccount(
ctx context.Context,
orgID string,
cloudAccountId string,
) (
map[string][]byte,
error,
configsBySvcId map[string]*types.CloudServiceConfig,
apiErr *model.ApiError,
)
}
func NewServiceConfigRepository(store sqlstore.SQLStore) ServiceConfigDatabase {
return &serviceConfigSQLRepository{store: store}
func newServiceConfigRepository(store sqlstore.SQLStore) (
*serviceConfigSQLRepository, error,
) {
return &serviceConfigSQLRepository{
store: store,
}, nil
}
type serviceConfigSQLRepository struct {
store sqlstore.SQLStore
}
func (r *serviceConfigSQLRepository) Get(
func (r *serviceConfigSQLRepository) get(
ctx context.Context,
orgID string,
cloudAccountId string,
serviceType string,
) ([]byte, error) {
var result integrationstypes.CloudIntegrationService
) (*types.CloudServiceConfig, *model.ApiError) {
var result types.CloudIntegrationService
err := r.store.BunDB().NewSelect().
Model(&result).
@@ -66,30 +67,36 @@ func (r *serviceConfigSQLRepository) Get(
Where("ci.id = ?", cloudAccountId).
Where("cis.type = ?", serviceType).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errors.WrapNotFoundf(err, CodeServiceConfigNotFound, "couldn't find config for cloud account %s", cloudAccountId)
}
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud service config")
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find config for cloud account %s",
cloudAccountId,
))
} else if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud service config: %w", err,
))
}
return []byte(result.Config), nil
return &result.Config, nil
}
func (r *serviceConfigSQLRepository) Upsert(
func (r *serviceConfigSQLRepository) upsert(
ctx context.Context,
orgID string,
cloudProvider string,
cloudAccountId string,
serviceId string,
config []byte,
) ([]byte, error) {
config types.CloudServiceConfig,
) (*types.CloudServiceConfig, *model.ApiError) {
// get cloud integration id from account id
// if the account is not connected, we don't need to upsert the config
var cloudIntegrationId string
err := r.store.BunDB().NewSelect().
Model((*integrationstypes.CloudIntegration)(nil)).
Model((*types.CloudIntegration)(nil)).
Column("id").
Where("provider = ?", cloudProvider).
Where("account_id = ?", cloudAccountId).
@@ -97,24 +104,20 @@ func (r *serviceConfigSQLRepository) Upsert(
Where("removed_at is NULL").
Where("last_agent_report is not NULL").
Scan(ctx, &cloudIntegrationId)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errors.WrapNotFoundf(
err,
CodeCloudIntegrationAccountNotFound,
"couldn't find active cloud integration account",
)
}
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud integration id")
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud integration id: %w", err,
))
}
serviceConfig := integrationstypes.CloudIntegrationService{
serviceConfig := types.CloudIntegrationService{
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
Config: string(config),
Config: config,
Type: serviceId,
CloudIntegrationID: cloudIntegrationId,
}
@@ -123,18 +126,21 @@ func (r *serviceConfigSQLRepository) Upsert(
On("conflict(cloud_integration_id, type) do update set config=excluded.config, updated_at=excluded.updated_at").
Exec(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't upsert cloud service config")
return nil, model.InternalError(fmt.Errorf(
"could not upsert cloud service config: %w", err,
))
}
return config, nil
return &serviceConfig.Config, nil
}
func (r *serviceConfigSQLRepository) GetAllForAccount(
func (r *serviceConfigSQLRepository) getAllForAccount(
ctx context.Context,
orgID string,
cloudAccountId string,
) (map[string][]byte, error) {
var serviceConfigs []integrationstypes.CloudIntegrationService
) (map[string]*types.CloudServiceConfig, *model.ApiError) {
serviceConfigs := []types.CloudIntegrationService{}
err := r.store.BunDB().NewSelect().
Model(&serviceConfigs).
@@ -143,13 +149,15 @@ func (r *serviceConfigSQLRepository) GetAllForAccount(
Where("ci.org_id = ?", orgID).
Scan(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query service configs from db")
return nil, model.InternalError(fmt.Errorf(
"could not query service configs from db: %w", err,
))
}
result := make(map[string][]byte)
result := map[string]*types.CloudServiceConfig{}
for _, r := range serviceConfigs {
result[r.Type] = []byte(r.Config)
result[r.Type] = &r.Config
}
return result, nil

View File

@@ -148,146 +148,6 @@
"name": "aws_ApiGateway_Latency_sum",
"unit": "Milliseconds",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_4xx_sum",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_4xx_max",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_4xx_min",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_4xx_count",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_5xx_sum",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_5xx_max",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_5xx_min",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_5xx_count",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_DataProcessed_sum",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_DataProcessed_max",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_DataProcessed_min",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_DataProcessed_count",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ExecutionError_sum",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ExecutionError_max",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ExecutionError_min",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ExecutionError_count",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ClientError_sum",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ClientError_max",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ClientError_min",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ClientError_count",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_IntegrationError_sum",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_IntegrationError_max",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_IntegrationError_min",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_IntegrationError_count",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ConnectCount_sum",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ConnectCount_max",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ConnectCount_min",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ConnectCount_count",
"unit": "Count",
"type": "Gauge"
}
],
"logs": [

View File

@@ -1928,7 +1928,7 @@
"unit": "Percent",
"type": "Gauge",
"description": ""
}
}
]
},
"telemetry_collection_strategy": {
@@ -1951,4 +1951,4 @@
}
]
}
}
}

View File

@@ -0,0 +1,91 @@
package services
import (
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
)
type Metadata struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
}
type Definition struct {
Metadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"data_collected"`
Strategy *CollectionStrategy `json:"telemetry_collection_strategy"`
}
type Assets struct {
Dashboards []Dashboard `json:"dashboards"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollected struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type CollectionStrategy struct {
Provider string `json:"provider"`
AWSMetrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsStrategy `json:"aws_logs,omitempty"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type
}
type AWSMetricsStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
StreamFilters []struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
} `json:"cloudwatch_metric_stream_filters"`
}
type AWSLogsStrategy struct {
Subscriptions []struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
} `json:"cloudwatch_logs_subscriptions"`
}
type Dashboard struct {
Id string `json:"id"`
Url string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Image string `json:"image"`
Definition *dashboardtypes.StorableDashboardData `json:"definition,omitempty"`
}

View File

@@ -2,90 +2,128 @@ package services
import (
"bytes"
"context"
"embed"
"encoding/json"
"fmt"
"io/fs"
"path"
"sort"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/query-service/model"
koanfJson "github.com/knadh/koanf/parsers/json"
"golang.org/x/exp/maps"
)
const (
S3Sync = "s3sync"
)
var (
CodeServiceDefinitionNotFound = errors.MustNewCode("service_definition_not_dound")
CodeUnsupportedCloudProvider = errors.MustNewCode("unsupported_cloud_provider")
CodeUnsupportedServiceType = errors.MustNewCode("unsupported_service_type")
)
type (
AWSServicesProvider struct {
definitions map[string]*integrationstypes.AWSServiceDefinition
func List(cloudProvider string) ([]Definition, *model.ApiError) {
cloudServices, found := supportedServices[cloudProvider]
if !found || cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider,
))
}
)
func (a *AWSServicesProvider) ListServiceDefinitions(ctx context.Context) (map[string]*integrationstypes.AWSServiceDefinition, error) {
return a.definitions, nil
services := maps.Values(cloudServices)
sort.Slice(services, func(i, j int) bool {
return services[i].Id < services[j].Id
})
return services, nil
}
func (a *AWSServicesProvider) GetServiceDefinition(ctx context.Context, serviceName string) (*integrationstypes.AWSServiceDefinition, error) {
def, ok := a.definitions[serviceName]
if !ok {
return nil, errors.NewNotFoundf(CodeServiceDefinitionNotFound, "aws service definition not found: %s", serviceName)
}
return def, nil
}
func NewAWSCloudProviderServices() (*AWSServicesProvider, error) {
definitions, err := readAllServiceDefinitions(integrationstypes.CloudProviderAWS)
if err != nil {
return nil, err
}
serviceDefinitions := make(map[string]*integrationstypes.AWSServiceDefinition)
for id, def := range definitions {
typedDef, ok := def.(*integrationstypes.AWSServiceDefinition)
if !ok {
return nil, fmt.Errorf("invalid type for AWS service definition %s", id)
}
serviceDefinitions[id] = typedDef
}
return &AWSServicesProvider{
definitions: serviceDefinitions,
}, nil
}
//go:embed definitions/*
var definitionFiles embed.FS
func readAllServiceDefinitions(cloudProvider valuer.String) (map[string]any, error) {
rootDirName := "definitions"
cloudProviderDirPath := path.Join(rootDirName, cloudProvider.String())
cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath)
if err != nil {
return nil, err
}
if len(cloudServices) < 1 {
return nil, errors.NewInternalf(errors.CodeInternal, "no service definitions found in %s", cloudProviderDirPath)
func Map(cloudProvider string) (map[string]Definition, error) {
cloudServices, found := supportedServices[cloudProvider]
if !found || cloudServices == nil {
return nil, errors.Newf(errors.TypeNotFound, CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", cloudProvider)
}
return cloudServices, nil
}
func readServiceDefinitionsFromDir(cloudProvider valuer.String, cloudProviderDirPath string) (map[string]any, error) {
svcDefDirs, err := fs.ReadDir(definitionFiles, cloudProviderDirPath)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't list integrations dirs")
func GetServiceDefinition(cloudProvider, serviceType string) (*Definition, error) {
cloudServices := supportedServices[cloudProvider]
if cloudServices == nil {
return nil, errors.Newf(errors.TypeNotFound, CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", cloudProvider)
}
svcDefs := make(map[string]any)
svc, exists := cloudServices[serviceType]
if !exists {
return nil, errors.Newf(errors.TypeNotFound, CodeUnsupportedServiceType, "%s service not found: %s", cloudProvider, serviceType)
}
return &svc, nil
}
// End of API. Logic for reading service definition files follows
// Service details read from ./serviceDefinitions
// { "providerName": { "service_id": {...}} }
var supportedServices map[string]map[string]Definition
func init() {
err := readAllServiceDefinitions()
if err != nil {
panic(fmt.Errorf(
"couldn't read cloud service definitions: %w", err,
))
}
}
//go:embed definitions/*
var definitionFiles embed.FS
func readAllServiceDefinitions() error {
supportedServices = map[string]map[string]Definition{}
rootDirName := "definitions"
cloudProviderDirs, err := fs.ReadDir(definitionFiles, rootDirName)
if err != nil {
return fmt.Errorf("couldn't read dirs in %s: %w", rootDirName, err)
}
for _, d := range cloudProviderDirs {
if !d.IsDir() {
continue
}
cloudProvider := d.Name()
cloudProviderDirPath := path.Join(rootDirName, cloudProvider)
cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath)
if err != nil {
return fmt.Errorf("couldn't read %s service definitions: %w", cloudProvider, err)
}
if len(cloudServices) < 1 {
return fmt.Errorf("no %s services could be read", cloudProvider)
}
supportedServices[cloudProvider] = cloudServices
}
return nil
}
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
map[string]Definition, error,
) {
svcDefDirs, err := fs.ReadDir(definitionFiles, cloudProviderDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't list integrations dirs: %w", err)
}
svcDefs := map[string]Definition{}
for _, d := range svcDefDirs {
if !d.IsDir() {
@@ -95,71 +133,103 @@ func readServiceDefinitionsFromDir(cloudProvider valuer.String, cloudProviderDir
svcDirPath := path.Join(cloudProviderDirPath, d.Name())
s, err := readServiceDefinition(cloudProvider, svcDirPath)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err)
}
_, exists := svcDefs[s.GetId()]
_, exists := svcDefs[s.Id]
if exists {
return nil, errors.NewInternalf(errors.CodeInternal, "duplicate service definition for id %s at %s", s.GetId(), d.Name())
return nil, fmt.Errorf(
"duplicate service definition for id %s at %s", s.Id, d.Name(),
)
}
svcDefs[s.GetId()] = s
svcDefs[s.Id] = *s
}
return svcDefs, nil
}
func readServiceDefinition(cloudProvider valuer.String, svcDirpath string) (integrationstypes.Definition, error) {
func readServiceDefinition(cloudProvider string, svcDirpath string) (*Definition, error) {
integrationJsonPath := path.Join(svcDirpath, "integration.json")
serializedSpec, err := definitionFiles.ReadFile(integrationJsonPath)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't read integration definition in %s", svcDirpath)
return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w",
svcDirpath, err,
)
}
integrationSpec, err := koanfJson.Parser().Unmarshal(serializedSpec)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't parse integration definition in %s", svcDirpath)
return nil, fmt.Errorf(
"couldn't parse integration.json from %s: %w",
integrationJsonPath, err,
)
}
hydrated, err := integrations.HydrateFileUris(integrationSpec, definitionFiles, svcDirpath)
hydrated, err := integrations.HydrateFileUris(
integrationSpec, definitionFiles, svcDirpath,
)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't hydrate integration definition in %s", svcDirpath)
return nil, fmt.Errorf(
"couldn't hydrate files referenced in service definition %s: %w",
integrationJsonPath, err,
)
}
hydratedSpec := hydrated.(map[string]any)
var serviceDef integrationstypes.Definition
switch cloudProvider {
case integrationstypes.CloudProviderAWS:
serviceDef = &integrationstypes.AWSServiceDefinition{}
default:
// ideally this shouldn't happen hence throwing internal error
return nil, errors.NewInternalf(errors.CodeInternal, "unsupported cloud provider: %s", cloudProvider)
serviceDef, err := ParseStructWithJsonTagsFromMap[Definition](hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w",
integrationJsonPath, err,
)
}
err = parseStructWithJsonTagsFromMap(hydratedSpec, serviceDef)
err = validateServiceDefinition(serviceDef)
if err != nil {
return nil, err
}
err = serviceDef.Validate()
if err != nil {
return nil, err
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
}
serviceDef.Strategy.Provider = cloudProvider
return serviceDef, nil
}
func parseStructWithJsonTagsFromMap(data map[string]any, target interface{}) error {
mapJson, err := json.Marshal(data)
if err != nil {
return errors.WrapInternalf(err, errors.CodeInternal, "couldn't marshal service definition json data")
func validateServiceDefinition(s *Definition) error {
// Validate dashboard data
seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards {
if _, seen := seenDashboardIds[dd.Id]; seen {
return fmt.Errorf("multiple dashboards found with id %s", dd.Id)
}
seenDashboardIds[dd.Id] = nil
}
decoder := json.NewDecoder(bytes.NewReader(mapJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(target)
if err != nil {
return errors.WrapInternalf(err, errors.CodeInternal, "couldn't unmarshal service definition json data")
if s.Strategy == nil {
return fmt.Errorf("telemetry_collection_strategy is required")
}
// potentially more to follow
return nil
}
func ParseStructWithJsonTagsFromMap[StructType any](data map[string]any) (
*StructType, error,
) {
mapJson, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("couldn't marshal map to json: %w", err)
}
var res StructType
decoder := json.NewDecoder(bytes.NewReader(mapJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(&res)
if err != nil {
return nil, fmt.Errorf("couldn't unmarshal json back to struct: %w", err)
}
return &res, nil
}

View File

@@ -1,3 +1,35 @@
package services
// TODO: add more tests for services package
import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/stretchr/testify/require"
)
func TestAvailableServices(t *testing.T) {
require := require.New(t)
// should be able to list available services.
_, apiErr := List("bad-cloud-provider")
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
awsSvcs, apiErr := List("aws")
require.Nil(apiErr)
require.Greater(len(awsSvcs), 0)
// should be able to get details of a service
_, err := GetServiceDefinition(
"aws", "bad-service-id",
)
require.NotNil(err)
require.True(errors.Ast(err, errors.TypeNotFound))
svc, err := GetServiceDefinition(
"aws", awsSvcs[0].Id,
)
require.Nil(err)
require.Equal(*svc, awsSvcs[0])
}

View File

@@ -6,7 +6,11 @@ import (
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/queryparser"
"io"
"math"
@@ -21,19 +25,14 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/errors"
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/prometheus/promql"
@@ -45,6 +44,7 @@ import (
"github.com/SigNoz/signoz/pkg/contextlinks"
traceFunnelsModule "github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/inframetrics"
queues2 "github.com/SigNoz/signoz/pkg/query-service/app/integrations/messagingQueues/queues"
"github.com/SigNoz/signoz/pkg/query-service/app/logs"
@@ -113,7 +113,7 @@ type APIHandler struct {
IntegrationsController *integrations.Controller
cloudIntegrationsRegistry map[integrationstypes.CloudProviderType]integrationstypes.CloudProvider
CloudIntegrationsController *cloudintegrations.Controller
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
@@ -162,6 +162,9 @@ type APIHandlerOpts struct {
// Integrations
IntegrationsController *integrations.Controller
// Cloud Provider Integrations
CloudIntegrationsController *cloudintegrations.Controller
// Log parsing pipelines
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
@@ -177,8 +180,6 @@ type APIHandlerOpts struct {
QueryParserAPI *queryparser.API
Signoz *signoz.SigNoz
Logger *slog.Logger
}
// NewAPIHandler returns an APIHandler
@@ -214,19 +215,12 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
summaryService := metricsexplorer.NewSummaryService(opts.Reader, opts.RuleManager, opts.Signoz.Modules.Dashboard)
//quickFilterModule := quickfilter.NewAPI(opts.QuickFilterModule)
cloudIntegrationsRegistry := cloudintegrations.NewCloudProviderRegistry(
opts.Logger,
opts.Signoz.SQLStore,
opts.Reader,
querier,
)
aH := &APIHandler{
reader: opts.Reader,
temporalityMap: make(map[string]map[v3.Temporality]bool),
ruleManager: opts.RuleManager,
IntegrationsController: opts.IntegrationsController,
cloudIntegrationsRegistry: cloudIntegrationsRegistry,
CloudIntegrationsController: opts.CloudIntegrationsController,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
querier: querier,
querierV2: querierv2,
@@ -1223,22 +1217,13 @@ func (aH *APIHandler) Get(rw http.ResponseWriter, r *http.Request) {
}
dashboard := new(dashboardtypes.Dashboard)
if integrationstypes.IsCloudIntegrationDashboardUuid(id) {
cloudProvider, err := integrationstypes.GetCloudProviderFromDashboardID(id)
if err != nil {
render.Error(rw, err)
if aH.CloudIntegrationsController.IsCloudIntegrationDashboardUuid(id) {
cloudIntegrationDashboard, apiErr := aH.CloudIntegrationsController.GetDashboardById(ctx, orgID, id)
if apiErr != nil {
render.Error(rw, errorsV2.Wrapf(apiErr, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to get dashboard"))
return
}
integrationDashboard, err := aH.cloudIntegrationsRegistry[cloudProvider].GetDashboard(ctx, &integrationstypes.GettableDashboard{
ID: id,
OrgID: orgID,
})
if err != nil {
render.Error(rw, err)
return
}
dashboard = integrationDashboard
dashboard = cloudIntegrationDashboard
} else if aH.IntegrationsController.IsInstalledIntegrationDashboardID(id) {
integrationDashboard, apiErr := aH.IntegrationsController.GetInstalledIntegrationDashboardById(ctx, orgID, id)
if apiErr != nil {
@@ -1302,13 +1287,11 @@ func (aH *APIHandler) List(rw http.ResponseWriter, r *http.Request) {
dashboards = append(dashboards, installedIntegrationDashboards...)
}
for _, provider := range aH.cloudIntegrationsRegistry {
cloudIntegrationDashboards, err := provider.GetAvailableDashboards(ctx, orgID)
if err != nil {
zap.L().Error("failed to get dashboards for cloud integrations", zap.Error(apiErr))
} else {
dashboards = append(dashboards, cloudIntegrationDashboards...)
}
cloudIntegrationDashboards, apiErr := aH.CloudIntegrationsController.AvailableDashboards(ctx, orgID)
if apiErr != nil {
zap.L().Error("failed to get dashboards for cloud integrations", zap.Error(apiErr))
} else {
dashboards = append(dashboards, cloudIntegrationDashboards...)
}
gettableDashboards, err := dashboardtypes.NewGettableDashboardsFromDashboards(dashboards)
@@ -3284,15 +3267,15 @@ func (aH *APIHandler) GetIntegrationConnectionStatus(w http.ResponseWriter, r *h
lookbackSeconds = 15 * 60
}
connectionStatus, err := aH.calculateConnectionStatus(
connectionStatus, apiErr := aH.calculateConnectionStatus(
r.Context(), orgID, connectionTests, lookbackSeconds,
)
if err != nil {
render.Error(w, err)
if apiErr != nil {
RespondError(w, apiErr, "Failed to calculate integration connection status")
return
}
render.Success(w, http.StatusOK, connectionStatus)
aH.Respond(w, connectionStatus)
}
func (aH *APIHandler) calculateConnectionStatus(
@@ -3300,11 +3283,10 @@ func (aH *APIHandler) calculateConnectionStatus(
orgID valuer.UUID,
connectionTests *integrations.IntegrationConnectionTests,
lookbackSeconds int64,
) (*integrations.IntegrationConnectionStatus, error) {
) (*integrations.IntegrationConnectionStatus, *model.ApiError) {
// Calculate connection status for signals in parallel
result := &integrations.IntegrationConnectionStatus{}
// TODO: migrate to errors package
errors := []*model.ApiError{}
var resultLock sync.Mutex
@@ -3502,14 +3484,12 @@ func (aH *APIHandler) UninstallIntegration(w http.ResponseWriter, r *http.Reques
aH.Respond(w, map[string]interface{}{})
}
// RegisterCloudIntegrationsRoutes register routes for cloud provider integrations
// cloud provider integrations
func (aH *APIHandler) RegisterCloudIntegrationsRoutes(router *mux.Router, am *middleware.AuthZ) {
subRouter := router.PathPrefix("/api/v1/cloud-integrations").Subrouter()
subRouter.Use(middleware.NewRecovery(aH.Signoz.Instrumentation.Logger()).Wrap)
subRouter.HandleFunc(
"/{cloudProvider}/accounts/generate-connection-url", am.EditAccess(aH.CloudIntegrationsGenerateConnectionArtifact),
"/{cloudProvider}/accounts/generate-connection-url", am.EditAccess(aH.CloudIntegrationsGenerateConnectionUrl),
).Methods(http.MethodPost)
subRouter.HandleFunc(
@@ -3543,199 +3523,170 @@ func (aH *APIHandler) RegisterCloudIntegrationsRoutes(router *mux.Router, am *mi
subRouter.HandleFunc(
"/{cloudProvider}/services/{serviceId}/config", am.EditAccess(aH.CloudIntegrationsUpdateServiceConfig),
).Methods(http.MethodPost)
}
func (aH *APIHandler) CloudIntegrationsGenerateConnectionArtifact(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
func (aH *APIHandler) CloudIntegrationsListConnectedAccounts(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
resp, apiErr := aH.CloudIntegrationsController.ListConnectedAccounts(
r.Context(), claims.OrgID, cloudProvider,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
reqBody, err := io.ReadAll(r.Body)
if err != nil {
render.Error(w, errors.WrapInternalf(err, errors.CodeInternal, "failed to read request body"))
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].GenerateConnectionArtifact(r.Context(), &integrationstypes.PostableConnectionArtifact{
OrgID: claims.OrgID,
Data: reqBody,
})
if err != nil {
aH.Signoz.Instrumentation.Logger().ErrorContext(r.Context(),
"failed to generate connection artifact for cloud integration",
slog.String("cloudProvider", cloudProviderString),
slog.String("orgID", claims.OrgID),
)
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, resp)
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsListConnectedAccounts(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
func (aH *APIHandler) CloudIntegrationsGenerateConnectionUrl(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
req := cloudintegrations.GenerateConnectionUrlRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].ListConnectedAccounts(r.Context(), claims.OrgID)
if err != nil {
render.Error(w, err)
result, apiErr := aH.CloudIntegrationsController.GenerateConnectionUrl(
r.Context(), claims.OrgID, cloudProvider, req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
render.Success(w, http.StatusOK, resp)
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsGetAccountStatus(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
func (aH *APIHandler) CloudIntegrationsGetAccountStatus(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
accountId := mux.Vars(r)["accountId"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].GetAccountStatus(r.Context(), claims.OrgID, accountId)
if err != nil {
render.Error(w, err)
resp, apiErr := aH.CloudIntegrationsController.GetAccountStatus(
r.Context(), claims.OrgID, cloudProvider, accountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
render.Success(w, http.StatusOK, resp)
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsAgentCheckIn(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
func (aH *APIHandler) CloudIntegrationsAgentCheckIn(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
req := cloudintegrations.AgentCheckInRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
result, err := aH.CloudIntegrationsController.CheckInAsAgent(
r.Context(), claims.OrgID, cloudProvider, req,
)
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
req := new(integrationstypes.PostableAgentCheckInPayload)
if err = json.NewDecoder(r.Body).Decode(req); err != nil {
render.Error(w, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid request body"))
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
req.OrgID = claims.OrgID
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].AgentCheckIn(r.Context(), req)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, resp)
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsUpdateAccountConfig(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
func (aH *APIHandler) CloudIntegrationsUpdateAccountConfig(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
accountId := mux.Vars(r)["accountId"]
reqBody, err := io.ReadAll(r.Body)
if err != nil {
render.Error(w, errors.WrapInternalf(err, errors.CodeInternal, "failed to read request body"))
req := cloudintegrations.UpdateAccountConfigRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].UpdateAccountConfig(r.Context(), &integrationstypes.PatchableAccountConfig{
OrgID: claims.OrgID,
AccountId: accountId,
Data: reqBody,
})
if err != nil {
render.Error(w, err)
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
render.Success(w, http.StatusOK, resp)
return
result, apiErr := aH.CloudIntegrationsController.UpdateAccountConfig(
r.Context(), claims.OrgID, cloudProvider, accountId, req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsDisconnectAccount(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
func (aH *APIHandler) CloudIntegrationsDisconnectAccount(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
accountId := mux.Vars(r)["accountId"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
result, err := aH.cloudIntegrationsRegistry[cloudProvider].DisconnectAccount(r.Context(), claims.OrgID, accountId)
if err != nil {
render.Error(w, err)
result, apiErr := aH.CloudIntegrationsController.DisconnectAccount(
r.Context(), claims.OrgID, cloudProvider, accountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
render.Success(w, http.StatusOK, result)
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsListServices(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
func (aH *APIHandler) CloudIntegrationsListServices(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
var cloudAccountId *string
@@ -3744,22 +3695,26 @@ func (aH *APIHandler) CloudIntegrationsListServices(w http.ResponseWriter, r *ht
cloudAccountId = &cloudAccountIdQP
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].ListServices(r.Context(), claims.OrgID, cloudAccountId)
if err != nil {
render.Error(w, err)
resp, apiErr := aH.CloudIntegrationsController.ListServices(
r.Context(), claims.OrgID, cloudProvider, cloudAccountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
render.Success(w, http.StatusOK, resp)
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsGetServiceDetails(w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) CloudIntegrationsGetServiceDetails(
w http.ResponseWriter, r *http.Request,
) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
@@ -3771,14 +3726,7 @@ func (aH *APIHandler) CloudIntegrationsGetServiceDetails(w http.ResponseWriter,
return
}
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
cloudProvider := mux.Vars(r)["cloudProvider"]
serviceId := mux.Vars(r)["serviceId"]
var cloudAccountId *string
@@ -3788,59 +3736,270 @@ func (aH *APIHandler) CloudIntegrationsGetServiceDetails(w http.ResponseWriter,
cloudAccountId = &cloudAccountIdQP
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].GetServiceDetails(r.Context(), &integrationstypes.GetServiceDetailsReq{
OrgID: orgID,
ServiceId: serviceId,
CloudAccountID: cloudAccountId,
})
if err != nil {
render.Error(w, err)
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
render.Success(w, http.StatusOK, resp)
return
}
func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationstypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
serviceId := mux.Vars(r)["serviceId"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
reqBody, err := io.ReadAll(r.Body)
if err != nil {
render.Error(w, errors.WrapInternalf(err,
errors.CodeInternal,
"failed to read update service config request body",
))
return
}
result, err := aH.cloudIntegrationsRegistry[cloudProvider].UpdateServiceConfig(
r.Context(), &integrationstypes.PatchableServiceConfig{
OrgID: claims.OrgID,
ServiceId: serviceId,
Config: reqBody,
},
resp, err := aH.CloudIntegrationsController.GetServiceDetails(
r.Context(), claims.OrgID, cloudProvider, serviceId, cloudAccountId,
)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, result)
// Add connection status for the 2 signals.
if cloudAccountId != nil {
connStatus, apiErr := aH.calculateCloudIntegrationServiceConnectionStatus(
r.Context(), orgID, cloudProvider, *cloudAccountId, resp,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
resp.ConnectionStatus = connStatus
}
aH.Respond(w, resp)
}
func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
ctx context.Context,
orgID valuer.UUID,
cloudProvider string,
cloudAccountId string,
svcDetails *cloudintegrations.ServiceDetails,
) (*cloudintegrations.ServiceConnectionStatus, *model.ApiError) {
if cloudProvider != "aws" {
// TODO(Raj): Make connection check generic for all providers in a follow up change
return nil, model.BadRequest(
fmt.Errorf("unsupported cloud provider: %s", cloudProvider),
)
}
telemetryCollectionStrategy := svcDetails.Strategy
if telemetryCollectionStrategy == nil {
return nil, model.InternalError(fmt.Errorf(
"service doesn't have telemetry collection strategy: %s", svcDetails.Id,
))
}
result := &cloudintegrations.ServiceConnectionStatus{}
errors := []*model.ApiError{}
var resultLock sync.Mutex
var wg sync.WaitGroup
// Calculate metrics connection status
if telemetryCollectionStrategy.AWSMetrics != nil {
wg.Add(1)
go func() {
defer wg.Done()
metricsConnStatus, apiErr := aH.calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx, cloudAccountId, telemetryCollectionStrategy.AWSMetrics, svcDetails.DataCollected.Metrics,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Metrics = metricsConnStatus
}
}()
}
// Calculate logs connection status
if telemetryCollectionStrategy.AWSLogs != nil {
wg.Add(1)
go func() {
defer wg.Done()
logsConnStatus, apiErr := aH.calculateAWSIntegrationSvcLogsConnectionStatus(
ctx, orgID, cloudAccountId, telemetryCollectionStrategy.AWSLogs,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Logs = logsConnStatus
}
}()
}
wg.Wait()
if len(errors) > 0 {
return nil, errors[0]
}
return result, nil
}
func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx context.Context,
cloudAccountId string,
strategy *services.AWSMetricsStrategy,
metricsCollectedBySvc []services.CollectedMetric,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.StreamFilters) < 1 {
return nil, nil
}
expectedLabelValues := map[string]string{
"cloud_provider": "aws",
"cloud_account_id": cloudAccountId,
}
metricsNamespace := strategy.StreamFilters[0].Namespace
metricsNamespaceParts := strings.Split(metricsNamespace, "/")
if len(metricsNamespaceParts) >= 2 {
expectedLabelValues["service_namespace"] = metricsNamespaceParts[0]
expectedLabelValues["service_name"] = metricsNamespaceParts[1]
} else {
// metrics for single word namespaces like "CWAgent" do not
// have the service_namespace label populated
expectedLabelValues["service_name"] = metricsNamespaceParts[0]
}
metricNamesCollectedBySvc := []string{}
for _, cm := range metricsCollectedBySvc {
metricNamesCollectedBySvc = append(metricNamesCollectedBySvc, cm.Name)
}
statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric(
ctx, metricNamesCollectedBySvc, expectedLabelValues,
)
if apiErr != nil {
return nil, apiErr
}
if statusForLastReceivedMetric != nil {
return &cloudintegrations.SignalConnectionStatus{
LastReceivedTsMillis: statusForLastReceivedMetric.LastReceivedTsMillis,
LastReceivedFrom: "signoz-aws-integration",
}, nil
}
return nil, nil
}
func (aH *APIHandler) calculateAWSIntegrationSvcLogsConnectionStatus(
ctx context.Context,
orgID valuer.UUID,
cloudAccountId string,
strategy *services.AWSLogsStrategy,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.Subscriptions) < 1 {
return nil, nil
}
logGroupNamePrefix := strategy.Subscriptions[0].LogGroupNamePrefix
if len(logGroupNamePrefix) < 1 {
return nil, nil
}
logsConnTestFilter := &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "cloud.account.id",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: "=",
Value: cloudAccountId,
},
{
Key: v3.AttributeKey{
Key: "aws.cloudwatch.log_group_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: "like",
Value: logGroupNamePrefix + "%",
},
},
}
// TODO(Raj): Receive this as a param from UI in the future.
lookbackSeconds := int64(30 * 60)
qrParams := &v3.QueryRangeParamsV3{
Start: time.Now().UnixMilli() - (lookbackSeconds * 1000),
End: time.Now().UnixMilli(),
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
PageSize: 1,
Filters: logsConnTestFilter,
QueryName: "A",
DataSource: v3.DataSourceLogs,
Expression: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
},
},
},
}
queryRes, _, err := aH.querier.QueryRange(
ctx, orgID, qrParams,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query for integration connection status: %w", err,
))
}
if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 {
lastLog := queryRes[0].List[0]
return &cloudintegrations.SignalConnectionStatus{
LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(),
LastReceivedFrom: "signoz-aws-integration",
}, nil
}
return nil, nil
}
func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
serviceId := mux.Vars(r)["serviceId"]
req := cloudintegrations.UpdateServiceConfigRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
result, err := aH.CloudIntegrationsController.UpdateServiceConfig(
r.Context(), claims.OrgID, cloudProvider, serviceId, &req,
)
if err != nil {
render.Error(w, err)
return
}
aH.Respond(w, result)
}
// logs

View File

@@ -10,6 +10,7 @@ import (
)
var dotMetricMap = map[string]string{
"system_filesystem_usage": "system.filesystem.usage",
"system_cpu_time": "system.cpu.time",
"system_memory_usage": "system.memory.usage",
"system_cpu_load_average_15m": "system.cpu.load_average.15m",

View File

@@ -53,10 +53,11 @@ var (
}
queryNamesForTopHosts = map[string][]string{
"cpu": {"A", "B", "F1"},
"memory": {"C", "D", "F2"},
"wait": {"E", "F", "F3"},
"load15": {"G"},
"cpu": {"A", "B", "F1"},
"memory": {"C", "D", "F2"},
"wait": {"E", "F", "F3"},
"load15": {"G"},
"filesystem": {"H", "I", "F4"},
}
// TODO(srikanthccv): remove hardcoded metric name and support keys from any system metric
@@ -67,10 +68,11 @@ var (
GetDotMetrics("os_type"),
}
metricNamesForHosts = map[string]string{
"cpu": GetDotMetrics("system_cpu_time"),
"memory": GetDotMetrics("system_memory_usage"),
"load15": GetDotMetrics("system_cpu_load_average_15m"),
"wait": GetDotMetrics("system_cpu_time"),
"cpu": GetDotMetrics("system_cpu_time"),
"memory": GetDotMetrics("system_memory_usage"),
"load15": GetDotMetrics("system_cpu_load_average_15m"),
"wait": GetDotMetrics("system_cpu_time"),
"filesystem": GetDotMetrics("system_filesystem_usage"),
}
)
@@ -494,10 +496,11 @@ func (h *HostsRepo) GetHostList(ctx context.Context, orgID valuer.UUID, req mode
for _, result := range formattedResponse {
for _, row := range result.Table.Rows {
record := model.HostListRecord{
CPU: -1,
Memory: -1,
Wait: -1,
Load15: -1,
CPU: -1,
Memory: -1,
Wait: -1,
Load15: -1,
Filesystem: -1,
}
if hostName, ok := row.Data[hostNameAttrKey].(string); ok {
@@ -516,6 +519,9 @@ func (h *HostsRepo) GetHostList(ctx context.Context, orgID valuer.UUID, req mode
if load15, ok := row.Data["G"].(float64); ok {
record.Load15 = load15
}
if filesystem, ok := row.Data["F4"].(float64); ok {
record.Filesystem = filesystem
}
record.Meta = map[string]string{}
if _, ok := hostAttrs[record.HostName]; ok {
record.Meta = hostAttrs[record.HostName]

View File

@@ -269,42 +269,130 @@ var HostsTableListQuery = v3.QueryRangeParamsV3{
Items: []v3.FilterItem{},
},
},
"G": {
QueryName: "G",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForHosts["load15"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: v3.FilterOperatorNotContains,
Value: agentNameToIgnore,
},
},
},
GroupBy: []v3.AttributeKey{
{
Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
},
Expression: "G",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
Legend: "CPU Load Average (15m)",
"G": {
QueryName: "G",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForHosts["load15"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: v3.FilterOperatorNotContains,
Value: agentNameToIgnore,
},
},
},
GroupBy: []v3.AttributeKey{
{
Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
},
Expression: "G",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
Legend: "CPU Load Average (15m)",
},
"H": {
QueryName: "H",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForHosts["filesystem"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "state",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: v3.FilterOperatorEqual,
Value: "used",
},
{
Key: v3.AttributeKey{
Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: v3.FilterOperatorNotContains,
Value: agentNameToIgnore,
},
},
},
GroupBy: []v3.AttributeKey{
{
Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
},
Expression: "H",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: true,
},
"I": {
QueryName: "I",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForHosts["filesystem"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: v3.FilterOperatorNotContains,
Value: agentNameToIgnore,
},
},
},
GroupBy: []v3.AttributeKey{
{
Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
},
Expression: "I",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: true,
},
"F4": {
QueryName: "F4",
Expression: "H/I",
Legend: "Disk Usage (%)",
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
},
},
PanelType: v3.PanelTypeTable,
QueryType: v3.QueryTypeBuilder,

View File

@@ -11,7 +11,6 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -108,7 +107,7 @@ type IntegrationsListItem struct {
type Integration struct {
IntegrationDetails
Installation *integrationstypes.InstalledIntegration `json:"installation"`
Installation *types.InstalledIntegration `json:"installation"`
}
type Manager struct {
@@ -224,7 +223,7 @@ func (m *Manager) InstallIntegration(
ctx context.Context,
orgId string,
integrationId string,
config integrationstypes.InstalledIntegrationConfig,
config types.InstalledIntegrationConfig,
) (*IntegrationsListItem, *model.ApiError) {
integrationDetails, apiErr := m.getIntegrationDetails(ctx, integrationId)
if apiErr != nil {
@@ -430,7 +429,7 @@ func (m *Manager) getInstalledIntegration(
ctx context.Context,
orgId string,
integrationId string,
) (*integrationstypes.InstalledIntegration, *model.ApiError) {
) (*types.InstalledIntegration, *model.ApiError) {
iis, apiErr := m.installedIntegrationsRepo.get(
ctx, orgId, []string{integrationId},
)
@@ -458,7 +457,7 @@ func (m *Manager) getInstalledIntegrations(
return nil, apiErr
}
installedTypes := utils.MapSlice(installations, func(i integrationstypes.InstalledIntegration) string {
installedTypes := utils.MapSlice(installations, func(i types.InstalledIntegration) string {
return i.Type
})
integrationDetails, apiErr := m.availableIntegrationsRepo.get(ctx, installedTypes)

View File

@@ -4,22 +4,22 @@ import (
"context"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/types"
)
type InstalledIntegrationsRepo interface {
list(ctx context.Context, orgId string) ([]integrationstypes.InstalledIntegration, *model.ApiError)
list(ctx context.Context, orgId string) ([]types.InstalledIntegration, *model.ApiError)
get(
ctx context.Context, orgId string, integrationTypes []string,
) (map[string]integrationstypes.InstalledIntegration, *model.ApiError)
) (map[string]types.InstalledIntegration, *model.ApiError)
upsert(
ctx context.Context,
orgId string,
integrationType string,
config integrationstypes.InstalledIntegrationConfig,
) (*integrationstypes.InstalledIntegration, *model.ApiError)
config types.InstalledIntegrationConfig,
) (*types.InstalledIntegration, *model.ApiError)
delete(ctx context.Context, orgId string, integrationType string) *model.ApiError
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/integrationstypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
@@ -27,8 +26,8 @@ func NewInstalledIntegrationsSqliteRepo(store sqlstore.SQLStore) (
func (r *InstalledIntegrationsSqliteRepo) list(
ctx context.Context,
orgId string,
) ([]integrationstypes.InstalledIntegration, *model.ApiError) {
integrations := []integrationstypes.InstalledIntegration{}
) ([]types.InstalledIntegration, *model.ApiError) {
integrations := []types.InstalledIntegration{}
err := r.store.BunDB().NewSelect().
Model(&integrations).
@@ -45,8 +44,8 @@ func (r *InstalledIntegrationsSqliteRepo) list(
func (r *InstalledIntegrationsSqliteRepo) get(
ctx context.Context, orgId string, integrationTypes []string,
) (map[string]integrationstypes.InstalledIntegration, *model.ApiError) {
integrations := []integrationstypes.InstalledIntegration{}
) (map[string]types.InstalledIntegration, *model.ApiError) {
integrations := []types.InstalledIntegration{}
typeValues := []interface{}{}
for _, integrationType := range integrationTypes {
@@ -63,7 +62,7 @@ func (r *InstalledIntegrationsSqliteRepo) get(
))
}
result := map[string]integrationstypes.InstalledIntegration{}
result := map[string]types.InstalledIntegration{}
for _, ii := range integrations {
result[ii.Type] = ii
}
@@ -75,10 +74,10 @@ func (r *InstalledIntegrationsSqliteRepo) upsert(
ctx context.Context,
orgId string,
integrationType string,
config integrationstypes.InstalledIntegrationConfig,
) (*integrationstypes.InstalledIntegration, *model.ApiError) {
config types.InstalledIntegrationConfig,
) (*types.InstalledIntegration, *model.ApiError) {
integration := integrationstypes.InstalledIntegration{
integration := types.InstalledIntegration{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
@@ -115,7 +114,7 @@ func (r *InstalledIntegrationsSqliteRepo) delete(
ctx context.Context, orgId string, integrationType string,
) *model.ApiError {
_, dbErr := r.store.BunDB().NewDelete().
Model(&integrationstypes.InstalledIntegration{}).
Model(&types.InstalledIntegration{}).
Where("type = ?", integrationType).
Where("org_id = ?", orgId).
Exec(ctx)

View File

@@ -26,6 +26,7 @@ import (
querierAPI "github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp"
@@ -70,6 +71,11 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
return nil, err
}
cloudIntegrationsController, err := cloudintegrations.NewController(signoz.SQLStore)
if err != nil {
return nil, err
}
cacheForTraceDetail, err := memorycache.New(context.TODO(), signoz.Instrumentation.ToProviderSettings(), cache.Config{
Provider: "memory",
Memory: cache.Memory{
@@ -121,6 +127,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
Reader: reader,
RuleManager: rm,
IntegrationsController: integrationsController,
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
FluxInterval: config.Querier.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
@@ -128,7 +135,6 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
QueryParserAPI: queryparser.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.QueryParser),
Logger: signoz.Instrumentation.Logger(),
})
if err != nil {
return nil, err

View File

@@ -26,14 +26,15 @@ type HostListRequest struct {
}
type HostListRecord struct {
HostName string `json:"hostName"`
Active bool `json:"active"`
OS string `json:"os"`
CPU float64 `json:"cpu"`
Memory float64 `json:"memory"`
Wait float64 `json:"wait"`
Load15 float64 `json:"load15"`
Meta map[string]string `json:"meta"`
HostName string `json:"hostName"`
Active bool `json:"active"`
OS string `json:"os"`
CPU float64 `json:"cpu"`
Memory float64 `json:"memory"`
Wait float64 `json:"wait"`
Load15 float64 `json:"load15"`
Filesystem float64 `json:"filesystem"`
Meta map[string]string `json:"meta"`
}
type HostListResponse struct {
@@ -64,6 +65,10 @@ func (r *HostListResponse) SortBy(orderBy *v3.OrderBy) {
sort.Slice(r.Records, func(i, j int) bool {
return r.Records[i].Wait > r.Records[j].Wait
})
case "filesystem":
sort.Slice(r.Records, func(i, j int) bool {
return r.Records[i].Filesystem > r.Records[j].Filesystem
})
}
// the default is descending
if orderBy.Order == v3.DirectionAsc {

244
pkg/types/integration.go Normal file
View File

@@ -0,0 +1,244 @@
package types
import (
"database/sql/driver"
"encoding/json"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/uptrace/bun"
)
type IntegrationUserEmail string
const (
AWSIntegrationUserEmail IntegrationUserEmail = "aws-integration@signoz.io"
)
var AllIntegrationUserEmails = []IntegrationUserEmail{
AWSIntegrationUserEmail,
}
// --------------------------------------------------------------------------
// Normal integration uses just the installed_integration table
// --------------------------------------------------------------------------
type InstalledIntegration struct {
bun.BaseModel `bun:"table:installed_integration"`
Identifiable
Type string `json:"type" bun:"type,type:text,unique:org_id_type"`
Config InstalledIntegrationConfig `json:"config" bun:"config,type:text"`
InstalledAt time.Time `json:"installed_at" bun:"installed_at,default:current_timestamp"`
OrgID string `json:"org_id" bun:"org_id,type:text,unique:org_id_type,references:organizations(id),on_delete:cascade"`
}
type InstalledIntegrationConfig map[string]interface{}
// For serializing from db
func (c *InstalledIntegrationConfig) Scan(src interface{}) error {
var data []byte
switch v := src.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
return json.Unmarshal(data, c)
}
// For serializing to db
func (c *InstalledIntegrationConfig) Value() (driver.Value, error) {
filterSetJson, err := json.Marshal(c)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "could not serialize integration config to JSON")
}
return filterSetJson, nil
}
// --------------------------------------------------------------------------
// Cloud integration uses the cloud_integration table
// and cloud_integrations_service table
// --------------------------------------------------------------------------
type CloudIntegration struct {
bun.BaseModel `bun:"table:cloud_integration"`
Identifiable
TimeAuditable
Provider string `json:"provider" bun:"provider,type:text,unique:provider_id"`
Config *AccountConfig `json:"config" bun:"config,type:text"`
AccountID *string `json:"account_id" bun:"account_id,type:text"`
LastAgentReport *AgentReport `json:"last_agent_report" bun:"last_agent_report,type:text"`
RemovedAt *time.Time `json:"removed_at" bun:"removed_at,type:timestamp,nullzero"`
OrgID string `bun:"org_id,type:text,unique:provider_id"`
}
func (a *CloudIntegration) Status() AccountStatus {
status := AccountStatus{}
if a.LastAgentReport != nil {
lastHeartbeat := a.LastAgentReport.TimestampMillis
status.Integration.LastHeartbeatTsMillis = &lastHeartbeat
}
return status
}
func (a *CloudIntegration) Account() Account {
ca := Account{Id: a.ID.StringValue(), Status: a.Status()}
if a.AccountID != nil {
ca.CloudAccountId = *a.AccountID
}
if a.Config != nil {
ca.Config = *a.Config
} else {
ca.Config = DefaultAccountConfig()
}
return ca
}
type Account struct {
Id string `json:"id"`
CloudAccountId string `json:"cloud_account_id"`
Config AccountConfig `json:"config"`
Status AccountStatus `json:"status"`
}
type AccountStatus struct {
Integration AccountIntegrationStatus `json:"integration"`
}
type AccountIntegrationStatus struct {
LastHeartbeatTsMillis *int64 `json:"last_heartbeat_ts_ms"`
}
func DefaultAccountConfig() AccountConfig {
return AccountConfig{
EnabledRegions: []string{},
}
}
type AccountConfig struct {
EnabledRegions []string `json:"regions"`
}
// For serializing from db
func (c *AccountConfig) Scan(src any) error {
var data []byte
switch v := src.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
return json.Unmarshal(data, c)
}
// For serializing to db
func (c *AccountConfig) Value() (driver.Value, error) {
if c == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "cloud account config is nil")
}
serialized, err := json.Marshal(c)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't serialize cloud account config to JSON")
}
return serialized, nil
}
type AgentReport struct {
TimestampMillis int64 `json:"timestamp_millis"`
Data map[string]any `json:"data"`
}
// For serializing from db
func (r *AgentReport) Scan(src any) error {
var data []byte
switch v := src.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
return json.Unmarshal(data, r)
}
// For serializing to db
func (r *AgentReport) Value() (driver.Value, error) {
if r == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "agent report is nil")
}
serialized, err := json.Marshal(r)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize agent report to JSON",
)
}
return serialized, nil
}
type CloudIntegrationService struct {
bun.BaseModel `bun:"table:cloud_integration_service,alias:cis"`
Identifiable
TimeAuditable
Type string `bun:"type,type:text,notnull,unique:cloud_integration_id_type"`
Config CloudServiceConfig `bun:"config,type:text"`
CloudIntegrationID string `bun:"cloud_integration_id,type:text,notnull,unique:cloud_integration_id_type,references:cloud_integrations(id),on_delete:cascade"`
}
type CloudServiceLogsConfig struct {
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type CloudServiceMetricsConfig struct {
Enabled bool `json:"enabled"`
}
type CloudServiceConfig struct {
Logs *CloudServiceLogsConfig `json:"logs,omitempty"`
Metrics *CloudServiceMetricsConfig `json:"metrics,omitempty"`
}
// For serializing from db
func (c *CloudServiceConfig) Scan(src any) error {
var data []byte
switch src := src.(type) {
case []byte:
data = src
case string:
data = []byte(src)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
return json.Unmarshal(data, c)
}
// For serializing to db
func (c *CloudServiceConfig) Value() (driver.Value, error) {
if c == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "cloud service config is nil")
}
serialized, err := json.Marshal(c)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize cloud service config to JSON",
)
}
return serialized, nil
}

View File

@@ -1,464 +0,0 @@
package integrationstypes
import (
"context"
"database/sql/driver"
"encoding/json"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
// CloudProvider defines the interface to be implemented by different cloud providers.
// This is generic interface so it will be accepting and returning generic types instead of concrete.
// It's the cloud provider's responsibility to cast them to appropriate types and validate
type CloudProvider interface {
GetName() CloudProviderType
AgentCheckIn(ctx context.Context, req *PostableAgentCheckInPayload) (any, error)
GenerateConnectionArtifact(ctx context.Context, req *PostableConnectionArtifact) (any, error)
GetAccountStatus(ctx context.Context, orgID, accountID string) (*GettableAccountStatus, error)
ListServices(ctx context.Context, orgID string, accountID *string) (any, error) // returns either GettableAWSServices
GetServiceDetails(ctx context.Context, req *GetServiceDetailsReq) (any, error)
ListConnectedAccounts(ctx context.Context, orgID string) (*GettableConnectedAccountsList, error)
GetDashboard(ctx context.Context, req *GettableDashboard) (*dashboardtypes.Dashboard, error)
GetAvailableDashboards(ctx context.Context, orgID valuer.UUID) ([]*dashboardtypes.Dashboard, error)
UpdateAccountConfig(ctx context.Context, req *PatchableAccountConfig) (any, error) // req can be either PatchableAWSAccountConfig
UpdateServiceConfig(ctx context.Context, req *PatchableServiceConfig) (any, error)
DisconnectAccount(ctx context.Context, orgID, accountID string) (*CloudIntegration, error)
}
type GettableDashboard struct {
ID string
OrgID valuer.UUID
}
type GettableCloudIntegrationConnectionParams struct {
IngestionUrl string `json:"ingestion_url,omitempty"`
IngestionKey string `json:"ingestion_key,omitempty"`
SigNozAPIUrl string `json:"signoz_api_url,omitempty"`
SigNozAPIKey string `json:"signoz_api_key,omitempty"`
}
type GettableIngestionKey struct {
Name string `json:"name"`
Value string `json:"value"`
// other attributes from gateway response not included here since they are not being used.
}
type GettableIngestionKeysSearch struct {
Status string `json:"status"`
Data []GettableIngestionKey `json:"data"`
Error string `json:"error"`
}
type GettableCreateIngestionKey struct {
Status string `json:"status"`
Data GettableIngestionKey `json:"data"`
Error string `json:"error"`
}
type GettableDeployment struct {
Name string `json:"name"`
ClusterInfo struct {
Region struct {
DNS string `json:"dns"`
} `json:"region"`
} `json:"cluster"`
}
type GettableConnectedAccountsList struct {
Accounts []*Account `json:"accounts"`
}
// SigNozAWSAgentConfig represents requirements for agent deployment in user's AWS account
type SigNozAWSAgentConfig struct {
// The region in which SigNoz agent should be installed.
Region string `json:"region"`
IngestionUrl string `json:"ingestion_url"`
IngestionKey string `json:"ingestion_key"`
SigNozAPIUrl string `json:"signoz_api_url"`
SigNozAPIKey string `json:"signoz_api_key"`
Version string `json:"version,omitempty"`
}
type PostableConnectionArtifact struct {
OrgID string
Data []byte // either PostableAWSConnectionUrl
}
type PostableAWSConnectionUrl struct {
// Optional. To be specified for updates.
// TODO: evaluate and remove if not needed.
AccountId *string `json:"account_id,omitempty"`
AccountConfig *AWSAccountConfig `json:"account_config"`
AgentConfig *SigNozAWSAgentConfig `json:"agent_config"`
}
func (p *PostableAWSConnectionUrl) Unmarshal(src any) error {
var data []byte
switch src := src.(type) {
case []byte:
data = src
case string:
data = []byte(src)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
err := json.Unmarshal(data, p)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize aws connection url request from JSON",
)
}
return nil
}
type GettableAWSConnectionUrl struct {
AccountId string `json:"account_id"`
ConnectionUrl string `json:"connection_url"`
}
type GettableAccountStatus struct {
Id string `json:"id"`
CloudAccountId *string `json:"cloud_account_id,omitempty"`
Status AccountStatus `json:"status"`
}
type PostableAgentCheckInPayload struct {
ID string `json:"account_id"`
AccountID string `json:"cloud_account_id"`
// Arbitrary cloud specific Agent data
Data map[string]any `json:"data,omitempty"`
OrgID string `json:"-"`
}
type GettableAWSAgentCheckIn struct {
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
RemovedAt *time.Time `json:"removed_at"`
IntegrationConfig AWSAgentIntegrationConfig `json:"integration_config"`
}
type AWSAgentIntegrationConfig struct {
EnabledRegions []string `json:"enabled_regions"`
TelemetryCollectionStrategy *AWSCollectionStrategy `json:"telemetry,omitempty"`
}
type PatchableServiceConfig struct {
OrgID string `json:"org_id"`
ServiceId string `json:"service_id"`
Config []byte `json:"config"` // json serialized config
}
type PatchableAWSCloudServiceConfig struct {
CloudAccountId string `json:"cloud_account_id"`
Config *AWSCloudServiceConfig `json:"config"`
}
type AWSCloudServiceConfig struct {
Logs *AWSCloudServiceLogsConfig `json:"logs,omitempty"`
Metrics *AWSCloudServiceMetricsConfig `json:"metrics,omitempty"`
}
// Unmarshal unmarshalls data from src
func (c *PatchableAWSCloudServiceConfig) Unmarshal(src []byte) error {
err := json.Unmarshal(src, c)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize aws service config req from JSON",
)
}
return nil
}
// Marshal serializes data to bytes
func (c *PatchableAWSCloudServiceConfig) Marshal() ([]byte, error) {
serialized, err := json.Marshal(c)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize aws service config req to JSON",
)
}
return serialized, nil
}
// Unmarshal unmarshalls data from src
func (a *AWSCloudServiceConfig) Unmarshal(src []byte) error {
err := json.Unmarshal(src, a)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize cloud service config from JSON",
)
}
return nil
}
// Marshal serializes data to bytes
func (a *AWSCloudServiceConfig) Marshal() ([]byte, error) {
serialized, err := json.Marshal(a)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize cloud service config to JSON",
)
}
return serialized, nil
}
func (a *AWSCloudServiceConfig) Validate(def *AWSServiceDefinition) error {
if def.Id != S3Sync && a.Logs != nil && a.Logs.S3Buckets != nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "s3 buckets can only be added to service-type[%s]", S3Sync)
} else if def.Id == S3Sync && a.Logs != nil && a.Logs.S3Buckets != nil {
for region := range a.Logs.S3Buckets {
if _, found := ValidAWSRegions[region]; !found {
return errors.NewInvalidInputf(CodeInvalidCloudRegion, "invalid cloud region: %s", region)
}
}
}
return nil
}
type PatchServiceConfigResponse struct {
ServiceId string `json:"id"`
Config any `json:"config"`
}
type PatchableAccountConfig struct {
OrgID string
AccountId string
Data []byte // can be either AWSAccountConfig
}
type PatchableAWSAccountConfig struct {
Config *AWSAccountConfig `json:"config"`
}
func (p *PatchableAWSAccountConfig) Unmarshal(src []byte) error {
err := json.Unmarshal(src, p)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize patchable account config from JSON",
)
}
return nil
}
func (p *PatchableAWSAccountConfig) Marshal() ([]byte, error) {
if p == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "patchable account config is nil")
}
serialized, err := json.Marshal(p)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize patchable account config to JSON",
)
}
return serialized, nil
}
type AWSAccountConfig struct {
EnabledRegions []string `json:"regions"`
}
// Unmarshal unmarshalls data from src
func (c *AWSAccountConfig) Unmarshal(src []byte) error {
err := json.Unmarshal(src, c)
if err != nil {
return errors.WrapInternalf(
err, errors.CodeInternal, "couldn't deserialize AWS account config from JSON",
)
}
return nil
}
// Marshal serializes data to bytes
func (c *AWSAccountConfig) Marshal() ([]byte, error) {
if c == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "cloud account config is nil")
}
serialized, err := json.Marshal(c)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't serialize cloud account config to JSON")
}
return serialized, nil
}
type GettableAWSServices struct {
Services []AWSServiceSummary `json:"services"`
}
type GetServiceDetailsReq struct {
OrgID valuer.UUID
ServiceId string
CloudAccountID *string
}
// --------------------------------------------------------------------------
// DATABASE TYPES
// --------------------------------------------------------------------------
// --------------------------------------------------------------------------
// Cloud integration uses the cloud_integration table
// and cloud_integrations_service table
// --------------------------------------------------------------------------
type CloudIntegration struct {
bun.BaseModel `bun:"table:cloud_integration"`
types.Identifiable
types.TimeAuditable
Provider string `json:"provider" bun:"provider,type:text,unique:provider_id"`
Config string `json:"config" bun:"config,type:text"` // json serialized config
AccountID *string `json:"account_id" bun:"account_id,type:text"`
LastAgentReport *AgentReport `json:"last_agent_report" bun:"last_agent_report,type:text"`
RemovedAt *time.Time `json:"removed_at" bun:"removed_at,type:timestamp,nullzero"`
OrgID string `bun:"org_id,type:text,unique:provider_id"`
}
func (a *CloudIntegration) Status() AccountStatus {
status := AccountStatus{}
if a.LastAgentReport != nil {
lastHeartbeat := a.LastAgentReport.TimestampMillis
status.Integration.LastHeartbeatTsMillis = &lastHeartbeat
}
return status
}
func (a *CloudIntegration) Account(cloudProvider CloudProviderType) *Account {
ca := &Account{Id: a.ID.StringValue(), Status: a.Status()}
if a.AccountID != nil {
ca.CloudAccountId = *a.AccountID
}
ca.Config = map[string]interface{}{}
if len(a.Config) < 1 {
return ca
}
switch cloudProvider {
case CloudProviderAWS:
config := new(AWSAccountConfig)
_ = config.Unmarshal([]byte(a.Config))
ca.Config = config
default:
}
return ca
}
type Account struct {
Id string `json:"id"`
CloudAccountId string `json:"cloud_account_id"`
Config any `json:"config"` // AWSAccountConfig
Status AccountStatus `json:"status"`
}
type AccountStatus struct {
Integration AccountIntegrationStatus `json:"integration"`
}
type AccountIntegrationStatus struct {
LastHeartbeatTsMillis *int64 `json:"last_heartbeat_ts_ms"`
}
func DefaultAWSAccountConfig() AWSAccountConfig {
return AWSAccountConfig{
EnabledRegions: []string{},
}
}
type AWSServiceSummary struct {
DefinitionMetadata
Config *AWSCloudServiceConfig `json:"config"`
}
type GettableAWSServiceDetails struct {
AWSServiceDefinition
Config *AWSCloudServiceConfig `json:"config"`
ConnectionStatus *ServiceConnectionStatus `json:"status,omitempty"`
}
type ServiceConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"`
}
type SignalConnectionStatus struct {
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}
type AgentReport struct {
TimestampMillis int64 `json:"timestamp_millis"`
Data map[string]any `json:"data"`
}
// Scan scans data from db
func (r *AgentReport) Scan(src any) error {
var data []byte
switch v := src.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
return json.Unmarshal(data, r)
}
// Value serializes data to bytes for db insertion
func (r *AgentReport) Value() (driver.Value, error) {
if r == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "agent report is nil")
}
serialized, err := json.Marshal(r)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize agent report to JSON",
)
}
return string(serialized), nil
}
type CloudIntegrationService struct {
bun.BaseModel `bun:"table:cloud_integration_service,alias:cis"`
types.Identifiable
types.TimeAuditable
Type string `bun:"type,type:text,notnull,unique:cloud_integration_id_type"`
Config string `bun:"config,type:text"` // json serialized config
CloudIntegrationID string `bun:"cloud_integration_id,type:text,notnull,unique:cloud_integration_id_type,references:cloud_integrations(id),on_delete:cascade"`
}
type AWSCloudServiceLogsConfig struct {
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type AWSCloudServiceMetricsConfig struct {
Enabled bool `json:"enabled"`
}

View File

@@ -1,182 +0,0 @@
package integrationstypes
import (
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
const (
S3Sync = "s3sync"
)
type DefinitionMetadata struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
}
type Definition interface {
GetId() string
Validate() error
PopulateDashboardURLs(svcId string)
}
var _ Definition = &AWSServiceDefinition{}
type AWSServiceDefinition struct {
DefinitionMetadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"data_collected"`
Strategy *AWSCollectionStrategy `json:"telemetry_collection_strategy"`
}
type IngestionStatusCheck struct {
Metrics []*IngestionStatusCheckCategory `json:"metrics"`
Logs []*IngestionStatusCheckCategory `json:"logs"`
}
type IngestionStatusCheckCategory struct {
Category string `json:"category"`
DisplayName string `json:"display_name"`
Checks []*IngestionStatusCheckAttribute `json:"checks"`
}
type IngestionStatusCheckAttribute struct {
Key string `json:"key"` // search key (metric name or log message)
Attributes []*IngestionStatusCheckAttributeFilter `json:"attributes"`
}
type IngestionStatusCheckAttributeFilter struct {
Name string `json:"name"`
Operator string `json:"operator"`
Value string `json:"value"`
}
func (def *AWSServiceDefinition) GetId() string {
return def.Id
}
func (def *AWSServiceDefinition) Validate() error {
seenDashboardIds := map[string]interface{}{}
if def.Strategy == nil {
return errors.NewInternalf(errors.CodeInternal, "telemetry_collection_strategy is required")
}
for _, dd := range def.Assets.Dashboards {
if _, seen := seenDashboardIds[dd.Id]; seen {
return errors.NewInternalf(errors.CodeInternal, "multiple dashboards found with id %s for AWS Integration", dd.Id)
}
seenDashboardIds[dd.Id] = nil
}
return nil
}
func (def *AWSServiceDefinition) PopulateDashboardURLs(serviceId string) {
for i := range def.Assets.Dashboards {
dashboardId := def.Assets.Dashboards[i].Id
url := "/dashboard/" + GetCloudIntegrationDashboardID(CloudProviderAWS, serviceId, dashboardId)
def.Assets.Dashboards[i].Url = url
}
}
type Assets struct {
Dashboards []Dashboard `json:"dashboards"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollected struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type AWSCollectionStrategy struct {
Provider valuer.String `json:"provider"`
AWSMetrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsStrategy `json:"aws_logs,omitempty"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type
}
type AWSMetricsStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
StreamFilters []struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
} `json:"cloudwatch_metric_stream_filters"`
}
type AWSLogsStrategy struct {
Subscriptions []struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
} `json:"cloudwatch_logs_subscriptions"`
}
type Dashboard struct {
Id string `json:"id"`
Url string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Image string `json:"image"`
Definition *dashboardtypes.StorableDashboardData `json:"definition,omitempty"`
}
func GetCloudIntegrationDashboardID(cloudProvider valuer.String, svcId, dashboardId string) string {
return fmt.Sprintf("cloud-integration--%s--%s--%s", cloudProvider, svcId, dashboardId)
}
func GetDashboardsFromAssets(svcId string, cloudProvider CloudProviderType, createdAt *time.Time, assets Assets) []*dashboardtypes.Dashboard {
dashboards := make([]*dashboardtypes.Dashboard, 0)
for _, d := range assets.Dashboards {
author := fmt.Sprintf("%s-integration", cloudProvider)
dashboards = append(dashboards, &dashboardtypes.Dashboard{
ID: GetCloudIntegrationDashboardID(cloudProvider, svcId, d.Id),
Locked: true,
Data: *d.Definition,
TimeAuditable: types.TimeAuditable{
CreatedAt: *createdAt,
UpdatedAt: *createdAt,
},
UserAuditable: types.UserAuditable{
CreatedBy: author,
UpdatedBy: author,
},
})
}
return dashboards
}

View File

@@ -1,106 +0,0 @@
package integrationstypes
import (
"database/sql/driver"
"encoding/json"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
// CloudProviderType type alias
type CloudProviderType = valuer.String
var (
CloudProviderAWS = valuer.NewString("aws")
)
var (
CodeCloudProviderInvalidInput = errors.MustNewCode("invalid_cloud_provider")
)
func NewCloudProvider(provider string) (CloudProviderType, error) {
switch provider {
case CloudProviderAWS.String():
return valuer.NewString(provider), nil
default:
return CloudProviderType{}, errors.NewInvalidInputf(CodeCloudProviderInvalidInput, "invalid cloud provider: %s", provider)
}
}
var (
AWSIntegrationUserEmail = valuer.MustNewEmail("aws-integration@signoz.io")
)
var IntegrationUserEmails = []valuer.Email{
AWSIntegrationUserEmail,
}
func IsCloudIntegrationDashboardUuid(dashboardUuid string) bool {
parts := strings.SplitN(dashboardUuid, "--", 4)
if len(parts) != 4 {
return false
}
return parts[0] == "cloud-integration"
}
func GetCloudProviderFromDashboardID(dashboardUuid string) (CloudProviderType, error) {
parts := strings.SplitN(dashboardUuid, "--", 4)
if len(parts) != 4 {
return valuer.String{}, errors.NewInvalidInputf(CodeCloudProviderInvalidInput, "invalid dashboard uuid: %s", dashboardUuid)
}
providerStr := parts[1]
cloudProvider, err := NewCloudProvider(providerStr)
if err != nil {
return CloudProviderType{}, err
}
return cloudProvider, nil
}
// --------------------------------------------------------------------------
// Normal integration uses just the installed_integration table
// --------------------------------------------------------------------------
type InstalledIntegration struct {
bun.BaseModel `bun:"table:installed_integration"`
types.Identifiable
Type string `json:"type" bun:"type,type:text,unique:org_id_type"`
Config InstalledIntegrationConfig `json:"config" bun:"config,type:text"`
InstalledAt time.Time `json:"installed_at" bun:"installed_at,default:current_timestamp"`
OrgID string `json:"org_id" bun:"org_id,type:text,unique:org_id_type,references:organizations(id),on_delete:cascade"`
}
type InstalledIntegrationConfig map[string]interface{}
// Scan scans data from db
func (c *InstalledIntegrationConfig) Scan(src interface{}) error {
var data []byte
switch v := src.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
return json.Unmarshal(data, c)
}
// Value serializes data to db
func (c *InstalledIntegrationConfig) Value() (driver.Value, error) {
filterSetJson, err := json.Marshal(c)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "could not serialize integration config to JSON")
}
return filterSetJson, nil
}

View File

@@ -20,7 +20,6 @@ pytest_plugins = [
"fixtures.idputils",
"fixtures.notification_channel",
"fixtures.alerts",
"fixtures.cloudintegrations",
]

View File

@@ -1,88 +0,0 @@
"""Fixtures for cloud integration tests."""
from typing import Callable, Optional
from http import HTTPStatus
import pytest
import requests
from fixtures import types
from fixtures.logger import setup_logger
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
logger = setup_logger(__name__)
@pytest.fixture(scope="function")
def create_cloud_integration_account(
request: pytest.FixtureRequest,
signoz: types.SigNoz,
) -> Callable[[str, str], dict]:
created_account_id: Optional[str] = None
cloud_provider_used: Optional[str] = None
def _create(
admin_token: str,
cloud_provider: str = "aws",
) -> dict:
nonlocal created_account_id, cloud_provider_used
endpoint = (
f"/api/v1/cloud-integrations/{cloud_provider}/accounts/generate-connection-url"
)
request_payload = {
"account_config": {"regions": ["us-east-1"]},
"agent_config": {
"region": "us-east-1",
"ingestion_url": "https://ingest.test.signoz.cloud",
"ingestion_key": "test-ingestion-key-123456",
"signoz_api_url": "https://test-deployment.test.signoz.cloud",
"signoz_api_key": "test-api-key-789",
"version": "v0.0.8",
},
}
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=request_payload,
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Failed to create test account: {response.status_code}"
data = response.json().get("data", response.json())
created_account_id = data.get("account_id")
cloud_provider_used = cloud_provider
return data
def _disconnect(admin_token: str, cloud_provider: str) -> requests.Response:
assert created_account_id
disconnect_endpoint = (
f"/api/v1/cloud-integrations/{cloud_provider}/accounts/{created_account_id}/disconnect"
)
return requests.post(
signoz.self.host_configs["8080"].get(disconnect_endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
# Yield factory to the test
yield _create
# Post-test cleanup: generate admin token and disconnect the created account
if created_account_id and cloud_provider_used:
get_token = request.getfixturevalue("get_token")
try:
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
r = _disconnect(admin_token, cloud_provider_used)
if r.status_code != HTTPStatus.OK:
logger.info(
"Disconnect cleanup returned %s for account %s",
r.status_code,
created_account_id,
)
logger.info("Cleaned up test account: %s", created_account_id)
except Exception as exc: # pylint: disable=broad-except
logger.info("Post-test disconnect cleanup failed: %s", exc)

View File

@@ -1,46 +0,0 @@
"""Fixtures for cloud integration tests."""
from http import HTTPStatus
import requests
from fixtures import types
from fixtures.logger import setup_logger
logger = setup_logger(__name__)
def simulate_agent_checkin(
signoz: types.SigNoz,
admin_token: str,
cloud_provider: str,
account_id: str,
cloud_account_id: str,
) -> dict:
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/agent-check-in"
checkin_payload = {
"account_id": account_id,
"cloud_account_id": cloud_account_id,
"data": {},
}
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=checkin_payload,
timeout=10,
)
if response.status_code != HTTPStatus.OK:
logger.error(
"Agent check-in failed: %s, response: %s",
response.status_code,
response.text,
)
assert (
response.status_code == HTTPStatus.OK
), f"Agent check-in failed: {response.status_code}"
response_data = response.json()
return response_data.get("data", response_data)

View File

@@ -1,143 +0,0 @@
from http import HTTPStatus
from typing import Callable
import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logger import setup_logger
logger = setup_logger(__name__)
def test_generate_connection_url(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test to generate connection URL for AWS CloudFormation stack deployment."""
# Get authentication token for admin user
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "aws"
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/accounts/generate-connection-url"
# Prepare request payload
request_payload = {
"account_config": {"regions": ["us-east-1", "us-west-2"]},
"agent_config": {
"region": "us-east-1",
"ingestion_url": "https://ingest.test.signoz.cloud",
"ingestion_key": "test-ingestion-key-123456",
"signoz_api_url": "https://test-deployment.test.signoz.cloud",
"signoz_api_key": "test-api-key-789",
"version": "v0.0.8",
},
}
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=request_payload,
timeout=10,
)
# Assert successful response
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}: {response.text}"
# Parse response JSON
response_data = response.json()
# Assert response structure contains expected data
assert "data" in response_data, "Response should contain 'data' field"
# Assert required fields in the response data
expected_fields = ["account_id", "connection_url"]
for field in expected_fields:
assert (
field in response_data["data"]
), f"Response data should contain '{field}' field"
data = response_data["data"]
# Assert account_id is a valid UUID format
assert (
len(data["account_id"]) > 0
), "account_id should be a non-empty string (UUID)"
# Assert connection_url contains expected CloudFormation parameters
connection_url = data["connection_url"]
# Verify it's an AWS CloudFormation URL
assert (
"console.aws.amazon.com/cloudformation" in connection_url
), "connection_url should be an AWS CloudFormation URL"
# Verify region is included
assert (
"region=us-east-1" in connection_url
), "connection_url should contain the specified region"
# Verify required parameters are in the URL
required_params = [
"param_SigNozIntegrationAgentVersion=v0.0.8",
"param_SigNozApiUrl=https%3A%2F%2Ftest-deployment.test.signoz.cloud",
"param_SigNozApiKey=test-api-key-789",
"param_SigNozAccountId=", # Will be a UUID
"param_IngestionUrl=https%3A%2F%2Fingest.test.signoz.cloud",
"param_IngestionKey=test-ingestion-key-123456",
"stackName=signoz-integration",
"templateURL=https%3A%2F%2Fsignoz-integrations.s3.us-east-1.amazonaws.com%2Faws-quickcreate-template-v0.0.8.json",
]
for param in required_params:
assert (
param in connection_url
), f"connection_url should contain parameter: {param}"
def test_generate_connection_url_unsupported_provider(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test that unsupported cloud providers return an error."""
# Get authentication token for admin user
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Try with GCP (unsupported)
cloud_provider = "gcp"
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/accounts/generate-connection-url"
request_payload = {
"account_config": {"regions": ["us-central1"]},
"agent_config": {
"region": "us-central1",
"ingestion_url": "https://ingest.test.signoz.cloud",
"ingestion_key": "test-ingestion-key-123456",
"signoz_api_url": "https://test-deployment.test.signoz.cloud",
"signoz_api_key": "test-api-key-789",
},
}
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=request_payload,
timeout=10,
)
# Should return Bad Request for unsupported provider
assert (
response.status_code == HTTPStatus.BAD_REQUEST
), f"Expected 400 for unsupported provider, got {response.status_code}"
response_data = response.json()
assert "error" in response_data, "Response should contain 'error' field"
assert (
"invalid cloud provider: gcp" in response_data["error"]["message"]
), "Error message should indicate unsupported provider"

View File

@@ -1,316 +0,0 @@
from http import HTTPStatus
from typing import Callable
import uuid
import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logger import setup_logger
from fixtures.cloudintegrations import (
create_cloud_integration_account,
)
from fixtures.cloudintegrationsutils import simulate_agent_checkin
logger = setup_logger(__name__)
def test_list_connected_accounts_empty(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test listing connected accounts when there are none."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "aws"
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/accounts"
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
response_data = response.json()
data = response_data.get("data", response_data)
assert "accounts" in data, "Response should contain 'accounts' field"
assert isinstance(data["accounts"], list), "Accounts should be a list"
assert len(data["accounts"]) == 0, "Accounts list should be empty when no accounts are connected"
def test_list_connected_accounts_with_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test listing connected accounts after creating one."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create a test account
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
account_id = account_data["account_id"]
# Simulate agent check-in to mark as connected
cloud_account_id = str(uuid.uuid4())
simulate_agent_checkin(signoz, admin_token, cloud_provider, account_id, cloud_account_id)
# List accounts
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/accounts"
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
response_data = response.json()
data = response_data.get("data", response_data)
assert "accounts" in data, "Response should contain 'accounts' field"
assert isinstance(data["accounts"], list), "Accounts should be a list"
# Find our account in the list (there may be leftover accounts from previous test runs)
account = next((a for a in data["accounts"] if a["id"] == account_id), None)
assert account is not None, f"Account {account_id} should be found in list"
assert account["id"] == account_id, "Account ID should match"
assert "config" in account, "Account should have config field"
assert "status" in account, "Account should have status field"
def test_get_account_status(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test getting the status of a specific account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create a test account (no check-in needed for status check)
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
account_id = account_data["account_id"]
# Get account status
endpoint = (
f"/api/v1/cloud-integrations/{cloud_provider}/accounts/{account_id}/status"
)
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
response_data = response.json()
data = response_data.get("data", response_data)
assert "id" in data, "Response should contain 'id' field"
assert data["id"] == account_id, "Account ID should match"
assert "status" in data, "Response should contain 'status' field"
assert "integration" in data["status"], "Status should contain 'integration' field"
def test_get_account_status_not_found(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test getting status for a non-existent account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "aws"
fake_account_id = "00000000-0000-0000-0000-000000000000"
endpoint = (
f"/api/v1/cloud-integrations/{cloud_provider}/accounts/{fake_account_id}/status"
)
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.NOT_FOUND
), f"Expected 404, got {response.status_code}"
def test_update_account_config(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test updating account configuration."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create a test account
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
account_id = account_data["account_id"]
# Simulate agent check-in to mark as connected
cloud_account_id = str(uuid.uuid4())
simulate_agent_checkin(signoz, admin_token, cloud_provider, account_id, cloud_account_id)
# Update account configuration
endpoint = (
f"/api/v1/cloud-integrations/{cloud_provider}/accounts/{account_id}/config"
)
updated_config = {
"config": {"regions": ["us-east-1", "us-west-2", "eu-west-1"]}
}
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=updated_config,
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
response_data = response.json()
data = response_data.get("data", response_data)
assert "id" in data, "Response should contain 'id' field"
assert data["id"] == account_id, "Account ID should match"
# Verify the update by listing accounts
list_endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/accounts"
list_response = requests.get(
signoz.self.host_configs["8080"].get(list_endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
list_response_data = list_response.json()
list_data = list_response_data.get("data", list_response_data)
account = next((a for a in list_data["accounts"] if a["id"] == account_id), None)
assert account is not None, "Account should be found in list"
assert "config" in account, "Account should have config"
assert "regions" in account["config"], "Config should have regions"
assert len(account["config"]["regions"]) == 3, "Should have 3 regions"
assert set(account["config"]["regions"]) == {
"us-east-1",
"us-west-2",
"eu-west-1",
}, "Regions should match updated config"
def test_disconnect_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test disconnecting an account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create a test account
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
account_id = account_data["account_id"]
# Simulate agent check-in to mark as connected
cloud_account_id = str(uuid.uuid4())
simulate_agent_checkin(signoz, admin_token, cloud_provider, account_id, cloud_account_id)
# Disconnect the account
endpoint = (
f"/api/v1/cloud-integrations/{cloud_provider}/accounts/{account_id}/disconnect"
)
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
# Verify our specific account is no longer in the connected list
list_endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/accounts"
list_response = requests.get(
signoz.self.host_configs["8080"].get(list_endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
list_response_data = list_response.json()
list_data = list_response_data.get("data", list_response_data)
# Check that our specific account is not in the list
disconnected_account = next(
(a for a in list_data["accounts"] if a["id"] == account_id), None
)
assert disconnected_account is None, f"Account {account_id} should be removed from connected accounts"
def test_disconnect_account_not_found(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test disconnecting a non-existent account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "aws"
fake_account_id = "00000000-0000-0000-0000-000000000000"
endpoint = (
f"/api/v1/cloud-integrations/{cloud_provider}/accounts/{fake_account_id}/disconnect"
)
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.NOT_FOUND
), f"Expected 404, got {response.status_code}"
def test_list_accounts_unsupported_provider(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test listing accounts for an unsupported cloud provider."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "gcp" # Unsupported provider
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/accounts"
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.BAD_REQUEST
), f"Expected 400, got {response.status_code}"

View File

@@ -1,467 +0,0 @@
from http import HTTPStatus
from typing import Callable
import uuid
import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logger import setup_logger
from fixtures.cloudintegrations import (
create_cloud_integration_account,
)
from fixtures.cloudintegrationsutils import simulate_agent_checkin
logger = setup_logger(__name__)
def test_list_services_without_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test listing available services without specifying an account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "aws"
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services"
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
response_data = response.json()
data = response_data.get("data", response_data)
assert "services" in data, "Response should contain 'services' field"
assert isinstance(data["services"], list), "Services should be a list"
assert len(data["services"]) > 0, "Should have at least one service available"
# Verify service structure
service = data["services"][0]
assert "id" in service, "Service should have 'id' field"
assert "title" in service, "Service should have 'title' field"
assert "icon" in service, "Service should have 'icon' field"
def test_list_services_with_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test listing services for a specific connected account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
account_id = account_data["account_id"]
cloud_account_id = str(uuid.uuid4())
simulate_agent_checkin(signoz, admin_token, cloud_provider, account_id, cloud_account_id)
# List services for the account
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services?cloud_account_id={cloud_account_id}"
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
response_data = response.json()
data = response_data.get("data", response_data)
assert "services" in data, "Response should contain 'services' field"
assert isinstance(data["services"], list), "Services should be a list"
assert len(data["services"]) > 0, "Should have at least one service available"
# Services should include config field (may be null if not configured)
service = data["services"][0]
assert "id" in service, "Service should have 'id' field"
assert "title" in service, "Service should have 'title' field"
assert "icon" in service, "Service should have 'icon' field"
def test_get_service_details_without_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test getting service details without specifying an account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "aws"
# First get the list of services to get a valid service ID
list_endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services"
list_response = requests.get(
signoz.self.host_configs["8080"].get(list_endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
list_data = list_response.json().get("data", list_response.json())
assert len(list_data["services"]) > 0, "Should have at least one service"
service_id = list_data["services"][0]["id"]
# Get service details
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services/{service_id}"
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
response_data = response.json()
data = response_data.get("data", response_data)
# Verify service details structure
assert "id" in data, "Service details should have 'id' field"
assert data["id"] == service_id, "Service ID should match requested ID"
assert "title" in data, "Service details should have 'title' field"
assert "overview" in data, "Service details should have 'overview' field"
# assert assets to had list of dashboards
assert "assets" in data, "Service details should have 'assets' field"
assert isinstance(data["assets"], dict), "Assets should be a dictionary"
def test_get_service_details_with_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test getting service details for a specific connected account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
account_id = account_data["account_id"]
cloud_account_id = str(uuid.uuid4())
simulate_agent_checkin(signoz, admin_token, cloud_provider, account_id, cloud_account_id)
# Get list of services first
list_endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services"
list_response = requests.get(
signoz.self.host_configs["8080"].get(list_endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
list_data = list_response.json().get("data", list_response.json())
assert len(list_data["services"]) > 0, "Should have at least one service"
service_id = list_data["services"][0]["id"]
# Get service details with account
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services/{service_id}?cloud_account_id={cloud_account_id}"
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
response_data = response.json()
data = response_data.get("data", response_data)
# Verify service details structure
assert "id" in data, "Service details should have 'id' field"
assert data["id"] == service_id, "Service ID should match requested ID"
assert "title" in data, "Service details should have 'title' field"
assert "overview" in data, "Service details should have 'overview' field"
assert "assets" in data, "Service details should have 'assets' field"
assert "config" in data, "Service details should have 'config' field"
def test_get_service_details_invalid_service(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test getting details for a non-existent service."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "aws"
fake_service_id = "non-existent-service"
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services/{fake_service_id}"
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.NOT_FOUND
), f"Expected 404, got {response.status_code}"
def test_list_services_unsupported_provider(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test listing services for an unsupported cloud provider."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "gcp" # Unsupported provider
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services"
response = requests.get(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
assert (
response.status_code == HTTPStatus.BAD_REQUEST
), f"Expected 400, got {response.status_code}"
def test_update_service_config(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test updating service configuration for a connected account."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
account_id = account_data["account_id"]
cloud_account_id = str(uuid.uuid4())
simulate_agent_checkin(signoz, admin_token, cloud_provider, account_id, cloud_account_id)
# Get list of services to pick a valid service ID
list_endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services"
list_response = requests.get(
signoz.self.host_configs["8080"].get(list_endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
list_data = list_response.json().get("data", list_response.json())
assert len(list_data["services"]) > 0, "Should have at least one service"
service_id = list_data["services"][0]["id"]
# Update service configuration
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services/{service_id}/config"
config_payload = {
"cloud_account_id": cloud_account_id,
"config": {
"metrics": {"enabled": True},
"logs": {"enabled": True},
},
}
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=config_payload,
timeout=10,
)
assert (
response.status_code == HTTPStatus.OK
), f"Expected 200, got {response.status_code}"
response_data = response.json()
data = response_data.get("data", response_data)
# Verify response structure
assert "id" in data, "Response should contain 'id' field"
assert data["id"] == service_id, "Service ID should match"
assert "config" in data, "Response should contain 'config' field"
assert "metrics" in data["config"], "Config should contain 'metrics' field"
assert "logs" in data["config"], "Config should contain 'logs' field"
def test_update_service_config_without_account(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
) -> None:
"""Test updating service config without a connected account should fail."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
cloud_provider = "aws"
# Get a valid service ID
list_endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services"
list_response = requests.get(
signoz.self.host_configs["8080"].get(list_endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
list_data = list_response.json().get("data", list_response.json())
service_id = list_data["services"][0]["id"]
# Try to update config with non-existent account
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services/{service_id}/config"
fake_cloud_account_id = str(uuid.uuid4())
config_payload = {
"cloud_account_id": fake_cloud_account_id,
"config": {
"metrics": {"enabled": True},
},
}
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=config_payload,
timeout=10,
)
assert (
response.status_code == HTTPStatus.NOT_FOUND
), f"Expected 400 for non-existent account, got {response.status_code}"
def test_update_service_config_invalid_service(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test updating config for a non-existent service should fail."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
account_id = account_data["account_id"]
cloud_account_id = str(uuid.uuid4())
simulate_agent_checkin(signoz, admin_token, cloud_provider, account_id, cloud_account_id)
# Try to update config for invalid service
fake_service_id = "non-existent-service"
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services/{fake_service_id}/config"
config_payload = {
"cloud_account_id": cloud_account_id,
"config": {
"metrics": {"enabled": True},
},
}
response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=config_payload,
timeout=10,
)
assert (
response.status_code == HTTPStatus.NOT_FOUND
), f"Expected 404 for invalid service, got {response.status_code}"
def test_update_service_config_disable_service(
signoz: types.SigNoz,
create_user_admin: types.Operation, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
create_cloud_integration_account: Callable,
) -> None:
"""Test disabling a service by updating config with enabled=false."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Create a test account and do check-in
cloud_provider = "aws"
account_data = create_cloud_integration_account(admin_token, cloud_provider)
account_id = account_data["account_id"]
cloud_account_id = str(uuid.uuid4())
simulate_agent_checkin(signoz, admin_token, cloud_provider, account_id, cloud_account_id)
# Get a valid service
list_endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services"
list_response = requests.get(
signoz.self.host_configs["8080"].get(list_endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=10,
)
list_data = list_response.json().get("data", list_response.json())
service_id = list_data["services"][0]["id"]
# First enable the service
endpoint = f"/api/v1/cloud-integrations/{cloud_provider}/services/{service_id}/config"
enable_payload = {
"cloud_account_id": cloud_account_id,
"config": {
"metrics": {"enabled": True},
},
}
enable_response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=enable_payload,
timeout=10,
)
assert enable_response.status_code == HTTPStatus.OK, "Failed to enable service"
# Now disable the service
disable_payload = {
"cloud_account_id": cloud_account_id,
"config": {
"metrics": {"enabled": False},
"logs": {"enabled": False},
},
}
disable_response = requests.post(
signoz.self.host_configs["8080"].get(endpoint),
headers={"Authorization": f"Bearer {admin_token}"},
json=disable_payload,
timeout=10,
)
assert (
disable_response.status_code == HTTPStatus.OK
), f"Expected 200, got {disable_response.status_code}"
response_data = disable_response.json()
data = response_data.get("data", response_data)
# Verify service is disabled
assert data["config"]["metrics"]["enabled"] is False, "Metrics should be disabled"
assert data["config"]["logs"]["enabled"] is False, "Logs should be disabled"