Compare commits

..

2 Commits

Author SHA1 Message Date
srikanthccv
a8b28edaf2 chore: fix lint 2026-02-24 23:41:04 +05:30
srikanthccv
fdbbbaa43b chore: move converter/formatter to pkg/... 2026-02-24 23:24:46 +05:30
117 changed files with 5032 additions and 7820 deletions

View File

@@ -190,7 +190,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.113.0
image: signoz/signoz:v0.112.1
ports:
- "8080:8080" # signoz port
# - "6060:6060" # pprof port
@@ -213,7 +213,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.144.1
image: signoz/signoz-otel-collector:v0.142.1
entrypoint:
- /bin/sh
command:
@@ -241,7 +241,7 @@ services:
replicas: 3
signoz-telemetrystore-migrator:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.144.1
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.142.0}
environment:
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_DSN=tcp://clickhouse:9000
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_CLUSTER=cluster

View File

@@ -117,7 +117,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.113.0
image: signoz/signoz:v0.112.1
ports:
- "8080:8080" # signoz port
volumes:
@@ -139,7 +139,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.144.1
image: signoz/signoz-otel-collector:v0.142.1
entrypoint:
- /bin/sh
command:
@@ -167,7 +167,7 @@ services:
replicas: 3
signoz-telemetrystore-migrator:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.144.1
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.142.0}
environment:
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_DSN=tcp://clickhouse:9000
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_CLUSTER=cluster

View File

@@ -82,12 +82,6 @@ exporters:
timeout: 45s
sending_queue:
enabled: false
metadataexporter:
cache:
provider: in_memory
dsn: tcp://clickhouse:9000/signoz_metadata
enabled: true
timeout: 45s
service:
telemetry:
logs:
@@ -99,19 +93,19 @@ service:
traces:
receivers: [otlp]
processors: [signozspanmetrics/delta, batch]
exporters: [clickhousetraces, metadataexporter, signozmeter]
exporters: [clickhousetraces, signozmeter]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
exporters: [signozclickhousemetrics, signozmeter]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
exporters: [signozclickhousemetrics, signozmeter]
logs:
receivers: [otlp]
processors: [batch]
exporters: [clickhouselogsexporter, metadataexporter, signozmeter]
exporters: [clickhouselogsexporter, signozmeter]
metrics/meter:
receivers: [signozmeter]
processors: [batch/meter]

View File

@@ -181,7 +181,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.113.0}
image: signoz/signoz:${VERSION:-v0.112.1}
container_name: signoz
ports:
- "8080:8080" # signoz port
@@ -204,7 +204,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.1}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.142.1}
container_name: signoz-otel-collector
entrypoint:
- /bin/sh
@@ -229,7 +229,7 @@ services:
- "4318:4318" # OTLP HTTP receiver
signoz-telemetrystore-migrator:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.1}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.142.0}
container_name: signoz-telemetrystore-migrator
environment:
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_DSN=tcp://clickhouse:9000

View File

@@ -109,7 +109,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.113.0}
image: signoz/signoz:${VERSION:-v0.112.1}
container_name: signoz
ports:
- "8080:8080" # signoz port
@@ -132,7 +132,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.1}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.142.1}
container_name: signoz-otel-collector
entrypoint:
- /bin/sh
@@ -157,7 +157,7 @@ services:
- "4318:4318" # OTLP HTTP receiver
signoz-telemetrystore-migrator:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.1}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.142.0}
container_name: signoz-telemetrystore-migrator
environment:
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_DSN=tcp://clickhouse:9000

View File

@@ -82,12 +82,6 @@ exporters:
timeout: 45s
sending_queue:
enabled: false
metadataexporter:
cache:
provider: in_memory
dsn: tcp://clickhouse:9000/signoz_metadata
enabled: true
timeout: 45s
service:
telemetry:
logs:
@@ -99,19 +93,19 @@ service:
traces:
receivers: [otlp]
processors: [signozspanmetrics/delta, batch]
exporters: [clickhousetraces, metadataexporter, signozmeter]
exporters: [clickhousetraces, signozmeter]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
exporters: [signozclickhousemetrics, signozmeter]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
exporters: [signozclickhousemetrics, signozmeter]
logs:
receivers: [otlp]
processors: [batch]
exporters: [clickhouselogsexporter, metadataexporter, signozmeter]
exporters: [clickhouselogsexporter, signozmeter]
metrics/meter:
receivers: [signozmeter]
processors: [batch/meter]

View File

@@ -1,7 +1,6 @@
package api
import (
"log/slog"
"net/http"
"time"
@@ -11,6 +10,7 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/http/middleware"
baseapp "github.com/SigNoz/signoz/pkg/query-service/app"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
@@ -27,12 +27,12 @@ type APIHandlerOptions struct {
RulesManager *rules.Manager
UsageManager *usage.Manager
IntegrationsController *integrations.Controller
CloudIntegrationsController *cloudintegrations.Controller
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
GatewayUrl string
// Querier Influx Interval
FluxInterval time.Duration
GlobalConfig global.Config
Logger *slog.Logger // this is present in Signoz.Instrumentation but adding for quick access
}
type APIHandler struct {
@@ -46,13 +46,13 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz, config signoz.
Reader: opts.DataConnector,
RuleManager: opts.RulesManager,
IntegrationsController: opts.IntegrationsController,
CloudIntegrationsController: opts.CloudIntegrationsController,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
FluxInterval: opts.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
LicensingAPI: httplicensing.NewLicensingAPI(signoz.Licensing),
Signoz: signoz,
QueryParserAPI: queryparser.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.QueryParser),
Logger: opts.Logger,
}, config)
if err != nil {
@@ -101,12 +101,14 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
}
func (ah *APIHandler) RegisterCloudIntegrationsRoutes(router *mux.Router, am *middleware.AuthZ) {
ah.APIHandler.RegisterCloudIntegrationsRoutes(router, am)
router.HandleFunc(
"/api/v1/cloud-integrations/{cloudProvider}/accounts/generate-connection-params",
am.EditAccess(ah.CloudIntegrationsGenerateConnectionParams),
).Methods(http.MethodGet)
}
func (ah *APIHandler) getVersion(w http.ResponseWriter, r *http.Request) {

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
@@ -14,14 +13,20 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/user"
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
"go.uber.org/zap"
)
// TODO: move this file with other cloud integration related code
type CloudIntegrationConnectionParamsResponse struct {
IngestionUrl string `json:"ingestion_url,omitempty"`
IngestionKey string `json:"ingestion_key,omitempty"`
SigNozAPIUrl string `json:"signoz_api_url,omitempty"`
SigNozAPIKey string `json:"signoz_api_key,omitempty"`
}
func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
@@ -36,21 +41,23 @@ func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseW
return
}
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
cloudProvider := mux.Vars(r)["cloudProvider"]
if cloudProvider != "aws" {
RespondError(w, basemodel.BadRequest(fmt.Errorf(
"cloud provider not supported: %s", cloudProvider,
)), nil)
return
}
apiKey, err := ah.getOrCreateCloudIntegrationPAT(r.Context(), claims.OrgID, cloudProvider)
if err != nil {
render.Error(w, err)
apiKey, apiErr := ah.getOrCreateCloudIntegrationPAT(r.Context(), claims.OrgID, cloudProvider)
if apiErr != nil {
RespondError(w, basemodel.WrapApiError(
apiErr, "couldn't provision PAT for cloud integration:",
), nil)
return
}
result := integrationtypes.GettableCloudIntegrationConnectionParams{
result := CloudIntegrationConnectionParamsResponse{
SigNozAPIKey: apiKey,
}
@@ -64,17 +71,16 @@ func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseW
// Return the API Key (PAT) even if the rest of the params can not be deduced.
// Params not returned from here will be requested from the user via form inputs.
// This enables gracefully degraded but working experience even for non-cloud deployments.
ah.opts.Logger.InfoContext(
r.Context(),
"ingestion params and signoz api url can not be deduced since no license was found",
)
render.Success(w, http.StatusOK, result)
zap.L().Info("ingestion params and signoz api url can not be deduced since no license was found")
ah.Respond(w, result)
return
}
signozApiUrl, err := ah.getIngestionUrlAndSigNozAPIUrl(r.Context(), license.Key)
if err != nil {
render.Error(w, err)
signozApiUrl, apiErr := ah.getIngestionUrlAndSigNozAPIUrl(r.Context(), license.Key)
if apiErr != nil {
RespondError(w, basemodel.WrapApiError(
apiErr, "couldn't deduce ingestion url and signoz api url",
), nil)
return
}
@@ -83,41 +89,48 @@ func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseW
gatewayUrl := ah.opts.GatewayUrl
if len(gatewayUrl) > 0 {
ingestionKeyString, err := ah.getOrCreateCloudProviderIngestionKey(
ingestionKey, apiErr := getOrCreateCloudProviderIngestionKey(
r.Context(), gatewayUrl, license.Key, cloudProvider,
)
if err != nil {
render.Error(w, err)
if apiErr != nil {
RespondError(w, basemodel.WrapApiError(
apiErr, "couldn't get or create ingestion key",
), nil)
return
}
result.IngestionKey = ingestionKeyString
result.IngestionKey = ingestionKey
} else {
ah.opts.Logger.InfoContext(
r.Context(),
"ingestion key can't be deduced since no gateway url has been configured",
)
zap.L().Info("ingestion key can't be deduced since no gateway url has been configured")
}
render.Success(w, http.StatusOK, result)
ah.Respond(w, result)
}
func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId string, cloudProvider valuer.String) (string, error) {
func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId string, cloudProvider string) (
string, *basemodel.ApiError,
) {
integrationPATName := fmt.Sprintf("%s integration", cloudProvider)
integrationUser, err := ah.getOrCreateCloudIntegrationUser(ctx, orgId, cloudProvider)
if err != nil {
return "", err
integrationUser, apiErr := ah.getOrCreateCloudIntegrationUser(ctx, orgId, cloudProvider)
if apiErr != nil {
return "", apiErr
}
orgIdUUID, err := valuer.NewUUID(orgId)
if err != nil {
return "", err
return "", basemodel.InternalError(fmt.Errorf(
"couldn't parse orgId: %w", err,
))
}
allPats, err := ah.Signoz.Modules.User.ListAPIKeys(ctx, orgIdUUID)
if err != nil {
return "", err
return "", basemodel.InternalError(fmt.Errorf(
"couldn't list PATs: %w", err,
))
}
for _, p := range allPats {
if p.UserID == integrationUser.ID && p.Name == integrationPATName {
@@ -125,10 +138,9 @@ func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId
}
}
ah.opts.Logger.InfoContext(
ctx,
zap.L().Info(
"no PAT found for cloud integration, creating a new one",
slog.String("cloudProvider", cloudProvider.String()),
zap.String("cloudProvider", cloudProvider),
)
newPAT, err := types.NewStorableAPIKey(
@@ -138,48 +150,68 @@ func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId
0,
)
if err != nil {
return "", err
return "", basemodel.InternalError(fmt.Errorf(
"couldn't create cloud integration PAT: %w", err,
))
}
err = ah.Signoz.Modules.User.CreateAPIKey(ctx, newPAT)
if err != nil {
return "", err
return "", basemodel.InternalError(fmt.Errorf(
"couldn't create cloud integration PAT: %w", err,
))
}
return newPAT.Token, nil
}
// TODO: move this function out of handler and use proper module structure
func (ah *APIHandler) getOrCreateCloudIntegrationUser(ctx context.Context, orgId string, cloudProvider valuer.String) (*types.User, error) {
cloudIntegrationUserName := fmt.Sprintf("%s-integration", cloudProvider.String())
func (ah *APIHandler) getOrCreateCloudIntegrationUser(
ctx context.Context, orgId string, cloudProvider string,
) (*types.User, *basemodel.ApiError) {
cloudIntegrationUserName := fmt.Sprintf("%s-integration", cloudProvider)
email := valuer.MustNewEmail(fmt.Sprintf("%s@signoz.io", cloudIntegrationUserName))
cloudIntegrationUser, err := types.NewUser(cloudIntegrationUserName, email, types.RoleViewer, valuer.MustNewUUID(orgId))
if err != nil {
return nil, err
return nil, basemodel.InternalError(fmt.Errorf("couldn't create cloud integration user: %w", err))
}
password := types.MustGenerateFactorPassword(cloudIntegrationUser.ID.StringValue())
cloudIntegrationUser, err = ah.Signoz.Modules.User.GetOrCreateUser(ctx, cloudIntegrationUser, user.WithFactorPassword(password))
if err != nil {
return nil, err
return nil, basemodel.InternalError(fmt.Errorf("couldn't look for integration user: %w", err))
}
return cloudIntegrationUser, nil
}
// TODO: move this function out of handler and use proper module structure
func (ah *APIHandler) getIngestionUrlAndSigNozAPIUrl(ctx context.Context, licenseKey string) (string, error) {
respBytes, err := ah.Signoz.Zeus.GetDeployment(ctx, licenseKey)
if err != nil {
return "", errors.WrapInternalf(err, errors.CodeInternal, "couldn't query for deployment info: error")
func (ah *APIHandler) getIngestionUrlAndSigNozAPIUrl(ctx context.Context, licenseKey string) (
string, *basemodel.ApiError,
) {
// TODO: remove this struct from here
type deploymentResponse struct {
Name string `json:"name"`
ClusterInfo struct {
Region struct {
DNS string `json:"dns"`
} `json:"region"`
} `json:"cluster"`
}
resp := new(integrationtypes.GettableDeployment)
respBytes, err := ah.Signoz.Zeus.GetDeployment(ctx, licenseKey)
if err != nil {
return "", basemodel.InternalError(fmt.Errorf(
"couldn't query for deployment info: error: %w", err,
))
}
resp := new(deploymentResponse)
err = json.Unmarshal(respBytes, resp)
if err != nil {
return "", errors.WrapInternalf(err, errors.CodeInternal, "couldn't unmarshal deployment info response")
return "", basemodel.InternalError(fmt.Errorf(
"couldn't unmarshal deployment info response: error: %w", err,
))
}
regionDns := resp.ClusterInfo.Region.DNS
@@ -187,10 +219,9 @@ func (ah *APIHandler) getIngestionUrlAndSigNozAPIUrl(ctx context.Context, licens
if len(regionDns) < 1 || len(deploymentName) < 1 {
// Fail early if actual response structure and expectation here ever diverge
return "", errors.NewInternalf(
errors.CodeInternal,
return "", basemodel.InternalError(fmt.Errorf(
"deployment info response not in expected shape. couldn't determine region dns and deployment name",
)
))
}
signozApiUrl := fmt.Sprintf("https://%s.%s", deploymentName, regionDns)
@@ -198,85 +229,102 @@ func (ah *APIHandler) getIngestionUrlAndSigNozAPIUrl(ctx context.Context, licens
return signozApiUrl, nil
}
func (ah *APIHandler) getOrCreateCloudProviderIngestionKey(
ctx context.Context, gatewayUrl string, licenseKey string, cloudProvider valuer.String,
) (string, error) {
type ingestionKey struct {
Name string `json:"name"`
Value string `json:"value"`
// other attributes from gateway response not included here since they are not being used.
}
type ingestionKeysSearchResponse struct {
Status string `json:"status"`
Data []ingestionKey `json:"data"`
Error string `json:"error"`
}
type createIngestionKeyResponse struct {
Status string `json:"status"`
Data ingestionKey `json:"data"`
Error string `json:"error"`
}
func getOrCreateCloudProviderIngestionKey(
ctx context.Context, gatewayUrl string, licenseKey string, cloudProvider string,
) (string, *basemodel.ApiError) {
cloudProviderKeyName := fmt.Sprintf("%s-integration", cloudProvider)
// see if the key already exists
searchResult, err := requestGateway[integrationtypes.GettableIngestionKeysSearch](
searchResult, apiErr := requestGateway[ingestionKeysSearchResponse](
ctx,
gatewayUrl,
licenseKey,
fmt.Sprintf("/v1/workspaces/me/keys/search?name=%s", cloudProviderKeyName),
nil,
ah.opts.Logger,
)
if err != nil {
return "", err
if apiErr != nil {
return "", basemodel.WrapApiError(
apiErr, "couldn't search for cloudprovider ingestion key",
)
}
if searchResult.Status != "success" {
return "", errors.NewInternalf(
errors.CodeInternal,
"couldn't search for cloud provider ingestion key: status: %s, error: %s",
return "", basemodel.InternalError(fmt.Errorf(
"couldn't search for cloudprovider ingestion key: status: %s, error: %s",
searchResult.Status, searchResult.Error,
)
))
}
for _, k := range searchResult.Data {
if k.Name != cloudProviderKeyName {
continue
}
if k.Name == cloudProviderKeyName {
if len(k.Value) < 1 {
// Fail early if actual response structure and expectation here ever diverge
return "", basemodel.InternalError(fmt.Errorf(
"ingestion keys search response not as expected",
))
}
if len(k.Value) < 1 {
// Fail early if actual response structure and expectation here ever diverge
return "", errors.NewInternalf(errors.CodeInternal, "ingestion keys search response not as expected")
return k.Value, nil
}
return k.Value, nil
}
ah.opts.Logger.InfoContext(
ctx,
zap.L().Info(
"no existing ingestion key found for cloud integration, creating a new one",
slog.String("cloudProvider", cloudProvider.String()),
zap.String("cloudProvider", cloudProvider),
)
createKeyResult, err := requestGateway[integrationtypes.GettableCreateIngestionKey](
createKeyResult, apiErr := requestGateway[createIngestionKeyResponse](
ctx, gatewayUrl, licenseKey, "/v1/workspaces/me/keys",
map[string]any{
"name": cloudProviderKeyName,
"tags": []string{"integration", cloudProvider.String()},
"tags": []string{"integration", cloudProvider},
},
ah.opts.Logger,
)
if err != nil {
return "", err
if apiErr != nil {
return "", basemodel.WrapApiError(
apiErr, "couldn't create cloudprovider ingestion key",
)
}
if createKeyResult.Status != "success" {
return "", errors.NewInternalf(
errors.CodeInternal,
"couldn't create cloud provider ingestion key: status: %s, error: %s",
return "", basemodel.InternalError(fmt.Errorf(
"couldn't create cloudprovider ingestion key: status: %s, error: %s",
createKeyResult.Status, createKeyResult.Error,
)
))
}
ingestionKeyString := createKeyResult.Data.Value
if len(ingestionKeyString) < 1 {
ingestionKey := createKeyResult.Data.Value
if len(ingestionKey) < 1 {
// Fail early if actual response structure and expectation here ever diverge
return "", errors.NewInternalf(errors.CodeInternal,
return "", basemodel.InternalError(fmt.Errorf(
"ingestion key creation response not as expected",
)
))
}
return ingestionKeyString, nil
return ingestionKey, nil
}
func requestGateway[ResponseType any](
ctx context.Context, gatewayUrl, licenseKey, path string, payload any, logger *slog.Logger,
) (*ResponseType, error) {
ctx context.Context, gatewayUrl string, licenseKey string, path string, payload any,
) (*ResponseType, *basemodel.ApiError) {
baseUrl := strings.TrimSuffix(gatewayUrl, "/")
reqUrl := fmt.Sprintf("%s%s", baseUrl, path)
@@ -287,12 +335,13 @@ func requestGateway[ResponseType any](
"X-Consumer-Groups": "ns:default",
}
return requestAndParseResponse[ResponseType](ctx, reqUrl, headers, payload, logger)
return requestAndParseResponse[ResponseType](ctx, reqUrl, headers, payload)
}
func requestAndParseResponse[ResponseType any](
ctx context.Context, url string, headers map[string]string, payload any, logger *slog.Logger,
) (*ResponseType, error) {
ctx context.Context, url string, headers map[string]string, payload any,
) (*ResponseType, *basemodel.ApiError) {
reqMethod := http.MethodGet
var reqBody io.Reader
if payload != nil {
@@ -300,14 +349,18 @@ func requestAndParseResponse[ResponseType any](
bodyJson, err := json.Marshal(payload)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't marshal payload")
return nil, basemodel.InternalError(fmt.Errorf(
"couldn't serialize request payload to JSON: %w", err,
))
}
reqBody = bytes.NewBuffer(bodyJson)
reqBody = bytes.NewBuffer([]byte(bodyJson))
}
req, err := http.NewRequestWithContext(ctx, reqMethod, url, reqBody)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't create req")
return nil, basemodel.InternalError(fmt.Errorf(
"couldn't prepare request: %w", err,
))
}
for k, v := range headers {
@@ -320,26 +373,23 @@ func requestAndParseResponse[ResponseType any](
response, err := client.Do(req)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't make req")
return nil, basemodel.InternalError(fmt.Errorf("couldn't make request: %w", err))
}
defer func() {
err = response.Body.Close()
if err != nil {
logger.ErrorContext(ctx, "couldn't close response body", "error", err)
}
}()
defer response.Body.Close()
respBody, err := io.ReadAll(response.Body)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't read response body")
return nil, basemodel.InternalError(fmt.Errorf("couldn't read response: %w", err))
}
var resp ResponseType
err = json.Unmarshal(respBody, &resp)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't unmarshal response body")
return nil, basemodel.InternalError(fmt.Errorf(
"couldn't unmarshal gateway response into %T", resp,
))
}
return &resp, nil

View File

@@ -37,6 +37,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
baseapp "github.com/SigNoz/signoz/pkg/query-service/app"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp"
@@ -120,6 +121,13 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
)
}
cloudIntegrationsController, err := cloudintegrations.NewController(signoz.SQLStore)
if err != nil {
return nil, fmt.Errorf(
"couldn't create cloud provider integrations controller: %w", err,
)
}
// ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
signoz.SQLStore,
@@ -153,11 +161,11 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
RulesManager: rm,
UsageManager: usageManager,
IntegrationsController: integrationsController,
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
FluxInterval: config.Querier.FluxInterval,
GatewayUrl: config.Gateway.URL.String(),
GlobalConfig: config.Global,
Logger: signoz.Instrumentation.Logger(),
}
apiHandler, err := api.NewAPIHandler(apiOpts, signoz, config)

View File

@@ -26,7 +26,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
"github.com/SigNoz/signoz/pkg/query-service/formatter"
"github.com/SigNoz/signoz/pkg/formatter"
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"

View File

@@ -86,13 +86,8 @@ function LogDetailInner({
const handleClickOutside = (e: MouseEvent): void => {
const target = e.target as HTMLElement;
// Don't close if clicking on drawer content, overlays, or portal elements
if (
target.closest('[data-log-detail-ignore="true"]') ||
target.closest('.cm-tooltip-autocomplete') ||
target.closest('.drawer-popover') ||
target.closest('.query-status-popover')
) {
// Don't close if clicking on explicitly ignored regions
if (target.closest('[data-log-detail-ignore="true"]')) {
return;
}
@@ -405,11 +400,7 @@ function LogDetailInner({
<div className="log-detail-drawer__content" data-log-detail-ignore="true">
<div className="log-detail-drawer__log">
<Divider type="vertical" className={cx('log-type-indicator', logType)} />
<Tooltip
title={removeEscapeCharacters(log?.body)}
placement="left"
mouseLeaveDelay={0}
>
<Tooltip title={removeEscapeCharacters(log?.body)} placement="left">
<div className="log-body" dangerouslySetInnerHTML={htmlBody} />
</Tooltip>
@@ -475,7 +466,6 @@ function LogDetailInner({
title="Show Filters"
placement="topLeft"
aria-label="Show Filters"
mouseLeaveDelay={0}
>
<Button
className="action-btn"
@@ -491,7 +481,6 @@ function LogDetailInner({
aria-label={
selectedView === VIEW_TYPES.JSON ? 'Copy JSON' : 'Copy Log Link'
}
mouseLeaveDelay={0}
>
<Button
className="action-btn"

View File

@@ -27,11 +27,7 @@ function AddToQueryHOC({
return (
// eslint-disable-next-line jsx-a11y/click-events-have-key-events, jsx-a11y/no-static-element-interactions
<div className={cx('addToQueryContainer', fontSize)} onClick={handleQueryAdd}>
<Popover
overlayClassName="drawer-popover"
placement="top"
content={popOverContent}
>
<Popover placement="top" content={popOverContent}>
{children}
</Popover>
</div>

View File

@@ -32,7 +32,6 @@ function CopyClipboardHOC({
<span onClick={onClick} role="presentation" tabIndex={-1}>
<Popover
placement="top"
overlayClassName="drawer-popover"
content={<span style={{ fontSize: '0.9rem' }}>{tooltipText}</span>}
>
{children}

View File

@@ -21,7 +21,7 @@ export function getDefaultCellStyle(isDarkMode?: boolean): CSSProperties {
export const defaultTableStyle: CSSProperties = {
minWidth: '40rem',
maxWidth: '90rem',
maxWidth: '60rem',
};
export const defaultListViewPanelStyle: CSSProperties = {

View File

@@ -1328,10 +1328,7 @@ function QuerySearch({
)}
<div className="query-where-clause-editor-container">
<Tooltip
title={<div data-log-detail-ignore="true">{getTooltipContent()}</div>}
placement="left"
>
<Tooltip title={getTooltipContent()} placement="left">
<a
href="https://signoz.io/docs/userguide/search-syntax/"
target="_blank"

View File

@@ -9,7 +9,6 @@ import {
import useVariablesFromUrl from 'hooks/dashboard/useVariablesFromUrl';
import { useDashboard } from 'providers/Dashboard/Dashboard';
import { initializeDefaultVariables } from 'providers/Dashboard/initializeDefaultVariables';
import { updateDashboardVariablesStore } from 'providers/Dashboard/store/dashboardVariables/dashboardVariablesStore';
import {
enqueueDescendantsOfVariable,
enqueueFetchOfAllVariables,
@@ -32,9 +31,6 @@ function DashboardVariableSelection(): JSX.Element | null {
const { updateUrlVariable, getUrlVariables } = useVariablesFromUrl();
const { dashboardVariables } = useDashboardVariables();
const dashboardId = useDashboardVariablesSelector(
(state) => state.dashboardId,
);
const sortedVariablesArray = useDashboardVariablesSelector(
(state) => state.sortedVariablesArray,
);
@@ -100,28 +96,6 @@ function DashboardVariableSelection(): JSX.Element | null {
updateUrlVariable(name || id, value);
}
// Synchronously update the external store with the new variable value so that
// child variables see the updated parent value when they refetch, rather than
// waiting for setSelectedDashboard → useEffect → updateDashboardVariablesStore.
const updatedVariables = { ...dashboardVariables };
if (updatedVariables[id]) {
updatedVariables[id] = {
...updatedVariables[id],
selectedValue: value,
allSelected,
haveCustomValuesSelected,
};
}
if (updatedVariables[name]) {
updatedVariables[name] = {
...updatedVariables[name],
selectedValue: value,
allSelected,
haveCustomValuesSelected,
};
}
updateDashboardVariablesStore({ dashboardId, variables: updatedVariables });
setSelectedDashboard((prev) => {
if (prev) {
const oldVariables = { ...prev?.data.variables };
@@ -156,12 +130,10 @@ function DashboardVariableSelection(): JSX.Element | null {
return prev;
});
// Cascade: enqueue query-type descendants for refetching.
// Safe to call synchronously now that the store already has the updated value.
// Cascade: enqueue query-type descendants for refetching
enqueueDescendantsOfVariable(name);
},
[
dashboardId,
dashboardVariables,
updateLocalStorageDashboardVariables,
updateUrlVariable,

View File

@@ -5,7 +5,7 @@ import dashboardVariablesQuery from 'api/dashboard/variables/dashboardVariablesQ
import { REACT_QUERY_KEY } from 'constants/reactQueryKeys';
import { useVariableFetchState } from 'hooks/dashboard/useVariableFetchState';
import sortValues from 'lib/dashboardVariables/sortVariableValues';
import { isArray, isEmpty } from 'lodash-es';
import { isArray, isEmpty, isString } from 'lodash-es';
import { AppState } from 'store/reducers';
import { VariableResponseProps } from 'types/api/dashboard/variables/query';
import { GlobalReducer } from 'types/reducer/globalTime';
@@ -54,7 +54,7 @@ function QueryVariableInput({
onChange,
onDropdownVisibleChange,
handleClear,
getDefaultValue,
applyDefaultIfNeeded,
} = useDashboardVariableSelectHelper({
variableData,
optionsData,
@@ -68,93 +68,81 @@ function QueryVariableInput({
try {
setErrorMessage(null);
// This is just a check given the previously undefined typed name prop. Not significant
// This will be changed when we change the schema
// TODO: @AshwinBhatkal Perses
if (!variableData.name) {
return;
}
// if the response is not an array, premature return
if (
!variablesRes?.variableValues ||
!Array.isArray(variablesRes?.variableValues)
variablesRes?.variableValues &&
Array.isArray(variablesRes?.variableValues)
) {
return;
}
const sortedNewOptions = sortValues(
variablesRes.variableValues,
variableData.sort,
);
const sortedOldOptions = sortValues(optionsData, variableData.sort);
// if options are the same as before, no need to update state or check for selected value validity
// ! selectedValue needs to be set in the first pass though, as options are initially empty array and we need to apply default if needed
// Expecatation is that when oldOptions are not empty, then there is always some selectedValue
if (areArraysEqual(sortedNewOptions, sortedOldOptions)) {
return;
}
setOptionsData(sortedNewOptions);
let isSelectedValueMissingInNewOptions = false;
// Check if currently selected value(s) are present in the new options list
if (isArray(variableData.selectedValue)) {
isSelectedValueMissingInNewOptions = variableData.selectedValue.some(
(val) => !sortedNewOptions.includes(val),
const newOptionsData = sortValues(
variablesRes?.variableValues,
variableData.sort,
);
} else if (
variableData.selectedValue &&
!sortedNewOptions.includes(variableData.selectedValue)
) {
isSelectedValueMissingInNewOptions = true;
}
// If multi-select with ALL option enabled, and ALL is currently selected, we want to maintain that state and select all new options
// This block does not depend on selected value because of ALL and also because we would only come here if options are different from the previous
if (
variableData.multiSelect &&
variableData.showALLOption &&
variableData.allSelected &&
isSelectedValueMissingInNewOptions
) {
onValueUpdate(variableData.name, variableData.id, sortedNewOptions, true);
const oldOptionsData = sortValues(optionsData, variableData.sort) as never;
// Update tempSelection to maintain ALL state when dropdown is open
if (tempSelection !== undefined) {
setTempSelection(sortedNewOptions.map((option) => option.toString()));
}
return;
}
if (!areArraysEqual(newOptionsData, oldOptionsData)) {
let valueNotInList = false;
const value = variableData.selectedValue;
let allSelected = false;
if (isArray(variableData.selectedValue)) {
variableData.selectedValue.forEach((val) => {
if (!newOptionsData.includes(val)) {
valueNotInList = true;
}
});
} else if (
isString(variableData.selectedValue) &&
!newOptionsData.includes(variableData.selectedValue)
) {
valueNotInList = true;
}
if (variableData.multiSelect) {
const { selectedValue } = variableData;
allSelected =
sortedNewOptions.length > 0 &&
Array.isArray(selectedValue) &&
sortedNewOptions.every((option) => selectedValue.includes(option));
}
if (variableData.name && (valueNotInList || variableData.allSelected)) {
if (
variableData.allSelected &&
variableData.multiSelect &&
variableData.showALLOption
) {
if (
variableData.name &&
variableData.id &&
!isEmpty(variableData.selectedValue)
) {
onValueUpdate(
variableData.name,
variableData.id,
newOptionsData,
true,
);
}
if (
variableData.name &&
variableData.id &&
!isEmpty(variableData.selectedValue)
) {
onValueUpdate(variableData.name, variableData.id, value, allSelected);
} else {
const defaultValue = getDefaultValue(sortedNewOptions);
if (defaultValue !== undefined) {
onValueUpdate(
variableData.name,
variableData.id,
defaultValue,
allSelected,
);
// Update tempSelection to maintain ALL state when dropdown is open
if (tempSelection !== undefined) {
setTempSelection(newOptionsData.map((option) => option.toString()));
}
} else {
const value = variableData.selectedValue;
let allSelected = false;
if (variableData.multiSelect) {
const { selectedValue } = variableData;
allSelected =
newOptionsData.length > 0 &&
Array.isArray(selectedValue) &&
newOptionsData.every((option) => selectedValue.includes(option));
}
if (
variableData.name &&
variableData.id &&
!isEmpty(variableData.selectedValue)
) {
onValueUpdate(variableData.name, variableData.id, value, allSelected);
}
}
}
setOptionsData(newOptionsData);
// Apply default if no value is selected (e.g., new variable, first load)
applyDefaultIfNeeded(newOptionsData);
}
}
} catch (e) {
@@ -167,7 +155,7 @@ function QueryVariableInput({
onValueUpdate,
tempSelection,
setTempSelection,
getDefaultValue,
applyDefaultIfNeeded,
],
);

View File

@@ -1,6 +1,5 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { act, render } from '@testing-library/react';
import * as dashboardVariablesStoreModule from 'providers/Dashboard/store/dashboardVariables/dashboardVariablesStore';
import {
dashboardVariablesStore,
setDashboardVariablesStore,
@@ -11,7 +10,6 @@ import {
IDashboardVariablesStoreState,
} from 'providers/Dashboard/store/dashboardVariables/dashboardVariablesStoreTypes';
import {
enqueueDescendantsOfVariable,
enqueueFetchOfAllVariables,
initializeVariableFetchStore,
} from 'providers/Dashboard/store/variableFetchStore';
@@ -19,17 +17,6 @@ import { IDashboardVariable } from 'types/api/dashboard/getAll';
import DashboardVariableSelection from '../DashboardVariableSelection';
// Mutable container to capture the onValueUpdate callback from VariableItem
const mockVariableItemCallbacks: {
onValueUpdate?: (
name: string,
id: string,
value: IDashboardVariable['selectedValue'],
allSelected: boolean,
haveCustomValuesSelected?: boolean,
) => void;
} = {};
// Mock providers/Dashboard/Dashboard
const mockSetSelectedDashboard = jest.fn();
const mockUpdateLocalStorageDashboardVariables = jest.fn();
@@ -69,14 +56,10 @@ jest.mock('react-redux', () => ({
useSelector: jest.fn().mockReturnValue({ minTime: 1000, maxTime: 2000 }),
}));
// VariableItem mock captures the onValueUpdate prop for use in onValueUpdate tests
// Mock VariableItem to avoid rendering complexity
jest.mock('../VariableItem', () => ({
__esModule: true,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
default: (props: any): JSX.Element => {
mockVariableItemCallbacks.onValueUpdate = props.onValueUpdate;
return <div data-testid="variable-item" />;
},
default: (): JSX.Element => <div data-testid="variable-item" />,
}));
function createVariable(
@@ -217,162 +200,4 @@ describe('DashboardVariableSelection', () => {
expect(initializeVariableFetchStore).not.toHaveBeenCalled();
expect(enqueueFetchOfAllVariables).not.toHaveBeenCalled();
});
describe('onValueUpdate', () => {
let updateStoreSpy: jest.SpyInstance;
beforeEach(() => {
resetStore();
jest.clearAllMocks();
// Real implementation pass-through — we just want to observe calls
updateStoreSpy = jest.spyOn(
dashboardVariablesStoreModule,
'updateDashboardVariablesStore',
);
});
afterEach(() => {
updateStoreSpy.mockRestore();
});
it('updates dashboardVariablesStore synchronously before enqueueDescendantsOfVariable', () => {
setDashboardVariablesStore({
dashboardId: 'dash-1',
variables: {
env: createVariable({ name: 'env', id: 'env-id', order: 0 }),
},
});
render(<DashboardVariableSelection />);
const callOrder: string[] = [];
updateStoreSpy.mockImplementation(() => {
callOrder.push('updateDashboardVariablesStore');
});
(enqueueDescendantsOfVariable as jest.Mock).mockImplementation(() => {
callOrder.push('enqueueDescendantsOfVariable');
});
act(() => {
mockVariableItemCallbacks.onValueUpdate?.(
'env',
'env-id',
'production',
false,
);
});
expect(callOrder).toEqual([
'updateDashboardVariablesStore',
'enqueueDescendantsOfVariable',
]);
});
it('passes updated variable value to dashboardVariablesStore', () => {
setDashboardVariablesStore({
dashboardId: 'dash-1',
variables: {
env: createVariable({
name: 'env',
id: 'env-id',
order: 0,
selectedValue: 'staging',
}),
},
});
render(<DashboardVariableSelection />);
// Clear spy calls that happened during setup/render
updateStoreSpy.mockClear();
act(() => {
mockVariableItemCallbacks.onValueUpdate?.(
'env',
'env-id',
'production',
false,
);
});
expect(updateStoreSpy).toHaveBeenCalledWith(
expect.objectContaining({
dashboardId: 'dash-1',
variables: expect.objectContaining({
env: expect.objectContaining({
selectedValue: 'production',
allSelected: false,
}),
}),
}),
);
});
it('calls enqueueDescendantsOfVariable synchronously without a timer', () => {
jest.useFakeTimers();
setDashboardVariablesStore({
dashboardId: 'dash-1',
variables: {
env: createVariable({ name: 'env', id: 'env-id', order: 0 }),
},
});
render(<DashboardVariableSelection />);
act(() => {
mockVariableItemCallbacks.onValueUpdate?.(
'env',
'env-id',
'production',
false,
);
});
// Must be called immediately — no timer advancement needed
expect(enqueueDescendantsOfVariable).toHaveBeenCalledWith('env');
jest.useRealTimers();
});
it('propagates allSelected and haveCustomValuesSelected to the store', () => {
setDashboardVariablesStore({
dashboardId: 'dash-1',
variables: {
env: createVariable({
name: 'env',
id: 'env-id',
order: 0,
multiSelect: true,
showALLOption: true,
}),
},
});
render(<DashboardVariableSelection />);
updateStoreSpy.mockClear();
act(() => {
mockVariableItemCallbacks.onValueUpdate?.(
'env',
'env-id',
['production', 'staging'],
true,
false,
);
});
expect(updateStoreSpy).toHaveBeenCalledWith(
expect.objectContaining({
variables: expect.objectContaining({
env: expect.objectContaining({
selectedValue: ['production', 'staging'],
allSelected: true,
haveCustomValuesSelected: false,
}),
}),
}),
);
});
});
});

View File

@@ -1,275 +0,0 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { QueryClient, QueryClientProvider } from 'react-query';
import { act, render, waitFor } from '@testing-library/react';
import dashboardVariablesQuery from 'api/dashboard/variables/dashboardVariablesQuery';
import { variableFetchStore } from 'providers/Dashboard/store/variableFetchStore';
import { IDashboardVariable } from 'types/api/dashboard/getAll';
import QueryVariableInput from '../QueryVariableInput';
jest.mock('api/dashboard/variables/dashboardVariablesQuery');
jest.mock('react-redux', () => ({
...jest.requireActual('react-redux'),
useSelector: jest.fn().mockReturnValue({ minTime: 1000, maxTime: 2000 }),
}));
function createTestQueryClient(): QueryClient {
return new QueryClient({
defaultOptions: {
queries: { retry: false, refetchOnWindowFocus: false },
},
});
}
function Wrapper({
children,
queryClient,
}: {
children: React.ReactNode;
queryClient: QueryClient;
}): JSX.Element {
return (
<QueryClientProvider client={queryClient}>{children}</QueryClientProvider>
);
}
function createVariable(
overrides: Partial<IDashboardVariable> = {},
): IDashboardVariable {
return {
id: 'env-id',
name: 'env',
description: '',
type: 'QUERY',
sort: 'DISABLED',
showALLOption: false,
multiSelect: false,
order: 0,
queryValue: 'SELECT env FROM table',
...overrides,
};
}
/** Put the named variable into 'loading' state so useQuery fires on mount */
function setVariableLoading(name: string): void {
variableFetchStore.update((draft) => {
draft.states[name] = 'loading';
draft.cycleIds[name] = (draft.cycleIds[name] || 0) + 1;
});
}
function resetFetchStore(): void {
variableFetchStore.set(() => ({
states: {},
lastUpdated: {},
cycleIds: {},
}));
}
describe('QueryVariableInput - getOptions logic', () => {
const mockOnValueUpdate = jest.fn();
beforeEach(() => {
jest.clearAllMocks();
resetFetchStore();
});
afterEach(() => {
resetFetchStore();
});
it('applies default value (first option) when selectedValue is empty on first load', async () => {
(dashboardVariablesQuery as jest.Mock).mockResolvedValue({
statusCode: 200,
payload: { variableValues: ['production', 'staging', 'dev'] },
});
const variable = createVariable({ selectedValue: undefined });
setVariableLoading('env');
const queryClient = createTestQueryClient();
render(
<Wrapper queryClient={queryClient}>
<QueryVariableInput
variableData={variable}
existingVariables={{ 'env-id': variable }}
onValueUpdate={mockOnValueUpdate}
/>
</Wrapper>,
);
await waitFor(() => {
expect(mockOnValueUpdate).toHaveBeenCalledWith(
'env',
'env-id',
'production', // first option by default
false,
);
});
});
it('keeps existing selectedValue when it is present in new options', async () => {
(dashboardVariablesQuery as jest.Mock).mockResolvedValue({
statusCode: 200,
payload: { variableValues: ['production', 'staging'] },
});
const variable = createVariable({ selectedValue: 'staging' });
setVariableLoading('env');
const queryClient = createTestQueryClient();
render(
<Wrapper queryClient={queryClient}>
<QueryVariableInput
variableData={variable}
existingVariables={{ 'env-id': variable }}
onValueUpdate={mockOnValueUpdate}
/>
</Wrapper>,
);
await waitFor(() => {
expect(mockOnValueUpdate).toHaveBeenCalledWith(
'env',
'env-id',
'staging',
false,
);
});
});
it('selects all new options when allSelected=true and value is missing from new options', async () => {
(dashboardVariablesQuery as jest.Mock).mockResolvedValue({
statusCode: 200,
payload: { variableValues: ['production', 'staging'] },
});
const variable = createVariable({
selectedValue: ['old-env'],
allSelected: true,
multiSelect: true,
showALLOption: true,
});
setVariableLoading('env');
const queryClient = createTestQueryClient();
render(
<Wrapper queryClient={queryClient}>
<QueryVariableInput
variableData={variable}
existingVariables={{ 'env-id': variable }}
onValueUpdate={mockOnValueUpdate}
/>
</Wrapper>,
);
await waitFor(() => {
expect(mockOnValueUpdate).toHaveBeenCalledWith(
'env',
'env-id',
['production', 'staging'],
true,
);
});
});
it('does not call onValueUpdate a second time when options have not changed', async () => {
const mockQueryFn = jest.fn().mockResolvedValue({
statusCode: 200,
payload: { variableValues: ['production', 'staging'] },
});
(dashboardVariablesQuery as jest.Mock).mockImplementation(mockQueryFn);
const variable = createVariable({ selectedValue: 'production' });
setVariableLoading('env');
const queryClient = createTestQueryClient();
render(
<Wrapper queryClient={queryClient}>
<QueryVariableInput
variableData={variable}
existingVariables={{ 'env-id': variable }}
onValueUpdate={mockOnValueUpdate}
/>
</Wrapper>,
);
// Wait for first fetch and onValueUpdate call
await waitFor(() => {
expect(mockOnValueUpdate).toHaveBeenCalledTimes(1);
});
mockOnValueUpdate.mockClear();
// Trigger a second fetch cycle with the same API response
act(() => {
variableFetchStore.update((draft) => {
draft.states['env'] = 'revalidating';
draft.cycleIds['env'] = (draft.cycleIds['env'] || 0) + 1;
});
});
// Wait for second query to fire
await waitFor(() => {
expect(mockQueryFn).toHaveBeenCalledTimes(2);
});
// Options are unchanged, so onValueUpdate must not fire again
expect(mockOnValueUpdate).not.toHaveBeenCalled();
});
it('does not call onValueUpdate when API returns a non-array response', async () => {
(dashboardVariablesQuery as jest.Mock).mockResolvedValue({
statusCode: 200,
payload: { variableValues: null },
});
const variable = createVariable({ selectedValue: 'production' });
setVariableLoading('env');
const queryClient = createTestQueryClient();
render(
<Wrapper queryClient={queryClient}>
<QueryVariableInput
variableData={variable}
existingVariables={{ 'env-id': variable }}
onValueUpdate={mockOnValueUpdate}
/>
</Wrapper>,
);
await waitFor(() => {
expect(dashboardVariablesQuery).toHaveBeenCalled();
});
expect(mockOnValueUpdate).not.toHaveBeenCalled();
});
it('does not fire the query when variableData.name is empty', () => {
(dashboardVariablesQuery as jest.Mock).mockResolvedValue({
statusCode: 200,
payload: { variableValues: ['production'] },
});
// Variable with no name — useVariableFetchState will be called with ''
// and the query key will have an empty name, leaving it disabled
const variable = createVariable({ name: '' });
// Note: we do NOT put it in 'loading' state since name is empty
// (no variableFetchStore entry for '' means isVariableFetching=false)
const queryClient = createTestQueryClient();
render(
<Wrapper queryClient={queryClient}>
<QueryVariableInput
variableData={variable}
existingVariables={{ 'env-id': variable }}
onValueUpdate={mockOnValueUpdate}
/>
</Wrapper>,
);
expect(dashboardVariablesQuery).not.toHaveBeenCalled();
expect(mockOnValueUpdate).not.toHaveBeenCalled();
});
});

View File

@@ -46,9 +46,6 @@ interface UseDashboardVariableSelectHelperReturn {
applyDefaultIfNeeded: (
overrideOptions?: (string | number | boolean)[],
) => void;
getDefaultValue: (
overrideOptions?: (string | number | boolean)[],
) => string | string[] | undefined;
}
// eslint-disable-next-line sonarjs/cognitive-complexity
@@ -251,6 +248,5 @@ export function useDashboardVariableSelectHelper({
defaultValue,
onChange,
applyDefaultIfNeeded,
getDefaultValue,
};
}

View File

@@ -121,23 +121,9 @@ function BodyTitleRenderer({
return (
<TitleWrapper onClick={handleNodeClick}>
{typeof value !== 'object' && (
<span
onClick={(e): void => {
e.stopPropagation();
e.preventDefault();
}}
onMouseDown={(e): void => e.preventDefault()}
>
<Dropdown
menu={menu}
trigger={['click']}
dropdownRender={(originNode): React.ReactNode => (
<div data-log-detail-ignore="true">{originNode}</div>
)}
>
<SettingOutlined style={{ marginRight: 8 }} className="hover-reveal" />
</Dropdown>
</span>
<Dropdown menu={menu} trigger={['click']}>
<SettingOutlined style={{ marginRight: 8 }} className="hover-reveal" />
</Dropdown>
)}
{title.toString()}{' '}
{!parentIsArray && typeof value !== 'object' && (

View File

@@ -13,7 +13,7 @@ function FieldRenderer({ field }: FieldRendererProps): JSX.Element {
<span className="field-renderer-container">
{dataType && newField && logType ? (
<>
<Tooltip placement="left" title={newField} mouseLeaveDelay={0}>
<Tooltip placement="left" title={newField}>
<Typography.Text ellipsis className="label">
{newField}{' '}
</Typography.Text>

View File

@@ -46,7 +46,7 @@ function Overview({
handleChangeSelectedView,
}: Props): JSX.Element {
const [isWrapWord, setIsWrapWord] = useState<boolean>(true);
const [isSearchVisible, setIsSearchVisible] = useState<boolean>(true);
const [isSearchVisible, setIsSearchVisible] = useState<boolean>(false);
const [isAttributesExpanded, setIsAttributesExpanded] = useState<boolean>(
true,
);

View File

@@ -245,7 +245,7 @@ function TableView({
<Typography.Text>{renderedField}</Typography.Text>
{traceId && (
<Tooltip title="Inspect in Trace" mouseLeaveDelay={0}>
<Tooltip title="Inspect in Trace">
<Button
className="periscope-btn"
onClick={(

View File

@@ -1,34 +0,0 @@
import { Color } from '@signozhq/design-tokens';
import { getColorsForSeverityLabels, isRedLike } from '../utils';
describe('getColorsForSeverityLabels', () => {
it('should return slate for blank labels', () => {
expect(getColorsForSeverityLabels('', 0)).toBe(Color.BG_SLATE_300);
expect(getColorsForSeverityLabels(' ', 0)).toBe(Color.BG_SLATE_300);
});
it('should return correct colors for known severity variants', () => {
expect(getColorsForSeverityLabels('INFO', 0)).toBe(Color.BG_ROBIN_600);
expect(getColorsForSeverityLabels('ERROR', 0)).toBe(Color.BG_CHERRY_600);
expect(getColorsForSeverityLabels('WARN', 0)).toBe(Color.BG_AMBER_600);
expect(getColorsForSeverityLabels('DEBUG', 0)).toBe(Color.BG_AQUA_600);
expect(getColorsForSeverityLabels('TRACE', 0)).toBe(Color.BG_FOREST_600);
expect(getColorsForSeverityLabels('FATAL', 0)).toBe(Color.BG_SAKURA_600);
});
it('should return non-red colors for unrecognized labels at any index', () => {
for (let i = 0; i < 30; i++) {
const color = getColorsForSeverityLabels('4', i);
expect(isRedLike(color)).toBe(false);
}
});
it('should return non-red colors for numeric severity text', () => {
const numericLabels = ['1', '2', '4', '9', '13', '17', '21'];
numericLabels.forEach((label) => {
const color = getColorsForSeverityLabels(label, 0);
expect(isRedLike(color)).toBe(false);
});
});
});

View File

@@ -1,16 +1,7 @@
import { Color } from '@signozhq/design-tokens';
import { themeColors } from 'constants/theme';
import { colors } from 'lib/getRandomColor';
// Function to determine if a color is "red-like" based on its RGB values
export function isRedLike(hex: string): boolean {
const r = parseInt(hex.slice(1, 3), 16);
const g = parseInt(hex.slice(3, 5), 16);
const b = parseInt(hex.slice(5, 7), 16);
return r > 180 && r > g * 1.4 && r > b * 1.4;
}
const SAFE_FALLBACK_COLORS = colors.filter((c) => !isRedLike(c));
const SEVERITY_VARIANT_COLORS: Record<string, string> = {
TRACE: Color.BG_FOREST_600,
Trace: Color.BG_FOREST_500,
@@ -76,13 +67,8 @@ export function getColorsForSeverityLabels(
label: string,
index: number,
): string {
const trimmed = label.trim();
if (!trimmed) {
return Color.BG_SLATE_300;
}
const variantColor = SEVERITY_VARIANT_COLORS[trimmed];
// Check if we have a direct mapping for this severity variant
const variantColor = SEVERITY_VARIANT_COLORS[label.trim()];
if (variantColor) {
return variantColor;
}
@@ -117,8 +103,5 @@ export function getColorsForSeverityLabels(
return Color.BG_SAKURA_500;
}
return (
SAFE_FALLBACK_COLORS[index % SAFE_FALLBACK_COLORS.length] ||
Color.BG_SLATE_400
);
return colors[index % colors.length] || themeColors.red;
}

View File

@@ -111,19 +111,23 @@ const InfinityTable = forwardRef<TableVirtuosoHandle, InfinityTableProps>(
);
const itemContent = useCallback(
(index: number, log: Record<string, unknown>): JSX.Element => (
<TableRow
tableColumns={tableColumns}
index={index}
log={log}
logs={tableViewProps.logs}
hasActions
fontSize={tableViewProps.fontSize}
onShowLogDetails={onSetActiveLog}
isActiveLog={activeLog?.id === log.id}
onClearActiveLog={onCloseActiveLog}
/>
),
(index: number, log: Record<string, unknown>): JSX.Element => {
return (
<div key={log.id as string}>
<TableRow
tableColumns={tableColumns}
index={index}
log={log}
logs={tableViewProps.logs}
hasActions
fontSize={tableViewProps.fontSize}
onShowLogDetails={onSetActiveLog}
isActiveLog={activeLog?.id === log.id}
onClearActiveLog={onCloseActiveLog}
/>
</div>
);
},
[
tableColumns,
onSetActiveLog,

View File

@@ -1,7 +1,7 @@
import { useTranslation } from 'react-i18next';
import { Form } from 'antd';
import { initialQueryBuilderFormValuesMap } from 'constants/queryBuilder';
import QueryBuilderSearchV2 from 'container/QueryBuilder/filters/QueryBuilderSearchV2/QueryBuilderSearchV2';
import QueryBuilderSearch from 'container/QueryBuilder/filters/QueryBuilderSearch';
import isEqual from 'lodash-es/isEqual';
import { TagFilter } from 'types/api/queryBuilder/queryBuilderData';
@@ -30,7 +30,7 @@ function TagFilterInput({
};
return (
<QueryBuilderSearchV2
<QueryBuilderSearch
query={query}
onChange={onQueryChange}
placeholder={placeholder}

View File

@@ -86,7 +86,7 @@ jest.mock('providers/preferences/sync/usePreferenceSync', () => ({
}));
const BASE_URL = ENVIRONMENT.baseURL;
const attributeKeysURL = `${BASE_URL}/api/v3/filter_suggestions`;
const attributeKeysURL = `${BASE_URL}/api/v3/autocomplete/attribute_keys`;
describe('PipelinePage container test', () => {
beforeAll(() => {
@@ -333,34 +333,26 @@ describe('PipelinePage container test', () => {
ctx.json({
status: 'success',
data: {
attributes: [
attributeKeys: [
{
key: 'otelServiceName',
dataType: DataTypes.String,
type: 'tag',
isColumn: false,
isJSON: false,
},
{
key: 'service.name',
dataType: DataTypes.String,
type: 'resource',
isColumn: false,
isJSON: false,
},
{
key: 'service.instance.id',
dataType: DataTypes.String,
type: 'resource',
isColumn: false,
isJSON: false,
},
{
key: 'service.name',
dataType: DataTypes.String,
type: 'resource',
},
{
key: 'service.name',
dataType: DataTypes.String,
type: 'tag',
isColumn: false,
isJSON: false,
},
],
},

View File

@@ -973,7 +973,6 @@ function QueryBuilderSearchV2(
return (
<div className="query-builder-search-v2">
<Select
data-testid={'qb-search-select'}
ref={selectRef}
// eslint-disable-next-line react/jsx-props-no-spreading
{...(hasPopupContainer ? { getPopupContainer: popupContainer } : {})}

View File

@@ -1,94 +0,0 @@
import { act, renderHook } from '@testing-library/react';
import { useActiveLog } from 'hooks/logs/useActiveLog';
import { useIsTextSelected } from 'hooks/useIsTextSelected';
import { ILog } from 'types/api/logs/log';
import useLogDetailHandlers from '../useLogDetailHandlers';
jest.mock('hooks/logs/useActiveLog');
jest.mock('hooks/useIsTextSelected');
const mockOnSetActiveLog = jest.fn();
const mockOnClearActiveLog = jest.fn();
const mockOnAddToQuery = jest.fn();
const mockOnGroupByAttribute = jest.fn();
const mockIsTextSelected = jest.fn();
const mockLog: ILog = {
id: 'log-1',
timestamp: '2024-01-01T00:00:00Z',
date: '2024-01-01',
body: 'test log body',
severityText: 'INFO',
severityNumber: 9,
traceFlags: 0,
traceId: '',
spanID: '',
attributesString: {},
attributesInt: {},
attributesFloat: {},
resources_string: {},
scope_string: {},
attributes_string: {},
severity_text: '',
severity_number: 0,
};
beforeEach(() => {
jest.clearAllMocks();
jest.mocked(useIsTextSelected).mockReturnValue(mockIsTextSelected);
jest.mocked(useActiveLog).mockReturnValue({
activeLog: null,
onSetActiveLog: mockOnSetActiveLog,
onClearActiveLog: mockOnClearActiveLog,
onAddToQuery: mockOnAddToQuery,
onGroupByAttribute: mockOnGroupByAttribute,
});
});
it('should not open log detail when text is selected', () => {
mockIsTextSelected.mockReturnValue(true);
const { result } = renderHook(() => useLogDetailHandlers());
act(() => {
result.current.handleSetActiveLog(mockLog);
});
expect(mockOnSetActiveLog).not.toHaveBeenCalled();
});
it('should open log detail when no text is selected', () => {
mockIsTextSelected.mockReturnValue(false);
const { result } = renderHook(() => useLogDetailHandlers());
act(() => {
result.current.handleSetActiveLog(mockLog);
});
expect(mockOnSetActiveLog).toHaveBeenCalledWith(mockLog);
});
it('should toggle off when clicking the same active log', () => {
mockIsTextSelected.mockReturnValue(false);
jest.mocked(useActiveLog).mockReturnValue({
activeLog: mockLog,
onSetActiveLog: mockOnSetActiveLog,
onClearActiveLog: mockOnClearActiveLog,
onAddToQuery: mockOnAddToQuery,
onGroupByAttribute: mockOnGroupByAttribute,
});
const { result } = renderHook(() => useLogDetailHandlers());
act(() => {
result.current.handleSetActiveLog(mockLog);
});
expect(mockOnClearActiveLog).toHaveBeenCalled();
expect(mockOnSetActiveLog).not.toHaveBeenCalled();
});

View File

@@ -1,17 +1,15 @@
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
import { useCallback, useMemo, useState } from 'react';
import { useQueryClient } from 'react-query';
import { useDispatch, useSelector } from 'react-redux';
import { useHistory, useLocation } from 'react-router-dom';
import { getAggregateKeys } from 'api/queryBuilder/getAttributeKeys';
import { SOMETHING_WENT_WRONG } from 'constants/api';
import { QueryParams } from 'constants/query';
import { OPERATORS, QueryBuilderKeys } from 'constants/queryBuilder';
import ROUTES from 'constants/routes';
import { MetricsType } from 'container/MetricsApplication/constant';
import { getOperatorValue } from 'container/QueryBuilder/filters/QueryBuilderSearch/utils';
import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder';
import { useNotifications } from 'hooks/useNotifications';
import useUrlQuery from 'hooks/useUrlQuery';
import { getGeneratedFilterQueryString } from 'lib/getGeneratedFilterQueryString';
import { chooseAutocompleteFromCustomValue } from 'lib/newQueryBuilder/chooseAutocompleteFromCustomValue';
import { AppState } from 'store/reducers';
@@ -56,20 +54,6 @@ export const useActiveLog = (): UseActiveLog => {
const [activeLog, setActiveLog] = useState<ILog | null>(null);
// Close drawer/clear active log when query in URL changes
const urlQuery = useUrlQuery();
const compositeQuery = urlQuery.get(QueryParams.compositeQuery) ?? '';
const prevQueryRef = useRef<string | null>(null);
useEffect(() => {
if (
prevQueryRef.current !== null &&
prevQueryRef.current !== compositeQuery
) {
setActiveLog(null);
}
prevQueryRef.current = compositeQuery;
}, [compositeQuery]);
const onSetDetailedLogData = useCallback(
(logData: ILog) => {
dispatch({

View File

@@ -2,7 +2,6 @@ import { useCallback, useState } from 'react';
import { VIEW_TYPES } from 'components/LogDetail/constants';
import type { UseActiveLog } from 'hooks/logs/types';
import { useActiveLog } from 'hooks/logs/useActiveLog';
import { useIsTextSelected } from 'hooks/useIsTextSelected';
import { ILog } from 'types/api/logs/log';
type SelectedTab = typeof VIEW_TYPES[keyof typeof VIEW_TYPES] | undefined;
@@ -29,13 +28,9 @@ function useLogDetailHandlers({
onAddToQuery,
} = useActiveLog();
const [selectedTab, setSelectedTab] = useState<SelectedTab>(defaultTab);
const isTextSelected = useIsTextSelected();
const handleSetActiveLog = useCallback(
(log: ILog, nextTab: SelectedTab = defaultTab): void => {
if (isTextSelected()) {
return;
}
if (activeLog?.id === log.id) {
onClearActiveLog();
setSelectedTab(undefined);
@@ -44,7 +39,7 @@ function useLogDetailHandlers({
onSetActiveLog(log);
setSelectedTab(nextTab ?? defaultTab);
},
[activeLog?.id, defaultTab, onClearActiveLog, onSetActiveLog, isTextSelected],
[activeLog?.id, defaultTab, onClearActiveLog, onSetActiveLog],
);
const handleCloseLogDetail = useCallback((): void => {

View File

@@ -1,10 +0,0 @@
import { useCallback } from 'react';
export function useIsTextSelected(): () => boolean {
return useCallback((): boolean => {
const selection = window.getSelection();
return (
!!selection && !selection.isCollapsed && selection.toString().length > 0
);
}, []);
}

View File

@@ -3,7 +3,7 @@ package formatter
import (
"fmt"
"github.com/SigNoz/signoz/pkg/query-service/converter"
"github.com/SigNoz/signoz/pkg/converter"
"github.com/dustin/go-humanize"
)

View File

@@ -3,7 +3,7 @@ package formatter
import (
"fmt"
"github.com/SigNoz/signoz/pkg/query-service/converter"
"github.com/SigNoz/signoz/pkg/converter"
"github.com/dustin/go-humanize"
)

View File

@@ -78,9 +78,8 @@ func toFixed(value float64, decimals DecimalCount) string {
}
decimalPos := strings.Index(formatted, ".")
precision := 0
if decimalPos != -1 {
precision = len(formatted) - decimalPos - 1
precision := len(formatted) - decimalPos - 1
if precision < *decimals {
return formatted + strings.Repeat("0", *decimals-precision)
}
@@ -89,8 +88,8 @@ func toFixed(value float64, decimals DecimalCount) string {
return formatted
}
func toFixedScaled(value float64, decimals DecimalCount, scaleFormat string) string {
return toFixed(value, decimals) + scaleFormat
func toFixedScaled(value float64, scaleFormat string) string {
return toFixed(value, nil) + scaleFormat
}
func getDecimalsForValue(value float64) int {

View File

@@ -13,35 +13,35 @@ func (*throughputFormatter) Name() string {
return "throughput"
}
func simpleCountUnit(value float64, decimals *int, symbol string) string {
func simpleCountUnit(value float64, symbol string) string {
units := []string{"", "K", "M", "B", "T"}
scaler := scaledUnits(1000, units, 0)
return scaler(value, decimals) + " " + symbol
return scaler(value, nil) + " " + symbol
}
func (f *throughputFormatter) Format(value float64, unit string) string {
switch unit {
case "cps", "{count}/s":
return simpleCountUnit(value, nil, "c/s")
return simpleCountUnit(value, "c/s")
case "ops", "{ops}/s":
return simpleCountUnit(value, nil, "op/s")
return simpleCountUnit(value, "op/s")
case "reqps", "{req}/s":
return simpleCountUnit(value, nil, "req/s")
return simpleCountUnit(value, "req/s")
case "rps", "{read}/s":
return simpleCountUnit(value, nil, "r/s")
return simpleCountUnit(value, "r/s")
case "wps", "{write}/s":
return simpleCountUnit(value, nil, "w/s")
return simpleCountUnit(value, "w/s")
case "iops", "{iops}/s":
return simpleCountUnit(value, nil, "iops")
return simpleCountUnit(value, "iops")
case "cpm", "{count}/min":
return simpleCountUnit(value, nil, "c/m")
return simpleCountUnit(value, "c/m")
case "opm", "{ops}/min":
return simpleCountUnit(value, nil, "op/m")
return simpleCountUnit(value, "op/m")
case "rpm", "{read}/min":
return simpleCountUnit(value, nil, "r/m")
return simpleCountUnit(value, "r/m")
case "wpm", "{write}/min":
return simpleCountUnit(value, nil, "w/m")
return simpleCountUnit(value, "w/m")
}
// When unit is not matched, return the value as it is.
return fmt.Sprintf("%v", value)

View File

@@ -46,17 +46,17 @@ func toNanoSeconds(value float64) string {
if absValue < 1000 {
return toFixed(value, nil) + " ns"
} else if absValue < 1000000 { // 2000 ns is better represented as 2 µs
return toFixedScaled(value/1000, nil, " µs")
return toFixedScaled(value/1000, " µs")
} else if absValue < 1000000000 { // 2000000 ns is better represented as 2 ms
return toFixedScaled(value/1000000, nil, " ms")
return toFixedScaled(value/1000000, " ms")
} else if absValue < 60000000000 {
return toFixedScaled(value/1000000000, nil, " s")
return toFixedScaled(value/1000000000, " s")
} else if absValue < 3600000000000 {
return toFixedScaled(value/60000000000, nil, " min")
return toFixedScaled(value/60000000000, " min")
} else if absValue < 86400000000000 {
return toFixedScaled(value/3600000000000, nil, " hour")
return toFixedScaled(value/3600000000000, " hour")
} else {
return toFixedScaled(value/86400000000000, nil, " day")
return toFixedScaled(value/86400000000000, " day")
}
}
@@ -66,9 +66,9 @@ func toMicroSeconds(value float64) string {
if absValue < 1000 {
return toFixed(value, nil) + " µs"
} else if absValue < 1000000 { // 2000 µs is better represented as 2 ms
return toFixedScaled(value/1000, nil, " ms")
return toFixedScaled(value/1000, " ms")
} else {
return toFixedScaled(value/1000000, nil, " s")
return toFixedScaled(value/1000000, " s")
}
}
@@ -80,16 +80,16 @@ func toMilliSeconds(value float64) string {
if absValue < 1000 {
return toFixed(value, nil) + " ms"
} else if absValue < 60000 {
return toFixedScaled(value/1000, nil, " s")
return toFixedScaled(value/1000, " s")
} else if absValue < 3600000 {
return toFixedScaled(value/60000, nil, " min")
return toFixedScaled(value/60000, " min")
} else if absValue < 86400000 { // 172800000 ms is better represented as 2 day
return toFixedScaled(value/3600000, nil, " hour")
return toFixedScaled(value/3600000, " hour")
} else if absValue < 31536000000 {
return toFixedScaled(value/86400000, nil, " day")
return toFixedScaled(value/86400000, " day")
}
return toFixedScaled(value/31536000000, nil, " year")
return toFixedScaled(value/31536000000, " year")
}
// toSeconds returns a easy to read string representation of the given value in seconds
@@ -97,24 +97,24 @@ func toSeconds(value float64) string {
absValue := math.Abs(value)
if absValue < 0.000001 {
return toFixedScaled(value*1e9, nil, " ns")
return toFixedScaled(value*1e9, " ns")
} else if absValue < 0.001 {
return toFixedScaled(value*1e6, nil, " µs")
return toFixedScaled(value*1e6, " µs")
} else if absValue < 1 {
return toFixedScaled(value*1e3, nil, " ms")
return toFixedScaled(value*1e3, " ms")
} else if absValue < 60 {
return toFixed(value, nil) + " s"
} else if absValue < 3600 {
return toFixedScaled(value/60, nil, " min")
return toFixedScaled(value/60, " min")
} else if absValue < 86400 { // 56000 s is better represented as 15.56 hour
return toFixedScaled(value/3600, nil, " hour")
return toFixedScaled(value/3600, " hour")
} else if absValue < 604800 {
return toFixedScaled(value/86400, nil, " day")
return toFixedScaled(value/86400, " day")
} else if absValue < 31536000 {
return toFixedScaled(value/604800, nil, " week")
return toFixedScaled(value/604800, " week")
}
return toFixedScaled(value/3.15569e7, nil, " year")
return toFixedScaled(value/3.15569e7, " year")
}
// toMinutes returns a easy to read string representation of the given value in minutes
@@ -124,13 +124,13 @@ func toMinutes(value float64) string {
if absValue < 60 {
return toFixed(value, nil) + " min"
} else if absValue < 1440 {
return toFixedScaled(value/60, nil, " hour")
return toFixedScaled(value/60, " hour")
} else if absValue < 10080 {
return toFixedScaled(value/1440, nil, " day")
return toFixedScaled(value/1440, " day")
} else if absValue < 604800 {
return toFixedScaled(value/10080, nil, " week")
return toFixedScaled(value/10080, " week")
} else {
return toFixedScaled(value/5.25948e5, nil, " year")
return toFixedScaled(value/5.25948e5, " year")
}
}
@@ -142,11 +142,11 @@ func toHours(value float64) string {
if absValue < 24 {
return toFixed(value, nil) + " hour"
} else if absValue < 168 {
return toFixedScaled(value/24, nil, " day")
return toFixedScaled(value/24, " day")
} else if absValue < 8760 {
return toFixedScaled(value/168, nil, " week")
return toFixedScaled(value/168, " week")
} else {
return toFixedScaled(value/8760, nil, " year")
return toFixedScaled(value/8760, " year")
}
}
@@ -157,9 +157,9 @@ func toDays(value float64) string {
if absValue < 7 {
return toFixed(value, nil) + " day"
} else if absValue < 365 {
return toFixedScaled(value/7, nil, " week")
return toFixedScaled(value/7, " week")
} else {
return toFixedScaled(value/365, nil, " year")
return toFixedScaled(value/365, " year")
}
}
@@ -170,6 +170,6 @@ func toWeeks(value float64) string {
if absValue < 52 {
return toFixed(value, nil) + " week"
} else {
return toFixedScaled(value/52, nil, " year")
return toFixedScaled(value/52, " year")
}
}

View File

@@ -1,44 +0,0 @@
package middleware
import (
"log/slog"
"net/http"
"runtime/debug"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
)
// Recovery is a middleware that recovers from panics, logs the panic,
// and returns a 500 Internal Server Error.
type Recovery struct {
logger *slog.Logger
}
// NewRecovery creates a new Recovery middleware.
func NewRecovery(logger *slog.Logger) Wrapper {
return &Recovery{
logger: logger.With("pkg", "http-middleware-recovery"),
}
}
// Wrap is the middleware handler.
func (m *Recovery) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
m.logger.ErrorContext(
r.Context(),
"panic recovered",
"err", err, "stack", string(debug.Stack()),
)
render.Error(w, errors.NewInternalf(
errors.CodeInternal, "internal server error",
))
}
}()
next.ServeHTTP(w, r)
})
}

View File

@@ -87,7 +87,7 @@ func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetype
}
func (m *module) listPromotedPaths(ctx context.Context) ([]string, error) {
paths, err := m.metadataStore.GetPromotedPaths(ctx)
paths, err := m.metadataStore.ListPromotedPaths(ctx)
if err != nil {
return nil, err
}
@@ -142,7 +142,7 @@ func (m *module) PromoteAndIndexPaths(
pathsStr = append(pathsStr, path.Path)
}
existingPromotedPaths, err := m.metadataStore.GetPromotedPaths(ctx, pathsStr...)
existingPromotedPaths, err := m.metadataStore.ListPromotedPaths(ctx, pathsStr...)
if err != nil {
return err
}

View File

@@ -13,7 +13,6 @@ import (
root "github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
@@ -463,7 +462,7 @@ func (h *handler) UpdateAPIKey(w http.ResponseWriter, r *http.Request) {
return
}
if slices.Contains(integrationtypes.CloudIntegrationUserEmails, createdByUser.Email) {
if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(createdByUser.Email.String())) {
render.Error(w, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "API Keys for integration users cannot be revoked"))
return
}
@@ -508,7 +507,7 @@ func (h *handler) RevokeAPIKey(w http.ResponseWriter, r *http.Request) {
return
}
if slices.Contains(integrationtypes.CloudIntegrationUserEmails, createdByUser.Email) {
if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(createdByUser.Email.String())) {
render.Error(w, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "API Keys for integration users cannot be revoked"))
return
}

View File

@@ -19,7 +19,6 @@ import (
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/emailtypes"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/types/roletypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/dustin/go-humanize"
@@ -174,7 +173,7 @@ func (m *Module) DeleteInvite(ctx context.Context, orgID string, id valuer.UUID)
func (module *Module) CreateUser(ctx context.Context, input *types.User, opts ...root.CreateUserOption) error {
createUserOpts := root.NewCreateUserOptions(opts...)
// since assign is idempotent multiple calls to assign won't cause issues in case of retries.
// since assign is idempotant multiple calls to assign won't cause issues in case of retries.
err := module.authz.Grant(ctx, input.OrgID, roletypes.MustGetSigNozManagedRoleFromExistingRole(input.Role), authtypes.MustNewSubject(authtypes.TypeableUser, input.ID.StringValue(), input.OrgID, nil))
if err != nil {
return err
@@ -280,7 +279,7 @@ func (module *Module) DeleteUser(ctx context.Context, orgID valuer.UUID, id stri
return errors.WithAdditionalf(err, "cannot delete root user")
}
if slices.Contains(integrationtypes.CloudIntegrationUserEmails, user.Email) {
if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(user.Email.String())) {
return errors.New(errors.TypeForbidden, errors.CodeForbidden, "integration user cannot be deleted")
}
@@ -294,7 +293,7 @@ func (module *Module) DeleteUser(ctx context.Context, orgID valuer.UUID, id stri
return errors.New(errors.TypeForbidden, errors.CodeForbidden, "cannot delete the last admin")
}
// since revoke is idempotent multiple calls to revoke won't cause issues in case of retries
// since revoke is idempotant multiple calls to revoke won't cause issues in case of retries
err = module.authz.Revoke(ctx, orgID, roletypes.MustGetSigNozManagedRoleFromExistingRole(user.Role), authtypes.MustNewSubject(authtypes.TypeableUser, id, orgID, nil))
if err != nil {
return err

View File

@@ -1,57 +1,55 @@
package store
package cloudintegrations
import (
"context"
"database/sql"
"fmt"
"log/slog"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
CodeCloudIntegrationAccountNotFound errors.Code = errors.MustNewCode("cloud_integration_account_not_found")
)
type cloudProviderAccountsRepository interface {
listConnected(ctx context.Context, orgId string, provider string) ([]types.CloudIntegration, *model.ApiError)
type CloudProviderAccountsRepository interface {
ListConnected(ctx context.Context, orgId string, provider string) ([]integrationtypes.CloudIntegration, error)
get(ctx context.Context, orgId string, provider string, id string) (*types.CloudIntegration, *model.ApiError)
Get(ctx context.Context, orgId string, provider string, id string) (*integrationtypes.CloudIntegration, error)
GetConnectedCloudAccount(ctx context.Context, orgId, provider string, accountID string) (*integrationtypes.CloudIntegration, error)
getConnectedCloudAccount(ctx context.Context, orgId string, provider string, accountID string) (*types.CloudIntegration, *model.ApiError)
// Insert an account or update it by (cloudProvider, id)
// for specified non-empty fields
Upsert(
upsert(
ctx context.Context,
orgId string,
provider string,
id *string,
config []byte,
config *types.AccountConfig,
accountId *string,
agentReport *integrationtypes.AgentReport,
agentReport *types.AgentReport,
removedAt *time.Time,
) (*integrationtypes.CloudIntegration, error)
) (*types.CloudIntegration, *model.ApiError)
}
func NewCloudProviderAccountsRepository(store sqlstore.SQLStore) CloudProviderAccountsRepository {
return &cloudProviderAccountsSQLRepository{store: store}
func newCloudProviderAccountsRepository(store sqlstore.SQLStore) (
*cloudProviderAccountsSQLRepository, error,
) {
return &cloudProviderAccountsSQLRepository{
store: store,
}, nil
}
type cloudProviderAccountsSQLRepository struct {
store sqlstore.SQLStore
}
func (r *cloudProviderAccountsSQLRepository) ListConnected(
func (r *cloudProviderAccountsSQLRepository) listConnected(
ctx context.Context, orgId string, cloudProvider string,
) ([]integrationtypes.CloudIntegration, error) {
accounts := []integrationtypes.CloudIntegration{}
) ([]types.CloudIntegration, *model.ApiError) {
accounts := []types.CloudIntegration{}
err := r.store.BunDB().NewSelect().
Model(&accounts).
@@ -64,17 +62,18 @@ func (r *cloudProviderAccountsSQLRepository) ListConnected(
Scan(ctx)
if err != nil {
slog.ErrorContext(ctx, "error querying connected cloud accounts", "error", err)
return nil, errors.WrapInternalf(err, errors.CodeInternal, "could not query connected cloud accounts")
return nil, model.InternalError(fmt.Errorf(
"could not query connected cloud accounts: %w", err,
))
}
return accounts, nil
}
func (r *cloudProviderAccountsSQLRepository) Get(
func (r *cloudProviderAccountsSQLRepository) get(
ctx context.Context, orgId string, provider string, id string,
) (*integrationtypes.CloudIntegration, error) {
var result integrationtypes.CloudIntegration
) (*types.CloudIntegration, *model.ApiError) {
var result types.CloudIntegration
err := r.store.BunDB().NewSelect().
Model(&result).
@@ -83,25 +82,23 @@ func (r *cloudProviderAccountsSQLRepository) Get(
Where("id = ?", id).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errors.WrapNotFoundf(
err,
CodeCloudIntegrationAccountNotFound,
"couldn't find account with Id %s", id,
)
}
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud provider account")
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find account with Id %s", id,
))
} else if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud provider accounts: %w", err,
))
}
return &result, nil
}
func (r *cloudProviderAccountsSQLRepository) GetConnectedCloudAccount(
func (r *cloudProviderAccountsSQLRepository) getConnectedCloudAccount(
ctx context.Context, orgId string, provider string, accountId string,
) (*integrationtypes.CloudIntegration, error) {
var result integrationtypes.CloudIntegration
) (*types.CloudIntegration, *model.ApiError) {
var result types.CloudIntegration
err := r.store.BunDB().NewSelect().
Model(&result).
@@ -112,25 +109,29 @@ func (r *cloudProviderAccountsSQLRepository) GetConnectedCloudAccount(
Where("removed_at is NULL").
Scan(ctx)
if errors.Is(err, sql.ErrNoRows) {
return nil, errors.WrapNotFoundf(err, CodeCloudIntegrationAccountNotFound, "couldn't find connected cloud account %s", accountId)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find connected cloud account %s", accountId,
))
} else if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud provider account")
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud provider accounts: %w", err,
))
}
return &result, nil
}
func (r *cloudProviderAccountsSQLRepository) Upsert(
func (r *cloudProviderAccountsSQLRepository) upsert(
ctx context.Context,
orgId string,
provider string,
id *string,
config []byte,
config *types.AccountConfig,
accountId *string,
agentReport *integrationtypes.AgentReport,
agentReport *types.AgentReport,
removedAt *time.Time,
) (*integrationtypes.CloudIntegration, error) {
) (*types.CloudIntegration, *model.ApiError) {
// Insert
if id == nil {
temp := valuer.GenerateUUID().StringValue()
@@ -180,7 +181,7 @@ func (r *cloudProviderAccountsSQLRepository) Upsert(
)
}
integration := integrationtypes.CloudIntegration{
integration := types.CloudIntegration{
OrgID: orgId,
Provider: provider,
Identifiable: types.Identifiable{ID: valuer.MustNewUUID(*id)},
@@ -188,25 +189,28 @@ func (r *cloudProviderAccountsSQLRepository) Upsert(
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
Config: string(config),
Config: config,
AccountID: accountId,
LastAgentReport: agentReport,
RemovedAt: removedAt,
}
_, err := r.store.BunDB().NewInsert().
_, dbErr := r.store.BunDB().NewInsert().
Model(&integration).
On(onConflictClause).
Exec(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't upsert cloud integration account")
if dbErr != nil {
return nil, model.InternalError(fmt.Errorf(
"could not upsert cloud account record: %w", dbErr,
))
}
upsertedAccount, err := r.Get(ctx, orgId, provider, *id)
if err != nil {
slog.ErrorContext(ctx, "error upserting cloud integration account", "error", err)
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't get upserted cloud integration account")
upsertedAccount, apiErr := r.get(ctx, orgId, provider, *id)
if apiErr != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't fetch upserted account by id: %w", apiErr.ToError(),
))
}
return upsertedAccount, nil

View File

@@ -1,571 +0,0 @@
package baseprovider
import (
"context"
"fmt"
"log/slog"
"sort"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/store"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
CodeDashboardNotFound = errors.MustNewCode("dashboard_not_found")
)
// hasValidTimeSeriesData checks if a query response contains valid time series data
// with at least one aggregation, series, and value
func hasValidTimeSeriesData(queryResponse *qbtypes.TimeSeriesData) bool {
return queryResponse != nil &&
len(queryResponse.Aggregations) > 0 &&
len(queryResponse.Aggregations[0].Series) > 0 &&
len(queryResponse.Aggregations[0].Series[0].Values) > 0
}
type BaseCloudProvider[def integrationtypes.Definition, conf integrationtypes.ServiceConfigTyped[def]] struct {
Logger *slog.Logger
Querier querier.Querier
AccountsRepo store.CloudProviderAccountsRepository
ServiceConfigRepo store.ServiceConfigDatabase
ServiceDefinitions *services.ServicesProvider[def]
ProviderType integrationtypes.CloudProviderType
}
func (b *BaseCloudProvider[def, conf]) GetName() integrationtypes.CloudProviderType {
return b.ProviderType
}
// AgentCheckIn is a helper function that handles common agent check-in logic.
// The getAgentConfigFunc should return the provider-specific agent configuration.
func AgentCheckIn[def integrationtypes.Definition, conf integrationtypes.ServiceConfigTyped[def], AgentConfigT any](
b *BaseCloudProvider[def, conf],
ctx context.Context,
req *integrationtypes.PostableAgentCheckInPayload,
getAgentConfigFunc func(context.Context, *integrationtypes.CloudIntegration) (*AgentConfigT, error),
) (*integrationtypes.GettableAgentCheckInRes[AgentConfigT], error) {
// agent can't check in unless the account is already created
existingAccount, err := b.AccountsRepo.Get(ctx, req.OrgID, b.GetName().String(), req.ID)
if err != nil {
return nil, err
}
if existingAccount != nil && existingAccount.AccountID != nil && *existingAccount.AccountID != req.AccountID {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
"can't check in with new %s account id %s for account %s with existing %s id %s",
b.GetName().String(), req.AccountID, existingAccount.ID.StringValue(), b.GetName().String(),
*existingAccount.AccountID,
)
}
existingAccount, err = b.AccountsRepo.GetConnectedCloudAccount(ctx, req.OrgID, b.GetName().String(), req.AccountID)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
return nil, err
}
if existingAccount != nil && existingAccount.ID.StringValue() != req.ID {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
"can't check in to %s account %s with id %s. already connected with id %s",
b.GetName().String(), req.AccountID, req.ID, existingAccount.ID.StringValue(),
)
}
agentReport := integrationtypes.AgentReport{
TimestampMillis: time.Now().UnixMilli(),
Data: req.Data,
}
account, err := b.AccountsRepo.Upsert(
ctx, req.OrgID, b.GetName().String(), &req.ID, nil, &req.AccountID, &agentReport, nil,
)
if err != nil {
return nil, err
}
agentConfig, err := getAgentConfigFunc(ctx, account)
if err != nil {
return nil, err
}
return &integrationtypes.GettableAgentCheckInRes[AgentConfigT]{
AccountId: account.ID.StringValue(),
CloudAccountId: *account.AccountID,
RemovedAt: account.RemovedAt,
IntegrationConfig: *agentConfig,
}, nil
}
func (b *BaseCloudProvider[def, conf]) GetAccountStatus(ctx context.Context, orgID, accountID string) (*integrationtypes.GettableAccountStatus, error) {
accountRecord, err := b.AccountsRepo.Get(ctx, orgID, b.ProviderType.String(), accountID)
if err != nil {
return nil, err
}
return &integrationtypes.GettableAccountStatus{
Id: accountRecord.ID.String(),
CloudAccountId: accountRecord.AccountID,
Status: accountRecord.Status(),
}, nil
}
func (b *BaseCloudProvider[def, conf]) ListConnectedAccounts(ctx context.Context, orgID string) (*integrationtypes.GettableConnectedAccountsList, error) {
accountRecords, err := b.AccountsRepo.ListConnected(ctx, orgID, b.ProviderType.String())
if err != nil {
return nil, err
}
connectedAccounts := make([]*integrationtypes.Account, 0, len(accountRecords))
for _, r := range accountRecords {
connectedAccounts = append(connectedAccounts, r.Account(b.ProviderType))
}
return &integrationtypes.GettableConnectedAccountsList{
Accounts: connectedAccounts,
}, nil
}
func (b *BaseCloudProvider[def, conf]) DisconnectAccount(ctx context.Context, orgID, accountID string) (*integrationtypes.CloudIntegration, error) {
account, err := b.AccountsRepo.Get(ctx, orgID, b.ProviderType.String(), accountID)
if err != nil {
return nil, err
}
tsNow := time.Now()
account, err = b.AccountsRepo.Upsert(
ctx, orgID, b.ProviderType.String(), &accountID, nil, nil, nil, &tsNow,
)
if err != nil {
return nil, err
}
return account, nil
}
func (b *BaseCloudProvider[def, conf]) GetDashboard(ctx context.Context, id string, orgID valuer.UUID) (*dashboardtypes.Dashboard, error) {
allDashboards, err := b.GetAvailableDashboards(ctx, orgID)
if err != nil {
return nil, err
}
for _, d := range allDashboards {
if d.ID == id {
return d, nil
}
}
return nil, errors.NewNotFoundf(CodeDashboardNotFound, "dashboard with id %s not found", id)
}
func (b *BaseCloudProvider[def, conf]) GetServiceConnectionStatus(
ctx context.Context,
cloudAccountID string,
orgID valuer.UUID,
definition def,
isMetricsEnabled bool,
isLogsEnabled bool,
) (*integrationtypes.ServiceConnectionStatus, error) {
ingestionStatusCheck := definition.GetIngestionStatusCheck()
if ingestionStatusCheck == nil {
return nil, nil
}
resp := new(integrationtypes.ServiceConnectionStatus)
wg := sync.WaitGroup{}
if len(ingestionStatusCheck.Metrics) > 0 && isMetricsEnabled {
wg.Add(1)
go func() {
defer utils.RecoverPanic(func(err interface{}, stack []byte) {
b.Logger.ErrorContext(
ctx, "panic while getting service metrics connection status",
"service", definition.GetId(),
"error", err,
"stack", string(stack),
)
})
defer wg.Done()
status, _ := b.getServiceMetricsConnectionStatus(ctx, cloudAccountID, orgID, definition)
resp.Metrics = status
}()
}
if len(ingestionStatusCheck.Logs) > 0 && isLogsEnabled {
wg.Add(1)
go func() {
defer utils.RecoverPanic(func(err interface{}, stack []byte) {
b.Logger.ErrorContext(
ctx, "panic while getting service logs connection status",
"service", definition.GetId(),
"error", err,
"stack", string(stack),
)
})
defer wg.Done()
status, _ := b.getServiceLogsConnectionStatus(ctx, cloudAccountID, orgID, definition)
resp.Logs = status
}()
}
wg.Wait()
return resp, nil
}
func (b *BaseCloudProvider[def, conf]) getServiceMetricsConnectionStatus(
ctx context.Context,
cloudAccountID string,
orgID valuer.UUID,
definition def,
) ([]*integrationtypes.SignalConnectionStatus, error) {
ingestionStatusCheck := definition.GetIngestionStatusCheck()
if ingestionStatusCheck == nil || len(ingestionStatusCheck.Metrics) < 1 {
return nil, nil
}
statusResp := make([]*integrationtypes.SignalConnectionStatus, 0)
for _, metric := range ingestionStatusCheck.Metrics {
statusResp = append(statusResp, &integrationtypes.SignalConnectionStatus{
CategoryID: metric.Category,
CategoryDisplayName: metric.DisplayName,
})
}
for index, category := range ingestionStatusCheck.Metrics {
queries := make([]qbtypes.QueryEnvelope, 0)
for _, check := range category.Checks {
// TODO: make sure all the cloud providers send these two attributes
// or create map of provider specific filter expression
filterExpression := fmt.Sprintf(`cloud.provider="%s" AND cloud.account.id="%s"`, b.ProviderType.String(), cloudAccountID)
f := ""
for _, attribute := range check.Attributes {
f = fmt.Sprintf("%s %s", attribute.Name, attribute.Operator)
if attribute.Value != "" {
f = fmt.Sprintf("%s '%s'", f, attribute.Value)
}
filterExpression = fmt.Sprintf("%s AND %s", filterExpression, f)
}
queries = append(queries, qbtypes.QueryEnvelope{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: valuer.GenerateUUID().String(),
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{{
MetricName: check.Key,
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
}},
Filter: &qbtypes.Filter{
Expression: filterExpression,
},
},
})
}
resp, err := b.Querier.QueryRange(ctx, orgID, &qbtypes.QueryRangeRequest{
SchemaVersion: "v5",
Start: uint64(time.Now().Add(-time.Hour).UnixMilli()),
End: uint64(time.Now().UnixMilli()),
RequestType: qbtypes.RequestTypeScalar,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
})
if err != nil {
b.Logger.DebugContext(ctx,
"error querying for service metrics connection status",
"error", err,
"service", definition.GetId(),
)
continue
}
if resp != nil && len(resp.Data.Results) < 1 {
continue
}
queryResponse, ok := resp.Data.Results[0].(*qbtypes.TimeSeriesData)
if !ok {
b.Logger.ErrorContext(ctx, "unexpected query response type for service metrics connection status",
"service", definition.GetId(),
)
continue
}
if !hasValidTimeSeriesData(queryResponse) {
continue
}
statusResp[index] = &integrationtypes.SignalConnectionStatus{
CategoryID: category.Category,
CategoryDisplayName: category.DisplayName,
LastReceivedTsMillis: queryResponse.Aggregations[0].Series[0].Values[0].Timestamp,
LastReceivedFrom: fmt.Sprintf("signoz-%s-integration", b.ProviderType.String()),
}
}
return statusResp, nil
}
func (b *BaseCloudProvider[def, conf]) getServiceLogsConnectionStatus(
ctx context.Context,
cloudAccountID string,
orgID valuer.UUID,
definition def,
) ([]*integrationtypes.SignalConnectionStatus, error) {
ingestionStatusCheck := definition.GetIngestionStatusCheck()
if ingestionStatusCheck == nil || len(ingestionStatusCheck.Logs) < 1 {
return nil, nil
}
statusResp := make([]*integrationtypes.SignalConnectionStatus, 0)
for _, log := range ingestionStatusCheck.Logs {
statusResp = append(statusResp, &integrationtypes.SignalConnectionStatus{
CategoryID: log.Category,
CategoryDisplayName: log.DisplayName,
})
}
for index, category := range ingestionStatusCheck.Logs {
queries := make([]qbtypes.QueryEnvelope, 0)
for _, check := range category.Checks {
// TODO: make sure all the cloud providers provide required attributes for logs
// or create map of provider specific filter expression
filterExpression := fmt.Sprintf(`cloud.account.id="%s"`, cloudAccountID)
f := ""
for _, attribute := range check.Attributes {
f = fmt.Sprintf("%s %s", attribute.Name, attribute.Operator)
if attribute.Value != "" {
f = fmt.Sprintf("%s '%s'", f, attribute.Value)
}
filterExpression = fmt.Sprintf("%s AND %s", filterExpression, f)
}
queries = append(queries, qbtypes.QueryEnvelope{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Name: valuer.GenerateUUID().String(),
Signal: telemetrytypes.SignalLogs,
Aggregations: []qbtypes.LogAggregation{{
Expression: "count()",
}},
Filter: &qbtypes.Filter{
Expression: filterExpression,
},
Limit: 10,
Offset: 0,
},
})
}
resp, err := b.Querier.QueryRange(ctx, orgID, &qbtypes.QueryRangeRequest{
SchemaVersion: "v1",
Start: uint64(time.Now().Add(-time.Hour * 1).UnixMilli()),
End: uint64(time.Now().UnixMilli()),
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
})
if err != nil {
b.Logger.DebugContext(ctx,
"error querying for service logs connection status",
"error", err,
"service", definition.GetId(),
)
continue
}
if resp != nil && len(resp.Data.Results) < 1 {
continue
}
queryResponse, ok := resp.Data.Results[0].(*qbtypes.TimeSeriesData)
if !ok {
b.Logger.ErrorContext(ctx, "unexpected query response type for service logs connection status",
"service", definition.GetId(),
)
continue
}
if !hasValidTimeSeriesData(queryResponse) {
continue
}
statusResp[index] = &integrationtypes.SignalConnectionStatus{
CategoryID: category.Category,
CategoryDisplayName: category.DisplayName,
LastReceivedTsMillis: queryResponse.Aggregations[0].Series[0].Values[0].Timestamp,
LastReceivedFrom: fmt.Sprintf("signoz-%s-integration", b.ProviderType.String()),
}
}
return statusResp, nil
}
func (b *BaseCloudProvider[def, conf]) GetAvailableDashboards(
ctx context.Context,
orgID valuer.UUID,
) ([]*dashboardtypes.Dashboard, error) {
accountRecords, err := b.AccountsRepo.ListConnected(ctx, orgID.StringValue(), b.ProviderType.String())
if err != nil {
return nil, err
}
servicesWithAvailableMetrics := map[string]*time.Time{}
for _, ar := range accountRecords {
if ar.AccountID != nil {
configsBySvcId, err := b.ServiceConfigRepo.GetAllForAccount(ctx, orgID.StringValue(), ar.ID.StringValue())
if err != nil {
return nil, err
}
for svcId, config := range configsBySvcId {
var serviceConfig conf
err = integrationtypes.UnmarshalJSON(config, &serviceConfig)
if err != nil {
return nil, err
}
if serviceConfig.IsMetricsEnabled() {
servicesWithAvailableMetrics[svcId] = &ar.CreatedAt
}
}
}
}
svcDashboards := make([]*dashboardtypes.Dashboard, 0)
allServices, err := b.ServiceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to list %s service definitions", b.ProviderType.String())
}
// accumulate definitions in a fixed order to ensure same order of dashboards across runs
svcIds := make([]string, 0, len(allServices))
for id := range allServices {
svcIds = append(svcIds, id)
}
sort.Strings(svcIds)
for _, svcId := range svcIds {
svc := allServices[svcId]
serviceDashboardsCreatedAt, ok := servicesWithAvailableMetrics[svcId]
if ok && serviceDashboardsCreatedAt != nil {
svcDashboards = append(
svcDashboards,
integrationtypes.GetDashboardsFromAssets(svc.GetId(), orgID, b.ProviderType, serviceDashboardsCreatedAt, svc.GetAssets())...,
)
servicesWithAvailableMetrics[svcId] = nil
}
}
return svcDashboards, nil
}
func (b *BaseCloudProvider[def, conf]) GetServiceConfig(
ctx context.Context,
definition def,
orgID valuer.UUID,
serviceId,
cloudAccountId string,
) (conf, error) {
var zero conf
activeAccount, err := b.AccountsRepo.GetConnectedCloudAccount(ctx, orgID.String(), b.ProviderType.String(), cloudAccountId)
if err != nil {
return zero, err
}
config, err := b.ServiceConfigRepo.Get(ctx, orgID.String(), activeAccount.ID.StringValue(), serviceId)
if err != nil {
if errors.Ast(err, errors.TypeNotFound) {
return zero, nil
}
return zero, err
}
var serviceConfig conf
err = integrationtypes.UnmarshalJSON(config, &serviceConfig)
if err != nil {
return zero, err
}
if config != nil && serviceConfig.IsMetricsEnabled() {
definition.PopulateDashboardURLs(b.ProviderType, serviceId)
}
return serviceConfig, nil
}
func (b *BaseCloudProvider[def, conf]) UpdateServiceConfig(ctx context.Context, serviceId string, orgID valuer.UUID, config []byte) (any, error) {
definition, err := b.ServiceDefinitions.GetServiceDefinition(ctx, serviceId)
if err != nil {
return nil, err
}
var updateReq integrationtypes.UpdatableServiceConfig[conf]
err = integrationtypes.UnmarshalJSON(config, &updateReq)
if err != nil {
return nil, err
}
// Check if config is provided (use any type assertion for nil check with generics)
if any(updateReq.Config) == nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "config is required")
}
if err = updateReq.Config.Validate(definition); err != nil {
return nil, err
}
// can only update config for a connected cloud account id
_, err = b.AccountsRepo.GetConnectedCloudAccount(
ctx, orgID.String(), b.GetName().String(), updateReq.CloudAccountId,
)
if err != nil {
return nil, err
}
serviceConfigBytes, err := integrationtypes.MarshalJSON(&updateReq.Config)
if err != nil {
return nil, err
}
updatedConfigBytes, err := b.ServiceConfigRepo.Upsert(
ctx, orgID.String(), b.GetName().String(), updateReq.CloudAccountId, serviceId, serviceConfigBytes,
)
if err != nil {
return nil, err
}
var updatedConfig conf
err = integrationtypes.UnmarshalJSON(updatedConfigBytes, &updatedConfig)
if err != nil {
return nil, err
}
return &integrationtypes.UpdatableServiceConfigRes{
ServiceId: serviceId,
Config: updatedConfig,
}, nil
}

View File

@@ -0,0 +1,43 @@
package cloudintegrations
import (
"github.com/SigNoz/signoz/pkg/errors"
)
var (
CodeInvalidCloudRegion = errors.MustNewCode("invalid_cloud_region")
CodeMismatchCloudProvider = errors.MustNewCode("cloud_provider_mismatch")
)
// List of all valid cloud regions on Amazon Web Services
var ValidAWSRegions = map[string]bool{
"af-south-1": true, // Africa (Cape Town).
"ap-east-1": true, // Asia Pacific (Hong Kong).
"ap-northeast-1": true, // Asia Pacific (Tokyo).
"ap-northeast-2": true, // Asia Pacific (Seoul).
"ap-northeast-3": true, // Asia Pacific (Osaka).
"ap-south-1": true, // Asia Pacific (Mumbai).
"ap-south-2": true, // Asia Pacific (Hyderabad).
"ap-southeast-1": true, // Asia Pacific (Singapore).
"ap-southeast-2": true, // Asia Pacific (Sydney).
"ap-southeast-3": true, // Asia Pacific (Jakarta).
"ap-southeast-4": true, // Asia Pacific (Melbourne).
"ca-central-1": true, // Canada (Central).
"ca-west-1": true, // Canada West (Calgary).
"eu-central-1": true, // Europe (Frankfurt).
"eu-central-2": true, // Europe (Zurich).
"eu-north-1": true, // Europe (Stockholm).
"eu-south-1": true, // Europe (Milan).
"eu-south-2": true, // Europe (Spain).
"eu-west-1": true, // Europe (Ireland).
"eu-west-2": true, // Europe (London).
"eu-west-3": true, // Europe (Paris).
"il-central-1": true, // Israel (Tel Aviv).
"me-central-1": true, // Middle East (UAE).
"me-south-1": true, // Middle East (Bahrain).
"sa-east-1": true, // South America (Sao Paulo).
"us-east-1": true, // US East (N. Virginia).
"us-east-2": true, // US East (Ohio).
"us-west-1": true, // US West (N. California).
"us-west-2": true, // US West (Oregon).
}

View File

@@ -0,0 +1,624 @@
package cloudintegrations
import (
"context"
"fmt"
"net/url"
"slices"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"golang.org/x/exp/maps"
)
var SupportedCloudProviders = []string{
"aws",
}
func validateCloudProviderName(name string) *model.ApiError {
if !slices.Contains(SupportedCloudProviders, name) {
return model.BadRequest(fmt.Errorf("invalid cloud provider: %s", name))
}
return nil
}
type Controller struct {
accountsRepo cloudProviderAccountsRepository
serviceConfigRepo ServiceConfigDatabase
}
func NewController(sqlStore sqlstore.SQLStore) (*Controller, error) {
accountsRepo, err := newCloudProviderAccountsRepository(sqlStore)
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider accounts repo: %w", err)
}
serviceConfigRepo, err := newServiceConfigRepository(sqlStore)
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider service config repo: %w", err)
}
return &Controller{
accountsRepo: accountsRepo,
serviceConfigRepo: serviceConfigRepo,
}, nil
}
type ConnectedAccountsListResponse struct {
Accounts []types.Account `json:"accounts"`
}
func (c *Controller) ListConnectedAccounts(ctx context.Context, orgId string, cloudProvider string) (
*ConnectedAccountsListResponse, *model.ApiError,
) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
accountRecords, apiErr := c.accountsRepo.listConnected(ctx, orgId, cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud accounts")
}
connectedAccounts := []types.Account{}
for _, a := range accountRecords {
connectedAccounts = append(connectedAccounts, a.Account())
}
return &ConnectedAccountsListResponse{
Accounts: connectedAccounts,
}, nil
}
type GenerateConnectionUrlRequest struct {
// Optional. To be specified for updates.
AccountId *string `json:"account_id,omitempty"`
AccountConfig types.AccountConfig `json:"account_config"`
AgentConfig SigNozAgentConfig `json:"agent_config"`
}
type SigNozAgentConfig struct {
// The region in which SigNoz agent should be installed.
Region string `json:"region"`
IngestionUrl string `json:"ingestion_url"`
IngestionKey string `json:"ingestion_key"`
SigNozAPIUrl string `json:"signoz_api_url"`
SigNozAPIKey string `json:"signoz_api_key"`
Version string `json:"version,omitempty"`
}
type GenerateConnectionUrlResponse struct {
AccountId string `json:"account_id"`
ConnectionUrl string `json:"connection_url"`
}
func (c *Controller) GenerateConnectionUrl(ctx context.Context, orgId string, cloudProvider string, req GenerateConnectionUrlRequest) (*GenerateConnectionUrlResponse, *model.ApiError) {
// Account connection with a simple connection URL may not be available for all providers.
if cloudProvider != "aws" {
return nil, model.BadRequest(fmt.Errorf("unsupported cloud provider: %s", cloudProvider))
}
account, apiErr := c.accountsRepo.upsert(
ctx, orgId, cloudProvider, req.AccountId, &req.AccountConfig, nil, nil, nil,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
agentVersion := "v0.0.8"
if req.AgentConfig.Version != "" {
agentVersion = req.AgentConfig.Version
}
connectionUrl := fmt.Sprintf(
"https://%s.console.aws.amazon.com/cloudformation/home?region=%s#/stacks/quickcreate?",
req.AgentConfig.Region, req.AgentConfig.Region,
)
for qp, value := range map[string]string{
"param_SigNozIntegrationAgentVersion": agentVersion,
"param_SigNozApiUrl": req.AgentConfig.SigNozAPIUrl,
"param_SigNozApiKey": req.AgentConfig.SigNozAPIKey,
"param_SigNozAccountId": account.ID.StringValue(),
"param_IngestionUrl": req.AgentConfig.IngestionUrl,
"param_IngestionKey": req.AgentConfig.IngestionKey,
"stackName": "signoz-integration",
"templateURL": fmt.Sprintf(
"https://signoz-integrations.s3.us-east-1.amazonaws.com/aws-quickcreate-template-%s.json",
agentVersion,
),
} {
connectionUrl += fmt.Sprintf("&%s=%s", qp, url.QueryEscape(value))
}
return &GenerateConnectionUrlResponse{
AccountId: account.ID.StringValue(),
ConnectionUrl: connectionUrl,
}, nil
}
type AccountStatusResponse struct {
Id string `json:"id"`
CloudAccountId *string `json:"cloud_account_id,omitempty"`
Status types.AccountStatus `json:"status"`
}
func (c *Controller) GetAccountStatus(ctx context.Context, orgId string, cloudProvider string, accountId string) (
*AccountStatusResponse, *model.ApiError,
) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
account, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, accountId)
if apiErr != nil {
return nil, apiErr
}
resp := AccountStatusResponse{
Id: account.ID.StringValue(),
CloudAccountId: account.AccountID,
Status: account.Status(),
}
return &resp, nil
}
type AgentCheckInRequest struct {
ID string `json:"account_id"`
AccountID string `json:"cloud_account_id"`
// Arbitrary cloud specific Agent data
Data map[string]any `json:"data,omitempty"`
}
type AgentCheckInResponse struct {
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
RemovedAt *time.Time `json:"removed_at"`
IntegrationConfig IntegrationConfigForAgent `json:"integration_config"`
}
type IntegrationConfigForAgent struct {
EnabledRegions []string `json:"enabled_regions"`
TelemetryCollectionStrategy *CompiledCollectionStrategy `json:"telemetry,omitempty"`
}
func (c *Controller) CheckInAsAgent(ctx context.Context, orgId string, cloudProvider string, req AgentCheckInRequest) (*AgentCheckInResponse, error) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
existingAccount, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, req.ID)
if existingAccount != nil && existingAccount.AccountID != nil && *existingAccount.AccountID != req.AccountID {
return nil, model.BadRequest(fmt.Errorf(
"can't check in with new %s account id %s for account %s with existing %s id %s",
cloudProvider, req.AccountID, existingAccount.ID.StringValue(), cloudProvider, *existingAccount.AccountID,
))
}
existingAccount, apiErr = c.accountsRepo.getConnectedCloudAccount(ctx, orgId, cloudProvider, req.AccountID)
if existingAccount != nil && existingAccount.ID.StringValue() != req.ID {
return nil, model.BadRequest(fmt.Errorf(
"can't check in to %s account %s with id %s. already connected with id %s",
cloudProvider, req.AccountID, req.ID, existingAccount.ID.StringValue(),
))
}
agentReport := types.AgentReport{
TimestampMillis: time.Now().UnixMilli(),
Data: req.Data,
}
account, apiErr := c.accountsRepo.upsert(
ctx, orgId, cloudProvider, &req.ID, nil, &req.AccountID, &agentReport, nil,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
// prepare and return integration config to be consumed by agent
compiledStrategy, err := NewCompiledCollectionStrategy(cloudProvider)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't init telemetry collection strategy: %w", err,
))
}
agentConfig := IntegrationConfigForAgent{
EnabledRegions: []string{},
TelemetryCollectionStrategy: compiledStrategy,
}
if account.Config != nil && account.Config.EnabledRegions != nil {
agentConfig.EnabledRegions = account.Config.EnabledRegions
}
services, err := services.Map(cloudProvider)
if err != nil {
return nil, err
}
svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, orgId, account.ID.StringValue(),
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "couldn't get service configs for cloud account",
)
}
// accumulate config in a fixed order to ensure same config generated across runs
configuredServices := maps.Keys(svcConfigs)
slices.Sort(configuredServices)
for _, svcType := range configuredServices {
definition, ok := services[svcType]
if !ok {
continue
}
config := svcConfigs[svcType]
err := AddServiceStrategy(svcType, compiledStrategy, definition.Strategy, config)
if err != nil {
return nil, err
}
}
return &AgentCheckInResponse{
AccountId: account.ID.StringValue(),
CloudAccountId: *account.AccountID,
RemovedAt: account.RemovedAt,
IntegrationConfig: agentConfig,
}, nil
}
type UpdateAccountConfigRequest struct {
Config types.AccountConfig `json:"config"`
}
func (c *Controller) UpdateAccountConfig(ctx context.Context, orgId string, cloudProvider string, accountId string, req UpdateAccountConfigRequest) (*types.Account, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
accountRecord, apiErr := c.accountsRepo.upsert(
ctx, orgId, cloudProvider, &accountId, &req.Config, nil, nil, nil,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
account := accountRecord.Account()
return &account, nil
}
func (c *Controller) DisconnectAccount(ctx context.Context, orgId string, cloudProvider string, accountId string) (*types.CloudIntegration, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
account, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, accountId)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't disconnect account")
}
tsNow := time.Now()
account, apiErr = c.accountsRepo.upsert(
ctx, orgId, cloudProvider, &accountId, nil, nil, nil, &tsNow,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't disconnect account")
}
return account, nil
}
type ListServicesResponse struct {
Services []ServiceSummary `json:"services"`
}
func (c *Controller) ListServices(
ctx context.Context,
orgID string,
cloudProvider string,
cloudAccountId *string,
) (*ListServicesResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
definitions, apiErr := services.List(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcConfigs := map[string]*types.CloudServiceConfig{}
if cloudAccountId != nil {
activeAccount, apiErr := c.accountsRepo.getConnectedCloudAccount(
ctx, orgID, cloudProvider, *cloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't get active account")
}
svcConfigs, apiErr = c.serviceConfigRepo.getAllForAccount(
ctx, orgID, activeAccount.ID.StringValue(),
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "couldn't get service configs for cloud account",
)
}
}
summaries := []ServiceSummary{}
for _, def := range definitions {
summary := ServiceSummary{
Metadata: def.Metadata,
}
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
}
return &ListServicesResponse{
Services: summaries,
}, nil
}
func (c *Controller) GetServiceDetails(
ctx context.Context,
orgID string,
cloudProvider string,
serviceId string,
cloudAccountId *string,
) (*ServiceDetails, error) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
definition, err := services.GetServiceDefinition(cloudProvider, serviceId)
if err != nil {
return nil, err
}
details := ServiceDetails{
Definition: *definition,
}
if cloudAccountId != nil {
activeAccount, apiErr := c.accountsRepo.getConnectedCloudAccount(
ctx, orgID, cloudProvider, *cloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't get active account")
}
config, apiErr := c.serviceConfigRepo.get(
ctx, orgID, activeAccount.ID.StringValue(), serviceId,
)
if apiErr != nil && apiErr.Type() != model.ErrorNotFound {
return nil, model.WrapApiError(apiErr, "couldn't fetch service config")
}
if config != nil {
details.Config = config
enabled := false
if config.Metrics != nil && config.Metrics.Enabled {
enabled = true
}
// add links to service dashboards, making them clickable.
for i, d := range definition.Assets.Dashboards {
dashboardUuid := c.dashboardUuid(
cloudProvider, serviceId, d.Id,
)
if enabled {
definition.Assets.Dashboards[i].Url = fmt.Sprintf("/dashboard/%s", dashboardUuid)
} else {
definition.Assets.Dashboards[i].Url = "" // to unset the in-memory URL if enabled once and disabled afterwards
}
}
}
}
return &details, nil
}
type UpdateServiceConfigRequest struct {
CloudAccountId string `json:"cloud_account_id"`
Config types.CloudServiceConfig `json:"config"`
}
func (u *UpdateServiceConfigRequest) Validate(def *services.Definition) error {
if def.Id != services.S3Sync && u.Config.Logs != nil && u.Config.Logs.S3Buckets != nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "s3 buckets can only be added to service-type[%s]", services.S3Sync)
} else if def.Id == services.S3Sync && u.Config.Logs != nil && u.Config.Logs.S3Buckets != nil {
for region := range u.Config.Logs.S3Buckets {
if _, found := ValidAWSRegions[region]; !found {
return errors.NewInvalidInputf(CodeInvalidCloudRegion, "invalid cloud region: %s", region)
}
}
}
return nil
}
type UpdateServiceConfigResponse struct {
Id string `json:"id"`
Config types.CloudServiceConfig `json:"config"`
}
func (c *Controller) UpdateServiceConfig(
ctx context.Context,
orgID string,
cloudProvider string,
serviceType string,
req *UpdateServiceConfigRequest,
) (*UpdateServiceConfigResponse, error) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
// can only update config for a valid service.
definition, err := services.GetServiceDefinition(cloudProvider, serviceType)
if err != nil {
return nil, err
}
if err := req.Validate(definition); err != nil {
return nil, err
}
// can only update config for a connected cloud account id
_, apiErr := c.accountsRepo.getConnectedCloudAccount(
ctx, orgID, cloudProvider, req.CloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't find connected cloud account")
}
updatedConfig, apiErr := c.serviceConfigRepo.upsert(
ctx, orgID, cloudProvider, req.CloudAccountId, serviceType, req.Config,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't update service config")
}
return &UpdateServiceConfigResponse{
Id: serviceType,
Config: *updatedConfig,
}, nil
}
// All dashboards that are available based on cloud integrations configuration
// across all cloud providers
func (c *Controller) AvailableDashboards(ctx context.Context, orgId valuer.UUID) ([]*dashboardtypes.Dashboard, *model.ApiError) {
allDashboards := []*dashboardtypes.Dashboard{}
for _, provider := range []string{"aws"} {
providerDashboards, apiErr := c.AvailableDashboardsForCloudProvider(ctx, orgId, provider)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, fmt.Sprintf("couldn't get available dashboards for %s", provider),
)
}
allDashboards = append(allDashboards, providerDashboards...)
}
return allDashboards, nil
}
func (c *Controller) AvailableDashboardsForCloudProvider(ctx context.Context, orgID valuer.UUID, cloudProvider string) ([]*dashboardtypes.Dashboard, *model.ApiError) {
accountRecords, apiErr := c.accountsRepo.listConnected(ctx, orgID.StringValue(), cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list connected cloud accounts")
}
// for v0, service dashboards are only available when metrics are enabled.
servicesWithAvailableMetrics := map[string]*time.Time{}
for _, ar := range accountRecords {
if ar.AccountID != nil {
configsBySvcId, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, orgID.StringValue(), ar.ID.StringValue(),
)
if apiErr != nil {
return nil, apiErr
}
for svcId, config := range configsBySvcId {
if config.Metrics != nil && config.Metrics.Enabled {
servicesWithAvailableMetrics[svcId] = &ar.CreatedAt
}
}
}
}
allServices, apiErr := services.List(cloudProvider)
if apiErr != nil {
return nil, apiErr
}
svcDashboards := []*dashboardtypes.Dashboard{}
for _, svc := range allServices {
serviceDashboardsCreatedAt := servicesWithAvailableMetrics[svc.Id]
if serviceDashboardsCreatedAt != nil {
for _, d := range svc.Assets.Dashboards {
author := fmt.Sprintf("%s-integration", cloudProvider)
svcDashboards = append(svcDashboards, &dashboardtypes.Dashboard{
ID: c.dashboardUuid(cloudProvider, svc.Id, d.Id),
Locked: true,
Data: *d.Definition,
TimeAuditable: types.TimeAuditable{
CreatedAt: *serviceDashboardsCreatedAt,
UpdatedAt: *serviceDashboardsCreatedAt,
},
UserAuditable: types.UserAuditable{
CreatedBy: author,
UpdatedBy: author,
},
OrgID: orgID,
})
}
servicesWithAvailableMetrics[svc.Id] = nil
}
}
return svcDashboards, nil
}
func (c *Controller) GetDashboardById(ctx context.Context, orgId valuer.UUID, dashboardUuid string) (*dashboardtypes.Dashboard, *model.ApiError) {
cloudProvider, _, _, apiErr := c.parseDashboardUuid(dashboardUuid)
if apiErr != nil {
return nil, apiErr
}
allDashboards, apiErr := c.AvailableDashboardsForCloudProvider(ctx, orgId, cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list available dashboards")
}
for _, d := range allDashboards {
if d.ID == dashboardUuid {
return d, nil
}
}
return nil, model.NotFoundError(fmt.Errorf("couldn't find dashboard with uuid: %s", dashboardUuid))
}
func (c *Controller) dashboardUuid(
cloudProvider string, svcId string, dashboardId string,
) string {
return fmt.Sprintf("cloud-integration--%s--%s--%s", cloudProvider, svcId, dashboardId)
}
func (c *Controller) parseDashboardUuid(dashboardUuid string) (cloudProvider string, svcId string, dashboardId string, apiErr *model.ApiError) {
parts := strings.SplitN(dashboardUuid, "--", 4)
if len(parts) != 4 || parts[0] != "cloud-integration" {
return "", "", "", model.BadRequest(fmt.Errorf("invalid cloud integration dashboard id"))
}
return parts[1], parts[2], parts[3], nil
}
func (c *Controller) IsCloudIntegrationDashboardUuid(dashboardUuid string) bool {
_, _, _, apiErr := c.parseDashboardUuid(dashboardUuid)
return apiErr == nil
}

View File

@@ -1,331 +0,0 @@
package implawsprovider
import (
"context"
"fmt"
"log/slog"
"net/url"
"slices"
"golang.org/x/exp/maps"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/baseprovider"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
integrationstore "github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/store"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
CodeInvalidAWSRegion = errors.MustNewCode("invalid_aws_region")
)
type awsProvider struct {
baseprovider.BaseCloudProvider[*integrationtypes.AWSDefinition, *integrationtypes.AWSServiceConfig]
}
func NewAWSCloudProvider(
logger *slog.Logger,
accountsRepo integrationstore.CloudProviderAccountsRepository,
serviceConfigRepo integrationstore.ServiceConfigDatabase,
querier querier.Querier,
) (integrationtypes.CloudProvider, error) {
serviceDefinitions, err := services.NewAWSCloudProviderServices()
if err != nil {
return nil, err
}
return &awsProvider{
BaseCloudProvider: baseprovider.BaseCloudProvider[*integrationtypes.AWSDefinition, *integrationtypes.AWSServiceConfig]{
Logger: logger,
Querier: querier,
AccountsRepo: accountsRepo,
ServiceConfigRepo: serviceConfigRepo,
ServiceDefinitions: serviceDefinitions,
ProviderType: integrationtypes.CloudProviderAWS,
},
}, nil
}
func (a *awsProvider) AgentCheckIn(ctx context.Context, req *integrationtypes.PostableAgentCheckInPayload) (any, error) {
return baseprovider.AgentCheckIn(
&a.BaseCloudProvider,
ctx,
req,
a.getAWSAgentConfig,
)
}
func (a *awsProvider) getAWSAgentConfig(ctx context.Context, account *integrationtypes.CloudIntegration) (*integrationtypes.AWSAgentIntegrationConfig, error) {
// prepare and return integration config to be consumed by agent
agentConfig := &integrationtypes.AWSAgentIntegrationConfig{
EnabledRegions: []string{},
TelemetryCollectionStrategy: &integrationtypes.AWSCollectionStrategy{
Metrics: &integrationtypes.AWSMetricsStrategy{},
Logs: &integrationtypes.AWSLogsStrategy{},
S3Buckets: map[string][]string{},
},
}
accountConfig := new(integrationtypes.AWSAccountConfig)
err := integrationtypes.UnmarshalJSON([]byte(account.Config), accountConfig)
if err != nil {
return nil, err
}
if accountConfig.EnabledRegions != nil {
agentConfig.EnabledRegions = accountConfig.EnabledRegions
}
svcConfigs, err := a.ServiceConfigRepo.GetAllForAccount(
ctx, account.OrgID, account.ID.StringValue(),
)
if err != nil {
return nil, err
}
// accumulate config in a fixed order to ensure same config generated across runs
configuredServices := maps.Keys(svcConfigs)
slices.Sort(configuredServices)
for _, svcType := range configuredServices {
definition, err := a.ServiceDefinitions.GetServiceDefinition(ctx, svcType)
if err != nil {
continue
}
config := svcConfigs[svcType]
serviceConfig := new(integrationtypes.AWSServiceConfig)
err = integrationtypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
continue
}
if serviceConfig.IsLogsEnabled() {
if svcType == integrationtypes.S3Sync {
// S3 bucket sync; No cloudwatch logs are appended for this service type;
// Though definition is populated with a custom cloudwatch group that helps in calculating logs connection status
agentConfig.TelemetryCollectionStrategy.S3Buckets = serviceConfig.Logs.S3Buckets
} else if definition.Strategy.Logs != nil { // services that includes a logs subscription
agentConfig.TelemetryCollectionStrategy.Logs.Subscriptions = append(
agentConfig.TelemetryCollectionStrategy.Logs.Subscriptions,
definition.Strategy.Logs.Subscriptions...,
)
}
}
if serviceConfig.IsMetricsEnabled() && definition.Strategy.Metrics != nil {
agentConfig.TelemetryCollectionStrategy.Metrics.StreamFilters = append(
agentConfig.TelemetryCollectionStrategy.Metrics.StreamFilters,
definition.Strategy.Metrics.StreamFilters...,
)
}
}
return agentConfig, nil
}
func (a *awsProvider) ListServices(ctx context.Context, orgID string, cloudAccountID *string) (any, error) {
svcConfigs := make(map[string]*integrationtypes.AWSServiceConfig)
if cloudAccountID != nil {
activeAccount, err := a.AccountsRepo.GetConnectedCloudAccount(ctx, orgID, a.GetName().String(), *cloudAccountID)
if err != nil {
return nil, err
}
serviceConfigs, err := a.ServiceConfigRepo.GetAllForAccount(ctx, orgID, activeAccount.ID.String())
if err != nil {
return nil, err
}
for svcType, config := range serviceConfigs {
serviceConfig := new(integrationtypes.AWSServiceConfig)
err = integrationtypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
return nil, err
}
svcConfigs[svcType] = serviceConfig
}
}
summaries := make([]integrationtypes.AWSServiceSummary, 0)
definitions, err := a.ServiceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, err
}
for _, def := range definitions {
summary := integrationtypes.AWSServiceSummary{
DefinitionMetadata: def.DefinitionMetadata,
Config: nil,
}
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
}
slices.SortFunc(summaries, func(a, b integrationtypes.AWSServiceSummary) int {
if a.DefinitionMetadata.Title < b.DefinitionMetadata.Title {
return -1
}
if a.DefinitionMetadata.Title > b.DefinitionMetadata.Title {
return 1
}
return 0
})
return &integrationtypes.GettableAWSServices{
Services: summaries,
}, nil
}
func (a *awsProvider) GetServiceDetails(ctx context.Context, req *integrationtypes.GetServiceDetailsReq) (any, error) {
details := new(integrationtypes.GettableAWSServiceDetails)
awsDefinition, err := a.ServiceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
if err != nil {
return nil, err
}
details.Definition = *awsDefinition
if req.CloudAccountID == nil {
return details, nil
}
config, err := a.GetServiceConfig(ctx, awsDefinition, req.OrgID, req.ServiceId, *req.CloudAccountID)
if err != nil {
return nil, err
}
if config == nil {
return details, nil
}
details.Config = config
isMetricsEnabled := config.IsMetricsEnabled()
isLogsEnabled := config.IsLogsEnabled()
connectionStatus, err := a.GetServiceConnectionStatus(
ctx,
*req.CloudAccountID,
req.OrgID,
awsDefinition,
isMetricsEnabled,
isLogsEnabled,
)
if err != nil {
return nil, err
}
details.ConnectionStatus = connectionStatus
return details, nil
}
func (a *awsProvider) GenerateConnectionArtifact(ctx context.Context, req *integrationtypes.PostableConnectionArtifact) (any, error) {
connection := new(integrationtypes.PostableAWSConnectionUrl)
err := integrationtypes.UnmarshalJSON(req.Data, connection)
if err != nil {
return nil, err
}
if connection.AccountConfig != nil {
for _, region := range connection.AccountConfig.EnabledRegions {
if integrationtypes.ValidAWSRegions[region] {
continue
}
return nil, errors.NewInvalidInputf(CodeInvalidAWSRegion, "invalid aws region: %s", region)
}
}
config, err := integrationtypes.MarshalJSON(connection.AccountConfig)
if err != nil {
return nil, err
}
account, err := a.AccountsRepo.Upsert(
ctx, req.OrgID, integrationtypes.CloudProviderAWS.String(), nil, config,
nil, nil, nil,
)
if err != nil {
return nil, err
}
agentVersion := "v0.0.8"
if connection.AgentConfig.Version != "" {
agentVersion = connection.AgentConfig.Version
}
baseURL := fmt.Sprintf("https://%s.console.aws.amazon.com/cloudformation/home",
connection.AgentConfig.Region)
u, _ := url.Parse(baseURL)
q := u.Query()
q.Set("region", connection.AgentConfig.Region)
u.Fragment = "/stacks/quickcreate"
u.RawQuery = q.Encode()
q = u.Query()
q.Set("stackName", "signoz-integration")
q.Set("templateURL", fmt.Sprintf("https://signoz-integrations.s3.us-east-1.amazonaws.com/aws-quickcreate-template-%s.json", agentVersion))
q.Set("param_SigNozIntegrationAgentVersion", agentVersion)
q.Set("param_SigNozApiUrl", connection.AgentConfig.SigNozAPIUrl)
q.Set("param_SigNozApiKey", connection.AgentConfig.SigNozAPIKey)
q.Set("param_SigNozAccountId", account.ID.StringValue())
q.Set("param_IngestionUrl", connection.AgentConfig.IngestionUrl)
q.Set("param_IngestionKey", connection.AgentConfig.IngestionKey)
return &integrationtypes.GettableAWSConnectionUrl{
AccountId: account.ID.StringValue(),
ConnectionUrl: u.String() + "?&" + q.Encode(), // this format is required by AWS
}, nil
}
func (a *awsProvider) UpdateAccountConfig(ctx context.Context, orgId valuer.UUID, accountId string, configBytes []byte) (any, error) {
config := new(integrationtypes.UpdatableAWSAccountConfig)
err := integrationtypes.UnmarshalJSON(configBytes, config)
if err != nil {
return nil, err
}
if config.Config == nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "account config can't be null")
}
for _, region := range config.Config.EnabledRegions {
if integrationtypes.ValidAWSRegions[region] {
continue
}
return nil, errors.NewInvalidInputf(CodeInvalidAWSRegion, "invalid aws region: %s", region)
}
// account must exist to update config, but it doesn't need to be connected
_, err = a.AccountsRepo.Get(ctx, orgId.String(), a.GetName().String(), accountId)
if err != nil {
return nil, err
}
configBytes, err = integrationtypes.MarshalJSON(config.Config)
if err != nil {
return nil, err
}
accountRecord, err := a.AccountsRepo.Upsert(
ctx, orgId.String(), a.GetName().String(), &accountId, configBytes, nil, nil, nil,
)
if err != nil {
return nil, err
}
return accountRecord.Account(a.GetName()), nil
}

View File

@@ -1,368 +0,0 @@
package implazureprovider
import (
"context"
"fmt"
"log/slog"
"slices"
"strings"
"golang.org/x/exp/maps"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/baseprovider"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/store"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
CodeInvalidAzureRegion = errors.MustNewCode("invalid_azure_region")
)
type azureProvider struct {
baseprovider.BaseCloudProvider[*integrationtypes.AzureDefinition, *integrationtypes.AzureServiceConfig]
}
func NewAzureCloudProvider(
logger *slog.Logger,
accountsRepo store.CloudProviderAccountsRepository,
serviceConfigRepo store.ServiceConfigDatabase,
querier querier.Querier,
) (integrationtypes.CloudProvider, error) {
azureServiceDefinitions, err := services.NewAzureCloudProviderServices()
if err != nil {
return nil, err
}
return &azureProvider{
BaseCloudProvider: baseprovider.BaseCloudProvider[*integrationtypes.AzureDefinition, *integrationtypes.AzureServiceConfig]{
Logger: logger,
Querier: querier,
AccountsRepo: accountsRepo,
ServiceConfigRepo: serviceConfigRepo,
ServiceDefinitions: azureServiceDefinitions,
ProviderType: integrationtypes.CloudProviderAzure,
},
}, nil
}
func (a *azureProvider) AgentCheckIn(ctx context.Context, req *integrationtypes.PostableAgentCheckInPayload) (any, error) {
return baseprovider.AgentCheckIn(
&a.BaseCloudProvider,
ctx,
req,
a.getAzureAgentConfig,
)
}
func (a *azureProvider) getAzureAgentConfig(ctx context.Context, account *integrationtypes.CloudIntegration) (*integrationtypes.AzureAgentIntegrationConfig, error) {
// prepare and return integration config to be consumed by agent
agentConfig := &integrationtypes.AzureAgentIntegrationConfig{
TelemetryCollectionStrategy: make(map[string]*integrationtypes.AzureCollectionStrategy),
}
accountConfig := new(integrationtypes.AzureAccountConfig)
err := integrationtypes.UnmarshalJSON([]byte(account.Config), accountConfig)
if err != nil {
return nil, err
}
if account.Config != "" {
agentConfig.DeploymentRegion = accountConfig.DeploymentRegion
agentConfig.EnabledResourceGroups = accountConfig.EnabledResourceGroups
}
svcConfigs, err := a.ServiceConfigRepo.GetAllForAccount(
ctx, account.OrgID, account.ID.StringValue(),
)
if err != nil {
return nil, err
}
// accumulate config in a fixed order to ensure same config generated across runs
configuredServices := maps.Keys(svcConfigs)
slices.Sort(configuredServices)
for _, svcType := range configuredServices {
definition, err := a.ServiceDefinitions.GetServiceDefinition(ctx, svcType)
if err != nil {
continue
}
config := svcConfigs[svcType]
serviceConfig := new(integrationtypes.AzureServiceConfig)
err = integrationtypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
continue
}
metrics := make([]*integrationtypes.AzureMetricsStrategy, 0)
logs := make([]*integrationtypes.AzureLogsStrategy, 0)
metricsStrategyMap := make(map[string]*integrationtypes.AzureMetricsStrategy)
logsStrategyMap := make(map[string]*integrationtypes.AzureLogsStrategy)
if definition.Strategy != nil && definition.Strategy.Metrics != nil {
for _, metric := range definition.Strategy.Metrics {
metricsStrategyMap[metric.Name] = metric
}
}
if definition.Strategy != nil && definition.Strategy.Logs != nil {
for _, log := range definition.Strategy.Logs {
logsStrategyMap[log.Name] = log
}
}
if serviceConfig.Metrics != nil {
for _, metric := range serviceConfig.Metrics {
if metric.Enabled {
metrics = append(metrics, &integrationtypes.AzureMetricsStrategy{
CategoryType: metricsStrategyMap[metric.Name].CategoryType,
Name: metric.Name,
})
}
}
}
if serviceConfig.Logs != nil {
for _, log := range serviceConfig.Logs {
if log.Enabled {
logs = append(logs, &integrationtypes.AzureLogsStrategy{
CategoryType: logsStrategyMap[log.Name].CategoryType,
Name: log.Name,
})
}
}
}
strategy := &integrationtypes.AzureCollectionStrategy{
Metrics: metrics,
Logs: logs,
}
agentConfig.TelemetryCollectionStrategy[svcType] = strategy
}
return agentConfig, nil
}
func (a *azureProvider) ListServices(ctx context.Context, orgID string, cloudAccountID *string) (any, error) {
svcConfigs := make(map[string]*integrationtypes.AzureServiceConfig)
if cloudAccountID != nil {
activeAccount, err := a.AccountsRepo.GetConnectedCloudAccount(ctx, orgID, a.GetName().String(), *cloudAccountID)
if err != nil {
return nil, err
}
serviceConfigs, err := a.ServiceConfigRepo.GetAllForAccount(ctx, orgID, activeAccount.ID.StringValue())
if err != nil {
return nil, err
}
for svcType, config := range serviceConfigs {
serviceConfig := new(integrationtypes.AzureServiceConfig)
err = integrationtypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
return nil, err
}
svcConfigs[svcType] = serviceConfig
}
}
summaries := make([]integrationtypes.AzureServiceSummary, 0)
definitions, err := a.ServiceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, err
}
for _, def := range definitions {
summary := integrationtypes.AzureServiceSummary{
DefinitionMetadata: def.DefinitionMetadata,
Config: nil,
}
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
}
slices.SortFunc(summaries, func(a, b integrationtypes.AzureServiceSummary) int {
if a.DefinitionMetadata.Title < b.DefinitionMetadata.Title {
return -1
}
if a.DefinitionMetadata.Title > b.DefinitionMetadata.Title {
return 1
}
return 0
})
return &integrationtypes.GettableAzureServices{
Services: summaries,
}, nil
}
func (a *azureProvider) GetServiceDetails(ctx context.Context, req *integrationtypes.GetServiceDetailsReq) (any, error) {
details := new(integrationtypes.GettableAzureServiceDetails)
azureDefinition, err := a.ServiceDefinitions.GetServiceDefinition(ctx, req.ServiceId)
if err != nil {
return nil, err
}
details.Definition = *azureDefinition
if req.CloudAccountID == nil {
return details, nil
}
config, err := a.GetServiceConfig(ctx, azureDefinition, req.OrgID, req.ServiceId, *req.CloudAccountID)
if err != nil {
return nil, err
}
details.Config = config
// fill default values for config
if details.Config == nil {
cfg := new(integrationtypes.AzureServiceConfig)
logs := make([]*integrationtypes.AzureServiceLogsConfig, 0)
if azureDefinition.Strategy != nil && azureDefinition.Strategy.Logs != nil {
for _, log := range azureDefinition.Strategy.Logs {
logs = append(logs, &integrationtypes.AzureServiceLogsConfig{
Enabled: false,
Name: log.Name,
})
}
}
metrics := make([]*integrationtypes.AzureServiceMetricsConfig, 0)
if azureDefinition.Strategy != nil && azureDefinition.Strategy.Metrics != nil {
for _, metric := range azureDefinition.Strategy.Metrics {
metrics = append(metrics, &integrationtypes.AzureServiceMetricsConfig{
Enabled: false,
Name: metric.Name,
})
}
}
cfg.Logs = logs
cfg.Metrics = metrics
details.Config = cfg
}
isMetricsEnabled := details.Config != nil && details.Config.IsMetricsEnabled()
isLogsEnabled := details.Config != nil && details.Config.IsLogsEnabled()
connectionStatus, err := a.GetServiceConnectionStatus(
ctx,
*req.CloudAccountID,
req.OrgID,
azureDefinition,
isMetricsEnabled,
isLogsEnabled,
)
if err != nil {
return nil, err
}
details.ConnectionStatus = connectionStatus
return details, nil
}
func (a *azureProvider) GenerateConnectionArtifact(ctx context.Context, req *integrationtypes.PostableConnectionArtifact) (any, error) {
connection := new(integrationtypes.PostableAzureConnectionCommand)
err := integrationtypes.UnmarshalJSON(req.Data, connection)
if err != nil {
return nil, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed unmarshal request data into AWS connection config")
}
// validate connection config
if connection.AccountConfig != nil {
if !integrationtypes.ValidAzureRegions[connection.AccountConfig.DeploymentRegion] {
return nil, errors.NewInvalidInputf(CodeInvalidAzureRegion, "invalid azure region: %s",
connection.AccountConfig.DeploymentRegion,
)
}
}
config, err := integrationtypes.MarshalJSON(connection.AccountConfig)
if err != nil {
return nil, err
}
account, err := a.AccountsRepo.Upsert(
ctx, req.OrgID, a.GetName().String(), nil, config,
nil, nil, nil,
)
if err != nil {
return nil, err
}
agentVersion := "v0.0.8"
if connection.AgentConfig.Version != "" {
agentVersion = connection.AgentConfig.Version
}
// TODO: improve the command and set url
cliCommand := []string{"az", "stack", "sub", "create", "--name", "SigNozIntegration", "--location",
connection.AccountConfig.DeploymentRegion, "--template-uri", fmt.Sprintf("<url>%s", agentVersion),
"--action-on-unmanage", "deleteAll", "--deny-settings-mode", "denyDelete", "--parameters", fmt.Sprintf("rgName=%s", "signoz-integration-rg"),
fmt.Sprintf("rgLocation=%s", connection.AccountConfig.DeploymentRegion)}
return &integrationtypes.GettableAzureConnectionCommand{
AccountId: account.ID.String(),
AzureShellConnectionCommand: "az create",
AzureCliConnectionCommand: strings.Join(cliCommand, " "),
}, nil
}
func (a *azureProvider) UpdateAccountConfig(ctx context.Context, orgId valuer.UUID, accountId string, configBytes []byte) (any, error) {
config := new(integrationtypes.UpdatableAzureAccountConfig)
err := integrationtypes.UnmarshalJSON(configBytes, config)
if err != nil {
return nil, err
}
if len(config.Config.EnabledResourceGroups) < 1 {
return nil, errors.NewInvalidInputf(CodeInvalidAzureRegion, "azure region and resource groups must be provided")
}
//for azure, preserve deployment region if already set
account, err := a.AccountsRepo.Get(ctx, orgId.String(), a.GetName().String(), accountId)
if err != nil {
return nil, err
}
storedConfig := new(integrationtypes.AzureAccountConfig)
err = integrationtypes.UnmarshalJSON([]byte(account.Config), storedConfig)
if err != nil {
return nil, err
}
if account.Config != "" {
config.Config.DeploymentRegion = storedConfig.DeploymentRegion
}
configBytes, err = integrationtypes.MarshalJSON(config.Config)
if err != nil {
return nil, err
}
accountRecord, err := a.AccountsRepo.Upsert(
ctx, orgId.String(), a.GetName().String(), &accountId, configBytes, nil, nil, nil,
)
if err != nil {
return nil, err
}
return accountRecord.Account(a.GetName()), nil
}

View File

@@ -1 +1,94 @@
package cloudintegrations
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/types"
)
type ServiceSummary struct {
services.Metadata
Config *types.CloudServiceConfig `json:"config"`
}
type ServiceDetails struct {
services.Definition
Config *types.CloudServiceConfig `json:"config"`
ConnectionStatus *ServiceConnectionStatus `json:"status,omitempty"`
}
type AccountStatus struct {
Integration AccountIntegrationStatus `json:"integration"`
}
type AccountIntegrationStatus struct {
LastHeartbeatTsMillis *int64 `json:"last_heartbeat_ts_ms"`
}
type LogsConfig struct {
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type MetricsConfig struct {
Enabled bool `json:"enabled"`
}
type ServiceConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"`
}
type SignalConnectionStatus struct {
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}
type CompiledCollectionStrategy = services.CollectionStrategy
func NewCompiledCollectionStrategy(provider string) (*CompiledCollectionStrategy, error) {
if provider == "aws" {
return &CompiledCollectionStrategy{
Provider: "aws",
AWSMetrics: &services.AWSMetricsStrategy{},
AWSLogs: &services.AWSLogsStrategy{},
}, nil
}
return nil, errors.NewNotFoundf(services.CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", provider)
}
// Helper for accumulating strategies for enabled services.
func AddServiceStrategy(serviceType string, cs *CompiledCollectionStrategy,
definitionStrat *services.CollectionStrategy, config *types.CloudServiceConfig) error {
if definitionStrat.Provider != cs.Provider {
return errors.NewInternalf(CodeMismatchCloudProvider, "can't add %s service strategy to compiled strategy for %s",
definitionStrat.Provider, cs.Provider)
}
if cs.Provider == "aws" {
if config.Logs != nil && config.Logs.Enabled {
if serviceType == services.S3Sync {
// S3 bucket sync; No cloudwatch logs are appended for this service type;
// Though definition is populated with a custom cloudwatch group that helps in calculating logs connection status
cs.S3Buckets = config.Logs.S3Buckets
} else if definitionStrat.AWSLogs != nil { // services that includes a logs subscription
cs.AWSLogs.Subscriptions = append(
cs.AWSLogs.Subscriptions,
definitionStrat.AWSLogs.Subscriptions...,
)
}
}
if config.Metrics != nil && config.Metrics.Enabled && definitionStrat.AWSMetrics != nil {
cs.AWSMetrics.StreamFilters = append(
cs.AWSMetrics.StreamFilters,
definitionStrat.AWSMetrics.StreamFilters...,
)
}
return nil
}
return errors.NewNotFoundf(services.CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", cs.Provider)
}

View File

@@ -1,37 +0,0 @@
package cloudintegrations
import (
"log/slog"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/implawsprovider"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/implazureprovider"
integrationstore "github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/store"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
)
func NewCloudProviderRegistry(
logger *slog.Logger,
store sqlstore.SQLStore,
querier querier.Querier,
) (map[integrationtypes.CloudProviderType]integrationtypes.CloudProvider, error) {
registry := make(map[integrationtypes.CloudProviderType]integrationtypes.CloudProvider)
accountsRepo := integrationstore.NewCloudProviderAccountsRepository(store)
serviceConfigRepo := integrationstore.NewServiceConfigRepository(store)
awsProviderImpl, err := implawsprovider.NewAWSCloudProvider(logger, accountsRepo, serviceConfigRepo, querier)
if err != nil {
return nil, err
}
registry[integrationtypes.CloudProviderAWS] = awsProviderImpl
azureProviderImpl, err := implazureprovider.NewAzureCloudProvider(logger, accountsRepo, serviceConfigRepo, querier)
if err != nil {
return nil, err
}
registry[integrationtypes.CloudProviderAzure] = azureProviderImpl
return registry, nil
}

View File

@@ -1,63 +1,64 @@
package store
package cloudintegrations
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
CodeServiceConfigNotFound = errors.MustNewCode("service_config_not_found")
)
type ServiceConfigDatabase interface {
Get(
get(
ctx context.Context,
orgID string,
cloudAccountId string,
serviceType string,
) ([]byte, error)
) (*types.CloudServiceConfig, *model.ApiError)
Upsert(
upsert(
ctx context.Context,
orgID string,
cloudProvider string,
cloudAccountId string,
serviceId string,
config []byte,
) ([]byte, error)
config types.CloudServiceConfig,
) (*types.CloudServiceConfig, *model.ApiError)
GetAllForAccount(
getAllForAccount(
ctx context.Context,
orgID string,
cloudAccountId string,
) (
map[string][]byte,
error,
configsBySvcId map[string]*types.CloudServiceConfig,
apiErr *model.ApiError,
)
}
func NewServiceConfigRepository(store sqlstore.SQLStore) ServiceConfigDatabase {
return &serviceConfigSQLRepository{store: store}
func newServiceConfigRepository(store sqlstore.SQLStore) (
*serviceConfigSQLRepository, error,
) {
return &serviceConfigSQLRepository{
store: store,
}, nil
}
type serviceConfigSQLRepository struct {
store sqlstore.SQLStore
}
func (r *serviceConfigSQLRepository) Get(
func (r *serviceConfigSQLRepository) get(
ctx context.Context,
orgID string,
cloudAccountId string,
serviceType string,
) ([]byte, error) {
var result integrationtypes.CloudIntegrationService
) (*types.CloudServiceConfig, *model.ApiError) {
var result types.CloudIntegrationService
err := r.store.BunDB().NewSelect().
Model(&result).
@@ -66,30 +67,36 @@ func (r *serviceConfigSQLRepository) Get(
Where("ci.id = ?", cloudAccountId).
Where("cis.type = ?", serviceType).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errors.WrapNotFoundf(err, CodeServiceConfigNotFound, "couldn't find config for cloud account %s", cloudAccountId)
}
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud service config")
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find config for cloud account %s",
cloudAccountId,
))
} else if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud service config: %w", err,
))
}
return []byte(result.Config), nil
return &result.Config, nil
}
func (r *serviceConfigSQLRepository) Upsert(
func (r *serviceConfigSQLRepository) upsert(
ctx context.Context,
orgID string,
cloudProvider string,
cloudAccountId string,
serviceId string,
config []byte,
) ([]byte, error) {
config types.CloudServiceConfig,
) (*types.CloudServiceConfig, *model.ApiError) {
// get cloud integration id from account id
// if the account is not connected, we don't need to upsert the config
var cloudIntegrationId string
err := r.store.BunDB().NewSelect().
Model((*integrationtypes.CloudIntegration)(nil)).
Model((*types.CloudIntegration)(nil)).
Column("id").
Where("provider = ?", cloudProvider).
Where("account_id = ?", cloudAccountId).
@@ -97,24 +104,20 @@ func (r *serviceConfigSQLRepository) Upsert(
Where("removed_at is NULL").
Where("last_agent_report is not NULL").
Scan(ctx, &cloudIntegrationId)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errors.WrapNotFoundf(
err,
CodeCloudIntegrationAccountNotFound,
"couldn't find active cloud integration account",
)
}
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud integration id")
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud integration id: %w", err,
))
}
serviceConfig := integrationtypes.CloudIntegrationService{
serviceConfig := types.CloudIntegrationService{
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
Config: string(config),
Config: config,
Type: serviceId,
CloudIntegrationID: cloudIntegrationId,
}
@@ -123,18 +126,21 @@ func (r *serviceConfigSQLRepository) Upsert(
On("conflict(cloud_integration_id, type) do update set config=excluded.config, updated_at=excluded.updated_at").
Exec(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't upsert cloud service config")
return nil, model.InternalError(fmt.Errorf(
"could not upsert cloud service config: %w", err,
))
}
return config, nil
return &serviceConfig.Config, nil
}
func (r *serviceConfigSQLRepository) GetAllForAccount(
func (r *serviceConfigSQLRepository) getAllForAccount(
ctx context.Context,
orgID string,
cloudAccountId string,
) (map[string][]byte, error) {
var serviceConfigs []integrationtypes.CloudIntegrationService
) (map[string]*types.CloudServiceConfig, *model.ApiError) {
serviceConfigs := []types.CloudIntegrationService{}
err := r.store.BunDB().NewSelect().
Model(&serviceConfigs).
@@ -143,13 +149,15 @@ func (r *serviceConfigSQLRepository) GetAllForAccount(
Where("ci.org_id = ?", orgID).
Scan(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query service configs from db")
return nil, model.InternalError(fmt.Errorf(
"could not query service configs from db: %w", err,
))
}
result := make(map[string][]byte)
result := map[string]*types.CloudServiceConfig{}
for _, r := range serviceConfigs {
result[r.Type] = []byte(r.Config)
result[r.Type] = &r.Config
}
return result, nil

View File

@@ -7,24 +7,6 @@
"metrics": true,
"logs": false
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "aws_ApplicationELB_ConsumedLCUs_count",
"attributes": []
},
{
"key": "aws_ApplicationELB_ProcessedBytes_sum",
"attributes": []
}
]
}
]
},
"data_collected": {
"metrics": [
{

View File

@@ -7,75 +7,6 @@
"metrics": true,
"logs": true
},
"ingestion_status_check": {
"metrics": [
{
"category": "rest_api",
"display_name": "REST API Metrics",
"checks": [
{
"key": "aws_ApiGateway_Count_count",
"attributes": [
{
"name": "ApiName",
"operator": "EXISTS",
"value": ""
}
]
}
]
},
{
"category": "http_api",
"display_name": "HTTP API Metrics",
"checks": [
{
"key": "aws_ApiGateway_Count_count",
"attributes": [
{
"name": "ApiId",
"operator": "EXISTS",
"value": ""
}
]
}
]
},
{
"category": "websocket_api",
"display_name": "Websocket API Metrics",
"checks": [
{
"key": "aws_ApiGateway_Count_count",
"attributes": [
{
"name": "ApiId",
"operator": "EXISTS",
"value": ""
}
]
}
]
}
],
"logs": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"attributes": [
{
"name": "aws.cloudwatch.log_group_name",
"operator": "ILIKE",
"value": "API-Gateway%"
}
]
}
]
}
]
},
"data_collected": {
"metrics": [
{
@@ -217,146 +148,6 @@
"name": "aws_ApiGateway_Latency_sum",
"unit": "Milliseconds",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_4xx_sum",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_4xx_max",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_4xx_min",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_4xx_count",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_5xx_sum",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_5xx_max",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_5xx_min",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_5xx_count",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_DataProcessed_sum",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_DataProcessed_max",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_DataProcessed_min",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_DataProcessed_count",
"unit": "Bytes",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ExecutionError_sum",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ExecutionError_max",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ExecutionError_min",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ExecutionError_count",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ClientError_sum",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ClientError_max",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ClientError_min",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ClientError_count",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_IntegrationError_sum",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_IntegrationError_max",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_IntegrationError_min",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_IntegrationError_count",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ConnectCount_sum",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ConnectCount_max",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ConnectCount_min",
"unit": "Count",
"type": "Gauge"
},
{
"name": "aws_ApiGateway_ConnectCount_count",
"unit": "Count",
"type": "Gauge"
}
],
"logs": [

View File

@@ -7,24 +7,6 @@
"metrics": true,
"logs": false
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "aws_DynamoDB_AccountMaxReads_max",
"attributes": []
},
{
"key": "aws_DynamoDB_AccountProvisionedReadCapacityUtilization_max",
"attributes": []
}
]
}
]
},
"data_collected": {
"metrics": [
{
@@ -409,4 +391,4 @@
}
]
}
}
}

View File

@@ -7,24 +7,6 @@
"metrics": true,
"logs": false
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "aws_EC2_CPUUtilization_max",
"attributes": []
},
{
"key": "aws_EC2_NetworkIn_max",
"attributes": []
}
]
}
]
},
"data_collected": {
"metrics": [
{
@@ -533,4 +515,4 @@
}
]
}
}
}

View File

@@ -7,81 +7,6 @@
"metrics": true,
"logs": true
},
"ingestion_status_check": {
"metrics": [
{
"category": "overview",
"display_name": "Overview",
"checks": [
{
"key": "aws_ECS_CPUUtilization_max",
"attributes": []
},
{
"key": "aws_ECS_MemoryUtilization_max",
"attributes": []
}
]
},
{
"category": "containerinsights",
"display_name": "Container Insights",
"checks": [
{
"key": "aws_ECS_ContainerInsights_NetworkRxBytes_max",
"attributes": []
},
{
"key": "aws_ECS_ContainerInsights_StorageReadBytes_max",
"attributes": []
}
]
},
{
"category": "enhanced_containerinsights",
"display_name": "Enhanced Container Insights",
"checks": [
{
"key": "aws_ECS_ContainerInsights_ContainerCpuUtilization_max",
"attributes": [
{
"name": "TaskId",
"operator": "EXISTS",
"value": ""
}
]
},
{
"key": "aws_ECS_ContainerInsights_TaskMemoryUtilization_max",
"attributes": [
{
"name": "TaskId",
"operator": "EXISTS",
"value": ""
}
]
}
]
}
],
"logs": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"attributes": [
{
"name": "aws.cloudwatch.log_group_name",
"operator": "ILIKE",
"value": "%/ecs/%"
}
]
}
]
}
]
},
"data_collected": {
"metrics": [
{

View File

@@ -7,20 +7,6 @@
"metrics": true,
"logs": false
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "aws_ElastiCache_CacheHitRate_max",
"attributes": []
}
]
}
]
},
"data_collected": {
"metrics":[
{
@@ -1942,7 +1928,7 @@
"unit": "Percent",
"type": "Gauge",
"description": ""
}
}
]
},
"telemetry_collection_strategy": {
@@ -1965,4 +1951,4 @@
}
]
}
}
}

View File

@@ -7,37 +7,6 @@
"metrics": true,
"logs": true
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "aws_Lambda_Invocations_sum",
"attributes": []
}
]
}
],
"logs": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"attributes": [
{
"name": "aws.cloudwatch.log_group_name",
"operator": "ILIKE",
"value": "/aws/lambda%"
}
]
}
]
}
]
},
"data_collected": {
"metrics": [
{

View File

@@ -7,20 +7,6 @@
"metrics": true,
"logs": false
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "aws_Kafka_KafkaDataLogsDiskUsed_max",
"attributes": []
}
]
}
]
},
"data_collected": {
"metrics": [
{
@@ -1102,3 +1088,4 @@
]
}
}

View File

@@ -7,37 +7,6 @@
"metrics": true,
"logs": true
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "aws_RDS_CPUUtilization_max",
"attributes": []
}
]
}
],
"logs": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"attributes": [
{
"name": "resources.aws.cloudwatch.log_group_name",
"operator": "ILIKE",
"value": "/aws/rds%"
}
]
}
]
}
]
},
"data_collected": {
"metrics": [
{
@@ -831,4 +800,4 @@
}
]
}
}
}

View File

@@ -7,20 +7,6 @@
"metrics": true,
"logs": false
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "aws_SNS_NumberOfMessagesPublished_sum",
"attributes": []
}
]
}
]
},
"data_collected": {
"metrics": [
{
@@ -141,4 +127,4 @@
}
]
}
}
}

View File

@@ -7,24 +7,6 @@
"metrics": true,
"logs": false
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "aws_SQS_SentMessageSize_max",
"attributes": []
},
{
"key": "aws_SQS_NumberOfMessagesSent_sum",
"attributes": []
}
]
}
]
},
"data_collected": {
"metrics": [
{
@@ -265,4 +247,4 @@
}
]
}
}
}

View File

@@ -1 +0,0 @@
<svg id="f2f04349-8aee-4413-84c9-a9053611b319" xmlns="http://www.w3.org/2000/svg" width="18" height="18" viewBox="0 0 18 18"><defs><linearGradient id="ad4c4f96-09aa-4f91-ba10-5cb8ad530f74" x1="9" y1="15.83" x2="9" y2="5.79" gradientUnits="userSpaceOnUse"><stop offset="0" stop-color="#b3b3b3" /><stop offset="0.26" stop-color="#c1c1c1" /><stop offset="1" stop-color="#e6e6e6" /></linearGradient></defs><title>Icon-storage-86</title><path d="M.5,5.79h17a0,0,0,0,1,0,0v9.48a.57.57,0,0,1-.57.57H1.07a.57.57,0,0,1-.57-.57V5.79A0,0,0,0,1,.5,5.79Z" fill="url(#ad4c4f96-09aa-4f91-ba10-5cb8ad530f74)" /><path d="M1.07,2.17H16.93a.57.57,0,0,1,.57.57V5.79a0,0,0,0,1,0,0H.5a0,0,0,0,1,0,0V2.73A.57.57,0,0,1,1.07,2.17Z" fill="#37c2b1" /><path d="M2.81,6.89H15.18a.27.27,0,0,1,.26.27v1.4a.27.27,0,0,1-.26.27H2.81a.27.27,0,0,1-.26-.27V7.16A.27.27,0,0,1,2.81,6.89Z" fill="#fff" /><path d="M2.82,9.68H15.19a.27.27,0,0,1,.26.27v1.41a.27.27,0,0,1-.26.27H2.82a.27.27,0,0,1-.26-.27V10A.27.27,0,0,1,2.82,9.68Z" fill="#37c2b1" /><path d="M2.82,12.5H15.19a.27.27,0,0,1,.26.27v1.41a.27.27,0,0,1-.26.27H2.82a.27.27,0,0,1-.26-.27V12.77A.27.27,0,0,1,2.82,12.5Z" fill="#258277" /></svg>

Before

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -1,293 +0,0 @@
{
"id": "blobstorage",
"title": "Blob Storage",
"icon": "file://icon.svg",
"overview": "file://overview.md",
"supported_signals": {
"metrics": true,
"logs": true
},
"ingestion_status_check": {
"metrics": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"key": "placeholder",
"attributes": []
}
]
},
{
"category": "transactions",
"display_name": "Transactions",
"checks": [
{
"key": "placeholder",
"attributes": []
}
]
}
],
"logs": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"attributes": [
{
"name": "placeholder",
"operator": "ILIKE",
"value": "%/ecs/%"
}
]
}
]
}
]
},
"data_collected": {
"metrics": [
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
}
],
"logs": [
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
}
]
},
"telemetry_collection_strategy": {
"azure_metrics": [
{
"category_type": "metrics",
"name": "Capacity"
},
{
"category_type": "metrics",
"name": "Transaction"
}
],
"azure_logs": [
{
"category_type": "logs",
"name": "StorageRead"
},
{
"category_type": "logs",
"name": "StorageWrite"
},
{
"category_type": "logs",
"name": "StorageDelete"
}
]
},
"assets": {
"dashboards": [
{
"id": "overview",
"title": "Blob Storage Overview",
"description": "Overview of Blob Storage",
"definition": "file://assets/dashboards/overview.json"
}
]
}
}

View File

@@ -1,2 +0,0 @@
Monitor Azure Blob Storage with SigNoz
Collect key Blob Storage metrics and view them with an out of the box dashboard.

View File

@@ -1 +0,0 @@
<svg id="f2f04349-8aee-4413-84c9-a9053611b319" xmlns="http://www.w3.org/2000/svg" width="18" height="18" viewBox="0 0 18 18"><defs><linearGradient id="ad4c4f96-09aa-4f91-ba10-5cb8ad530f74" x1="9" y1="15.83" x2="9" y2="5.79" gradientUnits="userSpaceOnUse"><stop offset="0" stop-color="#b3b3b3" /><stop offset="0.26" stop-color="#c1c1c1" /><stop offset="1" stop-color="#e6e6e6" /></linearGradient></defs><title>Icon-storage-86</title><path d="M.5,5.79h17a0,0,0,0,1,0,0v9.48a.57.57,0,0,1-.57.57H1.07a.57.57,0,0,1-.57-.57V5.79A0,0,0,0,1,.5,5.79Z" fill="url(#ad4c4f96-09aa-4f91-ba10-5cb8ad530f74)" /><path d="M1.07,2.17H16.93a.57.57,0,0,1,.57.57V5.79a0,0,0,0,1,0,0H.5a0,0,0,0,1,0,0V2.73A.57.57,0,0,1,1.07,2.17Z" fill="#37c2b1" /><path d="M2.81,6.89H15.18a.27.27,0,0,1,.26.27v1.4a.27.27,0,0,1-.26.27H2.81a.27.27,0,0,1-.26-.27V7.16A.27.27,0,0,1,2.81,6.89Z" fill="#fff" /><path d="M2.82,9.68H15.19a.27.27,0,0,1,.26.27v1.41a.27.27,0,0,1-.26.27H2.82a.27.27,0,0,1-.26-.27V10A.27.27,0,0,1,2.82,9.68Z" fill="#37c2b1" /><path d="M2.82,12.5H15.19a.27.27,0,0,1,.26.27v1.41a.27.27,0,0,1-.26.27H2.82a.27.27,0,0,1-.26-.27V12.77A.27.27,0,0,1,2.82,12.5Z" fill="#258277" /></svg>

Before

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -1,289 +0,0 @@
{
"id": "frontdoor",
"title": "Front Door",
"icon": "file://icon.svg",
"overview": "file://overview.md",
"supported_signals": {
"metrics": true,
"logs": true
},
"ingestion_status_check": {
"metrics": [
{
"category": "overview",
"display_name": "Overview",
"checks": [
{
"key": "placeholder",
"attributes": []
}
]
},
{
"category": "insights",
"display_name": "Blob Storage Insights",
"checks": [
{
"key": "placeholder",
"attributes": []
}
]
}
],
"logs": [
{
"category": "$default",
"display_name": "Default",
"checks": [
{
"attributes": [
{
"name": "placeholder",
"operator": "ILIKE",
"value": "%/ecs/%"
}
]
}
]
}
]
},
"data_collected": {
"metrics": [
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
},
{
"name": "placeholder_metric_1",
"unit": "Percent",
"type": "Gauge",
"description": ""
}
],
"logs": [
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
},
{
"name": "placeholder_log_1",
"path": "placeholder.path.value",
"type": "string"
}
]
},
"telemetry_collection_strategy": {
"azure_metrics": [
{
"category_type": "metrics",
"name": "Capacity"
},
{
"category_type": "metrics",
"name": "Transaction"
}
],
"azure_logs": [
{
"category_type": "logs",
"name": "StorageRead"
},
{
"category_type": "logs",
"name": "StorageWrite"
},
{
"category_type": "logs",
"name": "StorageDelete"
}
]
},
"assets": {
"dashboards": [
{
"id": "overview",
"title": "Front Door Overview",
"description": "Overview of Blob Storage",
"definition": "file://assets/dashboards/overview.json"
}
]
}
}

View File

@@ -1,2 +0,0 @@
Monitor Azure Front Door with SigNoz
Collect key Front Door metrics and view them with an out of the box dashboard.

View File

@@ -0,0 +1,91 @@
package services
import (
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
)
type Metadata struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
}
type Definition struct {
Metadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"data_collected"`
Strategy *CollectionStrategy `json:"telemetry_collection_strategy"`
}
type Assets struct {
Dashboards []Dashboard `json:"dashboards"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollected struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type CollectionStrategy struct {
Provider string `json:"provider"`
AWSMetrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsStrategy `json:"aws_logs,omitempty"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type
}
type AWSMetricsStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
StreamFilters []struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
} `json:"cloudwatch_metric_stream_filters"`
}
type AWSLogsStrategy struct {
Subscriptions []struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
} `json:"cloudwatch_logs_subscriptions"`
}
type Dashboard struct {
Id string `json:"id"`
Url string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Image string `json:"image"`
Definition *dashboardtypes.StorableDashboardData `json:"definition,omitempty"`
}

View File

@@ -2,111 +2,128 @@ package services
import (
"bytes"
"context"
"embed"
"encoding/json"
"fmt"
"io/fs"
"path"
"sort"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/query-service/model"
koanfJson "github.com/knadh/koanf/parsers/json"
"golang.org/x/exp/maps"
)
const (
S3Sync = "s3sync"
)
var (
CodeServiceDefinitionNotFound = errors.MustNewCode("service_definition_not_dound")
CodeUnsupportedCloudProvider = errors.MustNewCode("unsupported_cloud_provider")
CodeUnsupportedServiceType = errors.MustNewCode("unsupported_service_type")
CodeUnsupportedCloudProvider = errors.MustNewCode("unsupported_cloud_provider")
CodeUnsupportedServiceType = errors.MustNewCode("unsupported_service_type")
)
type ServicesProvider[T integrationtypes.Definition] struct {
definitions map[string]T
func List(cloudProvider string) ([]Definition, *model.ApiError) {
cloudServices, found := supportedServices[cloudProvider]
if !found || cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider,
))
}
services := maps.Values(cloudServices)
sort.Slice(services, func(i, j int) bool {
return services[i].Id < services[j].Id
})
return services, nil
}
func (a *ServicesProvider[T]) ListServiceDefinitions(ctx context.Context) (map[string]T, error) {
return a.definitions, nil
}
func (a *ServicesProvider[T]) GetServiceDefinition(ctx context.Context, serviceName string) (T, error) {
def, ok := a.definitions[serviceName]
if !ok {
return *new(T), errors.NewNotFoundf(CodeServiceDefinitionNotFound, "azure service definition not found: %s", serviceName)
}
return def, nil
}
func NewAWSCloudProviderServices() (*ServicesProvider[*integrationtypes.AWSDefinition], error) {
definitions, err := readAllServiceDefinitions(integrationtypes.CloudProviderAWS)
if err != nil {
return nil, err
}
serviceDefinitions := make(map[string]*integrationtypes.AWSDefinition)
for id, def := range definitions {
typedDef, ok := def.(*integrationtypes.AWSDefinition)
if !ok {
return nil, errors.NewInternalf(errors.CodeInternal, "invalid type for AWS service definition %s", id)
}
serviceDefinitions[id] = typedDef
}
return &ServicesProvider[*integrationtypes.AWSDefinition]{
definitions: serviceDefinitions,
}, nil
}
func NewAzureCloudProviderServices() (*ServicesProvider[*integrationtypes.AzureDefinition], error) {
definitions, err := readAllServiceDefinitions(integrationtypes.CloudProviderAzure)
if err != nil {
return nil, err
}
serviceDefinitions := make(map[string]*integrationtypes.AzureDefinition)
for id, def := range definitions {
typedDef, ok := def.(*integrationtypes.AzureDefinition)
if !ok {
return nil, errors.NewInternalf(errors.CodeInternal, "invalid type for Azure service definition %s", id)
}
serviceDefinitions[id] = typedDef
}
return &ServicesProvider[*integrationtypes.AzureDefinition]{
definitions: serviceDefinitions,
}, nil
}
// End of API. Logic for reading service definition files follows
//go:embed definitions/*
var definitionFiles embed.FS
func readAllServiceDefinitions(cloudProvider valuer.String) (map[string]any, error) {
rootDirName := "definitions"
cloudProviderDirPath := path.Join(rootDirName, cloudProvider.String())
cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath)
if err != nil {
return nil, err
}
if len(cloudServices) < 1 {
return nil, errors.NewInternalf(errors.CodeInternal, "no service definitions found in %s", cloudProviderDirPath)
func Map(cloudProvider string) (map[string]Definition, error) {
cloudServices, found := supportedServices[cloudProvider]
if !found || cloudServices == nil {
return nil, errors.Newf(errors.TypeNotFound, CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", cloudProvider)
}
return cloudServices, nil
}
func readServiceDefinitionsFromDir(cloudProvider valuer.String, cloudProviderDirPath string) (map[string]any, error) {
svcDefDirs, err := fs.ReadDir(definitionFiles, cloudProviderDirPath)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't list integrations dirs")
func GetServiceDefinition(cloudProvider, serviceType string) (*Definition, error) {
cloudServices := supportedServices[cloudProvider]
if cloudServices == nil {
return nil, errors.Newf(errors.TypeNotFound, CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", cloudProvider)
}
svcDefs := make(map[string]any)
svc, exists := cloudServices[serviceType]
if !exists {
return nil, errors.Newf(errors.TypeNotFound, CodeUnsupportedServiceType, "%s service not found: %s", cloudProvider, serviceType)
}
return &svc, nil
}
// End of API. Logic for reading service definition files follows
// Service details read from ./serviceDefinitions
// { "providerName": { "service_id": {...}} }
var supportedServices map[string]map[string]Definition
func init() {
err := readAllServiceDefinitions()
if err != nil {
panic(fmt.Errorf(
"couldn't read cloud service definitions: %w", err,
))
}
}
//go:embed definitions/*
var definitionFiles embed.FS
func readAllServiceDefinitions() error {
supportedServices = map[string]map[string]Definition{}
rootDirName := "definitions"
cloudProviderDirs, err := fs.ReadDir(definitionFiles, rootDirName)
if err != nil {
return fmt.Errorf("couldn't read dirs in %s: %w", rootDirName, err)
}
for _, d := range cloudProviderDirs {
if !d.IsDir() {
continue
}
cloudProvider := d.Name()
cloudProviderDirPath := path.Join(rootDirName, cloudProvider)
cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath)
if err != nil {
return fmt.Errorf("couldn't read %s service definitions: %w", cloudProvider, err)
}
if len(cloudServices) < 1 {
return fmt.Errorf("no %s services could be read", cloudProvider)
}
supportedServices[cloudProvider] = cloudServices
}
return nil
}
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
map[string]Definition, error,
) {
svcDefDirs, err := fs.ReadDir(definitionFiles, cloudProviderDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't list integrations dirs: %w", err)
}
svcDefs := map[string]Definition{}
for _, d := range svcDefDirs {
if !d.IsDir() {
@@ -116,73 +133,103 @@ func readServiceDefinitionsFromDir(cloudProvider valuer.String, cloudProviderDir
svcDirPath := path.Join(cloudProviderDirPath, d.Name())
s, err := readServiceDefinition(cloudProvider, svcDirPath)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err)
}
_, exists := svcDefs[s.GetId()]
_, exists := svcDefs[s.Id]
if exists {
return nil, errors.NewInternalf(errors.CodeInternal, "duplicate service definition for id %s at %s", s.GetId(), d.Name())
return nil, fmt.Errorf(
"duplicate service definition for id %s at %s", s.Id, d.Name(),
)
}
svcDefs[s.GetId()] = s
svcDefs[s.Id] = *s
}
return svcDefs, nil
}
func readServiceDefinition(cloudProvider valuer.String, svcDirpath string) (integrationtypes.Definition, error) {
func readServiceDefinition(cloudProvider string, svcDirpath string) (*Definition, error) {
integrationJsonPath := path.Join(svcDirpath, "integration.json")
serializedSpec, err := definitionFiles.ReadFile(integrationJsonPath)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't read integration definition in %s", svcDirpath)
return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w",
svcDirpath, err,
)
}
integrationSpec, err := koanfJson.Parser().Unmarshal(serializedSpec)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't parse integration definition in %s", svcDirpath)
return nil, fmt.Errorf(
"couldn't parse integration.json from %s: %w",
integrationJsonPath, err,
)
}
hydrated, err := integrations.HydrateFileUris(integrationSpec, definitionFiles, svcDirpath)
hydrated, err := integrations.HydrateFileUris(
integrationSpec, definitionFiles, svcDirpath,
)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't hydrate integration definition in %s", svcDirpath)
return nil, fmt.Errorf(
"couldn't hydrate files referenced in service definition %s: %w",
integrationJsonPath, err,
)
}
hydratedSpec := hydrated.(map[string]any)
var serviceDef integrationtypes.Definition
switch cloudProvider {
case integrationtypes.CloudProviderAWS:
serviceDef = &integrationtypes.AWSDefinition{}
case integrationtypes.CloudProviderAzure:
serviceDef = &integrationtypes.AzureDefinition{}
default:
// ideally this shouldn't happen hence throwing internal error
return nil, errors.NewInternalf(errors.CodeInternal, "unsupported cloud provider: %s", cloudProvider)
serviceDef, err := ParseStructWithJsonTagsFromMap[Definition](hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w",
integrationJsonPath, err,
)
}
err = parseStructWithJsonTagsFromMap(hydratedSpec, serviceDef)
err = validateServiceDefinition(serviceDef)
if err != nil {
return nil, err
}
err = serviceDef.Validate()
if err != nil {
return nil, err
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
}
serviceDef.Strategy.Provider = cloudProvider
return serviceDef, nil
}
func parseStructWithJsonTagsFromMap(data map[string]any, target interface{}) error {
mapJson, err := json.Marshal(data)
if err != nil {
return errors.WrapInternalf(err, errors.CodeInternal, "couldn't marshal service definition json data")
func validateServiceDefinition(s *Definition) error {
// Validate dashboard data
seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards {
if _, seen := seenDashboardIds[dd.Id]; seen {
return fmt.Errorf("multiple dashboards found with id %s", dd.Id)
}
seenDashboardIds[dd.Id] = nil
}
decoder := json.NewDecoder(bytes.NewReader(mapJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(target)
if err != nil {
return errors.WrapInternalf(err, errors.CodeInternal, "couldn't unmarshal service definition json data")
if s.Strategy == nil {
return fmt.Errorf("telemetry_collection_strategy is required")
}
// potentially more to follow
return nil
}
func ParseStructWithJsonTagsFromMap[StructType any](data map[string]any) (
*StructType, error,
) {
mapJson, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("couldn't marshal map to json: %w", err)
}
var res StructType
decoder := json.NewDecoder(bytes.NewReader(mapJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(&res)
if err != nil {
return nil, fmt.Errorf("couldn't unmarshal json back to struct: %w", err)
}
return &res, nil
}

View File

@@ -1,3 +1,35 @@
package services
// TODO: add more tests for services package
import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/stretchr/testify/require"
)
func TestAvailableServices(t *testing.T) {
require := require.New(t)
// should be able to list available services.
_, apiErr := List("bad-cloud-provider")
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
awsSvcs, apiErr := List("aws")
require.Nil(apiErr)
require.Greater(len(awsSvcs), 0)
// should be able to get details of a service
_, err := GetServiceDefinition(
"aws", "bad-service-id",
)
require.NotNil(err)
require.True(errors.Ast(err, errors.TypeNotFound))
svc, err := GetServiceDefinition(
"aws", awsSvcs[0].Id,
)
require.Nil(err)
require.Equal(*svc, awsSvcs[0])
}

View File

@@ -6,7 +6,11 @@ import (
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/queryparser"
"io"
"math"
@@ -21,19 +25,14 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/errors"
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/prometheus/promql"
@@ -45,6 +44,7 @@ import (
"github.com/SigNoz/signoz/pkg/contextlinks"
traceFunnelsModule "github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/inframetrics"
queues2 "github.com/SigNoz/signoz/pkg/query-service/app/integrations/messagingQueues/queues"
"github.com/SigNoz/signoz/pkg/query-service/app/logs"
@@ -111,7 +111,7 @@ type APIHandler struct {
IntegrationsController *integrations.Controller
cloudIntegrationsRegistry map[integrationtypes.CloudProviderType]integrationtypes.CloudProvider
CloudIntegrationsController *cloudintegrations.Controller
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
@@ -158,6 +158,9 @@ type APIHandlerOpts struct {
// Integrations
IntegrationsController *integrations.Controller
// Cloud Provider Integrations
CloudIntegrationsController *cloudintegrations.Controller
// Log parsing pipelines
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
@@ -171,8 +174,6 @@ type APIHandlerOpts struct {
QueryParserAPI *queryparser.API
Signoz *signoz.SigNoz
Logger *slog.Logger
}
// NewAPIHandler returns an APIHandler
@@ -208,21 +209,12 @@ func NewAPIHandler(opts APIHandlerOpts, config signoz.Config) (*APIHandler, erro
summaryService := metricsexplorer.NewSummaryService(opts.Reader, opts.RuleManager, opts.Signoz.Modules.Dashboard)
//quickFilterModule := quickfilter.NewAPI(opts.QuickFilterModule)
cloudIntegrationsRegistry, err := cloudintegrations.NewCloudProviderRegistry(
opts.Logger,
opts.Signoz.SQLStore,
opts.Signoz.Querier,
)
if err != nil {
return nil, err
}
aH := &APIHandler{
reader: opts.Reader,
temporalityMap: make(map[string]map[v3.Temporality]bool),
ruleManager: opts.RuleManager,
IntegrationsController: opts.IntegrationsController,
cloudIntegrationsRegistry: cloudIntegrationsRegistry,
CloudIntegrationsController: opts.CloudIntegrationsController,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
querier: querier,
querierV2: querierv2,
@@ -1217,19 +1209,13 @@ func (aH *APIHandler) Get(rw http.ResponseWriter, r *http.Request) {
}
dashboard := new(dashboardtypes.Dashboard)
if integrationtypes.IsCloudIntegrationDashboardUuid(id) {
cloudProvider, err := integrationtypes.GetCloudProviderFromDashboardID(id)
if err != nil {
render.Error(rw, err)
if aH.CloudIntegrationsController.IsCloudIntegrationDashboardUuid(id) {
cloudIntegrationDashboard, apiErr := aH.CloudIntegrationsController.GetDashboardById(ctx, orgID, id)
if apiErr != nil {
render.Error(rw, errorsV2.Wrapf(apiErr, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to get dashboard"))
return
}
integrationDashboard, err := aH.cloudIntegrationsRegistry[cloudProvider].GetDashboard(ctx, id, orgID)
if err != nil {
render.Error(rw, err)
return
}
dashboard = integrationDashboard
dashboard = cloudIntegrationDashboard
} else if aH.IntegrationsController.IsInstalledIntegrationDashboardID(id) {
integrationDashboard, apiErr := aH.IntegrationsController.GetInstalledIntegrationDashboardById(ctx, orgID, id)
if apiErr != nil {
@@ -1293,13 +1279,11 @@ func (aH *APIHandler) List(rw http.ResponseWriter, r *http.Request) {
dashboards = append(dashboards, installedIntegrationDashboards...)
}
for _, provider := range aH.cloudIntegrationsRegistry {
cloudIntegrationDashboards, err := provider.GetAvailableDashboards(ctx, orgID)
if err != nil {
zap.L().Error("failed to get dashboards for cloud integrations", zap.Error(apiErr))
} else {
dashboards = append(dashboards, cloudIntegrationDashboards...)
}
cloudIntegrationDashboards, apiErr := aH.CloudIntegrationsController.AvailableDashboards(ctx, orgID)
if apiErr != nil {
zap.L().Error("failed to get dashboards for cloud integrations", zap.Error(apiErr))
} else {
dashboards = append(dashboards, cloudIntegrationDashboards...)
}
gettableDashboards, err := dashboardtypes.NewGettableDashboardsFromDashboards(dashboards)
@@ -3275,15 +3259,15 @@ func (aH *APIHandler) GetIntegrationConnectionStatus(w http.ResponseWriter, r *h
lookbackSeconds = 15 * 60
}
connectionStatus, err := aH.calculateConnectionStatus(
connectionStatus, apiErr := aH.calculateConnectionStatus(
r.Context(), orgID, connectionTests, lookbackSeconds,
)
if err != nil {
render.Error(w, err)
if apiErr != nil {
RespondError(w, apiErr, "Failed to calculate integration connection status")
return
}
render.Success(w, http.StatusOK, connectionStatus)
aH.Respond(w, connectionStatus)
}
func (aH *APIHandler) calculateConnectionStatus(
@@ -3291,11 +3275,10 @@ func (aH *APIHandler) calculateConnectionStatus(
orgID valuer.UUID,
connectionTests *integrations.IntegrationConnectionTests,
lookbackSeconds int64,
) (*integrations.IntegrationConnectionStatus, error) {
) (*integrations.IntegrationConnectionStatus, *model.ApiError) {
// Calculate connection status for signals in parallel
result := &integrations.IntegrationConnectionStatus{}
// TODO: migrate to errors package
errors := []*model.ApiError{}
var resultLock sync.Mutex
@@ -3493,14 +3476,12 @@ func (aH *APIHandler) UninstallIntegration(w http.ResponseWriter, r *http.Reques
aH.Respond(w, map[string]interface{}{})
}
// RegisterCloudIntegrationsRoutes register routes for cloud provider integrations
// cloud provider integrations
func (aH *APIHandler) RegisterCloudIntegrationsRoutes(router *mux.Router, am *middleware.AuthZ) {
subRouter := router.PathPrefix("/api/v1/cloud-integrations").Subrouter()
subRouter.Use(middleware.NewRecovery(aH.Signoz.Instrumentation.Logger()).Wrap)
subRouter.HandleFunc(
"/{cloudProvider}/accounts/generate-connection-url", am.EditAccess(aH.CloudIntegrationsGenerateConnectionArtifact),
"/{cloudProvider}/accounts/generate-connection-url", am.EditAccess(aH.CloudIntegrationsGenerateConnectionUrl),
).Methods(http.MethodPost)
subRouter.HandleFunc(
@@ -3534,303 +3515,483 @@ func (aH *APIHandler) RegisterCloudIntegrationsRoutes(router *mux.Router, am *mi
subRouter.HandleFunc(
"/{cloudProvider}/services/{serviceId}/config", am.EditAccess(aH.CloudIntegrationsUpdateServiceConfig),
).Methods(http.MethodPost)
}
func (aH *APIHandler) CloudIntegrationsGenerateConnectionArtifact(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
func (aH *APIHandler) CloudIntegrationsListConnectedAccounts(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
resp, apiErr := aH.CloudIntegrationsController.ListConnectedAccounts(
r.Context(), claims.OrgID, cloudProvider,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsGenerateConnectionUrl(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
req := cloudintegrations.GenerateConnectionUrlRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
result, apiErr := aH.CloudIntegrationsController.GenerateConnectionUrl(
r.Context(), claims.OrgID, cloudProvider, req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsGetAccountStatus(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
accountId := mux.Vars(r)["accountId"]
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
resp, apiErr := aH.CloudIntegrationsController.GetAccountStatus(
r.Context(), claims.OrgID, cloudProvider, accountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsAgentCheckIn(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
req := cloudintegrations.AgentCheckInRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
result, err := aH.CloudIntegrationsController.CheckInAsAgent(
r.Context(), claims.OrgID, cloudProvider, req,
)
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsUpdateAccountConfig(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
accountId := mux.Vars(r)["accountId"]
req := cloudintegrations.UpdateAccountConfigRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
result, apiErr := aH.CloudIntegrationsController.UpdateAccountConfig(
r.Context(), claims.OrgID, cloudProvider, accountId, req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsDisconnectAccount(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
accountId := mux.Vars(r)["accountId"]
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
result, apiErr := aH.CloudIntegrationsController.DisconnectAccount(
r.Context(), claims.OrgID, cloudProvider, accountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsListServices(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
var cloudAccountId *string
cloudAccountIdQP := r.URL.Query().Get("cloud_account_id")
if len(cloudAccountIdQP) > 0 {
cloudAccountId = &cloudAccountIdQP
}
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
resp, apiErr := aH.CloudIntegrationsController.ListServices(
r.Context(), claims.OrgID, cloudProvider, cloudAccountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsGetServiceDetails(
w http.ResponseWriter, r *http.Request,
) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
reqBody, err := io.ReadAll(r.Body)
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(w, errors.WrapInternalf(err, errors.CodeInternal, "failed to read request body"))
render.Error(w, err)
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].GenerateConnectionArtifact(r.Context(), &integrationtypes.PostableConnectionArtifact{
OrgID: claims.OrgID,
Data: reqBody,
})
cloudProvider := mux.Vars(r)["cloudProvider"]
serviceId := mux.Vars(r)["serviceId"]
var cloudAccountId *string
cloudAccountIdQP := r.URL.Query().Get("cloud_account_id")
if len(cloudAccountIdQP) > 0 {
cloudAccountId = &cloudAccountIdQP
}
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
resp, err := aH.CloudIntegrationsController.GetServiceDetails(
r.Context(), claims.OrgID, cloudProvider, serviceId, cloudAccountId,
)
if err != nil {
aH.Signoz.Instrumentation.Logger().ErrorContext(r.Context(),
"failed to generate connection artifact for cloud integration",
slog.String("cloudProvider", cloudProviderString),
slog.String("orgID", claims.OrgID),
render.Error(w, err)
return
}
// Add connection status for the 2 signals.
if cloudAccountId != nil {
connStatus, apiErr := aH.calculateCloudIntegrationServiceConnectionStatus(
r.Context(), orgID, cloudProvider, *cloudAccountId, resp,
)
render.Error(w, err)
return
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
resp.ConnectionStatus = connStatus
}
render.Success(w, http.StatusOK, resp)
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsListConnectedAccounts(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
ctx context.Context,
orgID valuer.UUID,
cloudProvider string,
cloudAccountId string,
svcDetails *cloudintegrations.ServiceDetails,
) (*cloudintegrations.ServiceConnectionStatus, *model.ApiError) {
if cloudProvider != "aws" {
// TODO(Raj): Make connection check generic for all providers in a follow up change
return nil, model.BadRequest(
fmt.Errorf("unsupported cloud provider: %s", cloudProvider),
)
}
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].ListConnectedAccounts(r.Context(), claims.OrgID)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, resp)
}
func (aH *APIHandler) CloudIntegrationsGetAccountStatus(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
accountId := mux.Vars(r)["accountId"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].GetAccountStatus(r.Context(), claims.OrgID, accountId)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, resp)
}
func (aH *APIHandler) CloudIntegrationsAgentCheckIn(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
req := new(integrationtypes.PostableAgentCheckInPayload)
if err = json.NewDecoder(r.Body).Decode(req); err != nil {
render.Error(w, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid request body"))
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
req.OrgID = claims.OrgID
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].AgentCheckIn(r.Context(), req)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, resp)
}
func (aH *APIHandler) CloudIntegrationsUpdateAccountConfig(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(w, err)
return
}
accountId := mux.Vars(r)["accountId"]
reqBody, err := io.ReadAll(r.Body)
if err != nil {
render.Error(w, errors.WrapInternalf(err, errors.CodeInternal, "failed to read request body"))
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].UpdateAccountConfig(r.Context(), orgID, accountId, reqBody)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, resp)
}
func (aH *APIHandler) CloudIntegrationsDisconnectAccount(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
accountId := mux.Vars(r)["accountId"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
result, err := aH.cloudIntegrationsRegistry[cloudProvider].DisconnectAccount(r.Context(), claims.OrgID, accountId)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, result)
}
func (aH *APIHandler) CloudIntegrationsListServices(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
var cloudAccountId *string
cloudAccountIdQP := r.URL.Query().Get("cloud_account_id")
if len(cloudAccountIdQP) > 0 {
cloudAccountId = &cloudAccountIdQP
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].ListServices(r.Context(), claims.OrgID, cloudAccountId)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, resp)
}
func (aH *APIHandler) CloudIntegrationsGetServiceDetails(w http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(w, err)
return
}
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
serviceId := mux.Vars(r)["serviceId"]
var cloudAccountId *string
cloudAccountIdQP := r.URL.Query().Get("cloud_account_id")
if len(cloudAccountIdQP) > 0 {
cloudAccountId = &cloudAccountIdQP
}
resp, err := aH.cloudIntegrationsRegistry[cloudProvider].GetServiceDetails(r.Context(), &integrationtypes.GetServiceDetailsReq{
OrgID: orgID,
ServiceId: serviceId,
CloudAccountID: cloudAccountId,
})
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, resp)
}
func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig(w http.ResponseWriter, r *http.Request) {
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(w, err)
return
}
serviceId := mux.Vars(r)["serviceId"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(w, err)
return
}
reqBody, err := io.ReadAll(r.Body)
if err != nil {
render.Error(w, errors.WrapInternalf(err,
errors.CodeInternal,
"failed to read update service config request body",
telemetryCollectionStrategy := svcDetails.Strategy
if telemetryCollectionStrategy == nil {
return nil, model.InternalError(fmt.Errorf(
"service doesn't have telemetry collection strategy: %s", svcDetails.Id,
))
}
result := &cloudintegrations.ServiceConnectionStatus{}
errors := []*model.ApiError{}
var resultLock sync.Mutex
var wg sync.WaitGroup
// Calculate metrics connection status
if telemetryCollectionStrategy.AWSMetrics != nil {
wg.Add(1)
go func() {
defer wg.Done()
metricsConnStatus, apiErr := aH.calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx, cloudAccountId, telemetryCollectionStrategy.AWSMetrics, svcDetails.DataCollected.Metrics,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Metrics = metricsConnStatus
}
}()
}
// Calculate logs connection status
if telemetryCollectionStrategy.AWSLogs != nil {
wg.Add(1)
go func() {
defer wg.Done()
logsConnStatus, apiErr := aH.calculateAWSIntegrationSvcLogsConnectionStatus(
ctx, orgID, cloudAccountId, telemetryCollectionStrategy.AWSLogs,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Logs = logsConnStatus
}
}()
}
wg.Wait()
if len(errors) > 0 {
return nil, errors[0]
}
return result, nil
}
func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx context.Context,
cloudAccountId string,
strategy *services.AWSMetricsStrategy,
metricsCollectedBySvc []services.CollectedMetric,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.StreamFilters) < 1 {
return nil, nil
}
expectedLabelValues := map[string]string{
"cloud_provider": "aws",
"cloud_account_id": cloudAccountId,
}
metricsNamespace := strategy.StreamFilters[0].Namespace
metricsNamespaceParts := strings.Split(metricsNamespace, "/")
if len(metricsNamespaceParts) >= 2 {
expectedLabelValues["service_namespace"] = metricsNamespaceParts[0]
expectedLabelValues["service_name"] = metricsNamespaceParts[1]
} else {
// metrics for single word namespaces like "CWAgent" do not
// have the service_namespace label populated
expectedLabelValues["service_name"] = metricsNamespaceParts[0]
}
metricNamesCollectedBySvc := []string{}
for _, cm := range metricsCollectedBySvc {
metricNamesCollectedBySvc = append(metricNamesCollectedBySvc, cm.Name)
}
statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric(
ctx, metricNamesCollectedBySvc, expectedLabelValues,
)
if apiErr != nil {
return nil, apiErr
}
if statusForLastReceivedMetric != nil {
return &cloudintegrations.SignalConnectionStatus{
LastReceivedTsMillis: statusForLastReceivedMetric.LastReceivedTsMillis,
LastReceivedFrom: "signoz-aws-integration",
}, nil
}
return nil, nil
}
func (aH *APIHandler) calculateAWSIntegrationSvcLogsConnectionStatus(
ctx context.Context,
orgID valuer.UUID,
cloudAccountId string,
strategy *services.AWSLogsStrategy,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.Subscriptions) < 1 {
return nil, nil
}
logGroupNamePrefix := strategy.Subscriptions[0].LogGroupNamePrefix
if len(logGroupNamePrefix) < 1 {
return nil, nil
}
logsConnTestFilter := &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "cloud.account.id",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: "=",
Value: cloudAccountId,
},
{
Key: v3.AttributeKey{
Key: "aws.cloudwatch.log_group_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: "like",
Value: logGroupNamePrefix + "%",
},
},
}
// TODO(Raj): Receive this as a param from UI in the future.
lookbackSeconds := int64(30 * 60)
qrParams := &v3.QueryRangeParamsV3{
Start: time.Now().UnixMilli() - (lookbackSeconds * 1000),
End: time.Now().UnixMilli(),
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
PageSize: 1,
Filters: logsConnTestFilter,
QueryName: "A",
DataSource: v3.DataSourceLogs,
Expression: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
},
},
},
}
queryRes, _, err := aH.querier.QueryRange(
ctx, orgID, qrParams,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query for integration connection status: %w", err,
))
}
if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 {
lastLog := queryRes[0].List[0]
return &cloudintegrations.SignalConnectionStatus{
LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(),
LastReceivedFrom: "signoz-aws-integration",
}, nil
}
return nil, nil
}
func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
serviceId := mux.Vars(r)["serviceId"]
req := cloudintegrations.UpdateServiceConfigRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
result, err := aH.cloudIntegrationsRegistry[cloudProvider].UpdateServiceConfig(r.Context(), serviceId, orgID, reqBody)
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
return
}
result, err := aH.CloudIntegrationsController.UpdateServiceConfig(
r.Context(), claims.OrgID, cloudProvider, serviceId, &req,
)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, result)
aH.Respond(w, result)
}
// logs

View File

@@ -11,7 +11,6 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -108,7 +107,7 @@ type IntegrationsListItem struct {
type Integration struct {
IntegrationDetails
Installation *integrationtypes.InstalledIntegration `json:"installation"`
Installation *types.InstalledIntegration `json:"installation"`
}
type Manager struct {
@@ -224,7 +223,7 @@ func (m *Manager) InstallIntegration(
ctx context.Context,
orgId string,
integrationId string,
config integrationtypes.InstalledIntegrationConfig,
config types.InstalledIntegrationConfig,
) (*IntegrationsListItem, *model.ApiError) {
integrationDetails, apiErr := m.getIntegrationDetails(ctx, integrationId)
if apiErr != nil {
@@ -430,7 +429,7 @@ func (m *Manager) getInstalledIntegration(
ctx context.Context,
orgId string,
integrationId string,
) (*integrationtypes.InstalledIntegration, *model.ApiError) {
) (*types.InstalledIntegration, *model.ApiError) {
iis, apiErr := m.installedIntegrationsRepo.get(
ctx, orgId, []string{integrationId},
)
@@ -458,7 +457,7 @@ func (m *Manager) getInstalledIntegrations(
return nil, apiErr
}
installedTypes := utils.MapSlice(installations, func(i integrationtypes.InstalledIntegration) string {
installedTypes := utils.MapSlice(installations, func(i types.InstalledIntegration) string {
return i.Type
})
integrationDetails, apiErr := m.availableIntegrationsRepo.get(ctx, installedTypes)

View File

@@ -4,22 +4,22 @@ import (
"context"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/types"
)
type InstalledIntegrationsRepo interface {
list(ctx context.Context, orgId string) ([]integrationtypes.InstalledIntegration, *model.ApiError)
list(ctx context.Context, orgId string) ([]types.InstalledIntegration, *model.ApiError)
get(
ctx context.Context, orgId string, integrationTypes []string,
) (map[string]integrationtypes.InstalledIntegration, *model.ApiError)
) (map[string]types.InstalledIntegration, *model.ApiError)
upsert(
ctx context.Context,
orgId string,
integrationType string,
config integrationtypes.InstalledIntegrationConfig,
) (*integrationtypes.InstalledIntegration, *model.ApiError)
config types.InstalledIntegrationConfig,
) (*types.InstalledIntegration, *model.ApiError)
delete(ctx context.Context, orgId string, integrationType string) *model.ApiError
}

Some files were not shown because too many files have changed in this diff Show More