Compare commits

..

4 Commits

Author SHA1 Message Date
swapnil-signoz
871a4c74df refactor: normalising account_id if empty 2026-03-18 01:06:32 +05:30
swapnil-signoz
49bbaccbe2 Merge branch 'main' into fix/cloudintegration-index 2026-03-18 00:04:58 +05:30
swapnil-signoz
1502338f17 refactor: removing std errors pkg 2026-03-17 20:22:38 +05:30
swapnil-signoz
fdf75f03ac fix: adding migration for fixing wrong cloud integration unique index 2026-03-17 17:12:44 +05:30
27 changed files with 395 additions and 901 deletions

View File

@@ -190,7 +190,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.116.0
image: signoz/signoz:v0.115.0
ports:
- "8080:8080" # signoz port
# - "6060:6060" # pprof port

View File

@@ -117,7 +117,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.116.0
image: signoz/signoz:v0.115.0
ports:
- "8080:8080" # signoz port
volumes:

View File

@@ -181,7 +181,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.116.0}
image: signoz/signoz:${VERSION:-v0.115.0}
container_name: signoz
ports:
- "8080:8080" # signoz port

View File

@@ -109,7 +109,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.116.0}
image: signoz/signoz:${VERSION:-v0.115.0}
container_name: signoz
ports:
- "8080:8080" # signoz port

View File

@@ -42,7 +42,7 @@ func (middleware *AuthZ) ViewAccess(next http.HandlerFunc) http.HandlerFunc {
}
commentCtx := ctxtypes.CommentFromContext(ctx)
authtype, ok := commentCtx.Map()["identn_provider"]
authtype, ok := commentCtx.Map()["auth_type"]
if ok && (authtype == authtypes.IdentNProviderAPIkey.StringValue()) {
if err := claims.IsViewer(); err != nil {
middleware.logger.WarnContext(ctx, authzDeniedMessage, "claims", claims)
@@ -94,7 +94,7 @@ func (middleware *AuthZ) EditAccess(next http.HandlerFunc) http.HandlerFunc {
}
commentCtx := ctxtypes.CommentFromContext(ctx)
authtype, ok := commentCtx.Map()["identn_provider"]
authtype, ok := commentCtx.Map()["auth_type"]
if ok && (authtype == authtypes.IdentNProviderAPIkey.StringValue()) {
if err := claims.IsEditor(); err != nil {
middleware.logger.WarnContext(ctx, authzDeniedMessage, "claims", claims)
@@ -145,7 +145,7 @@ func (middleware *AuthZ) AdminAccess(next http.HandlerFunc) http.HandlerFunc {
}
commentCtx := ctxtypes.CommentFromContext(ctx)
authtype, ok := commentCtx.Map()["identn_provider"]
authtype, ok := commentCtx.Map()["auth_type"]
if ok && (authtype == authtypes.IdentNProviderAPIkey.StringValue()) {
if err := claims.IsAdmin(); err != nil {
middleware.logger.WarnContext(ctx, authzDeniedMessage, "claims", claims)

View File

@@ -101,8 +101,13 @@ func (provider *provider) GetIdentity(req *http.Request) (*authtypes.Identity, e
return nil, err
}
identity := authtypes.NewIdentity(user.ID, user.OrgID, user.Email, apiKey.Role, provider.Name())
return identity, nil
identity := authtypes.Identity{
UserID: user.ID,
Role: apiKey.Role,
Email: user.Email,
OrgID: user.OrgID,
}
return &identity, nil
}
func (provider *provider) Post(ctx context.Context, _ *http.Request, _ authtypes.Claims) {

View File

@@ -1,65 +0,0 @@
package cloudintegration
import (
"context"
"net/http"
citypes "github.com/SigNoz/signoz/pkg/types/cloudintegrationtypes"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
CreateAccount(ctx context.Context, account *citypes.Account) error
// GetAccount returns cloud integration account
GetAccount(ctx context.Context, orgID, accountID valuer.UUID) (*citypes.Account, error)
// ListAccounts lists accounts where agent is connected
ListAccounts(ctx context.Context, orgID valuer.UUID) ([]*citypes.Account, error)
// UpdateAccount updates the cloud integration account for a specific organization.
UpdateAccount(ctx context.Context, account *citypes.Account) error
// DisconnectAccount soft deletes/removes a cloud integration account.
DisconnectAccount(ctx context.Context, orgID, accountID valuer.UUID) error
// GetConnectionArtifact returns cloud provider specific connection information,
// client side handles how this information is shown
GetConnectionArtifact(ctx context.Context, account *citypes.Account, req *citypes.ConnectionArtifactRequest) (*citypes.ConnectionArtifact, error)
// ListServicesMetadata returns the list of services metadata for a cloud provider attached with the integrationID.
// This just returns a summary of the service and not the whole service definition
ListServicesMetadata(ctx context.Context, orgID valuer.UUID, integrationID *valuer.UUID) ([]*citypes.ServiceMetadata, error)
// GetService returns service definition details for a serviceID. This returns config and
// other details required to show in service details page on web client.
GetService(ctx context.Context, orgID valuer.UUID, integrationID *valuer.UUID, serviceID string) (*citypes.Service, error)
// UpdateService updates cloud integration service
UpdateService(ctx context.Context, orgID valuer.UUID, service *citypes.CloudIntegrationService) error
// AgentCheckIn is called by agent to heartbeat and get latest config in response.
AgentCheckIn(ctx context.Context, orgID valuer.UUID, req *citypes.AgentCheckInRequest) (*citypes.AgentCheckInResponse, error)
// GetDashboardByID returns dashboard JSON for a given dashboard id.
// this only returns the dashboard when the service (embedded in dashboard id) is enabled
// in the org for any cloud integration account
GetDashboardByID(ctx context.Context, orgID valuer.UUID, id string) (*dashboardtypes.Dashboard, error)
// ListDashboards returns list of dashboards across all connected cloud integration accounts
// for enabled services in the org. This list gets added to dashboard list page
ListDashboards(ctx context.Context, orgID valuer.UUID) ([]*dashboardtypes.Dashboard, error)
}
type Handler interface {
GetConnectionArtifact(http.ResponseWriter, *http.Request)
ListAccounts(http.ResponseWriter, *http.Request)
GetAccount(http.ResponseWriter, *http.Request)
UpdateAccount(http.ResponseWriter, *http.Request)
DisconnectAccount(http.ResponseWriter, *http.Request)
ListServicesMetadata(http.ResponseWriter, *http.Request)
GetService(http.ResponseWriter, *http.Request)
UpdateService(http.ResponseWriter, *http.Request)
AgentCheckIn(http.ResponseWriter, *http.Request)
}

View File

@@ -115,6 +115,7 @@ func (r *Repo) GetLatestVersion(
func (r *Repo) insertConfig(
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
) error {
if c.ElementType.StringValue() == "" {
return errors.NewInvalidInputf(CodeElementTypeRequired, "element type is required for creating agent config version")
}
@@ -228,25 +229,6 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
return nil
}
// GetDeployStatusByHash returns the DeployStatus for the given config hash
// (stored with orgId prefix). Returns DeployStatusUnknown when no matching row exists.
func (r *Repo) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
var version opamptypes.AgentConfigVersion
err := r.store.BunDB().NewSelect().
Model(&version).
ColumnExpr("deploy_status").
Where("hash = ?", configHash).
Where("org_id = ?", orgId).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return opamptypes.DeployStatusUnknown, nil
}
return opamptypes.DeployStatusUnknown, errors.WrapInternalf(err, errors.CodeInternal, "failed to query deploy status by hash")
}
return version.DeployStatus, nil
}
func (r *Repo) updateDeployStatusByHash(
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
) error {

View File

@@ -180,12 +180,6 @@ func (m *Manager) ReportConfigDeploymentStatus(
}
}
// Implements model.AgentConfigProvider
func (m *Manager) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
return m.Repo.GetDeployStatusByHash(ctx, orgId, configHash)
}
func GetLatestVersion(
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
) (*opamptypes.AgentConfigVersion, error) {

View File

@@ -1,8 +1,6 @@
package opamp
import (
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
)
import "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
// Interface for a source of otel collector config recommendations.
type AgentConfigProvider interface {

View File

@@ -5,7 +5,6 @@ import (
"log"
"net"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"github.com/knadh/koanf"
@@ -128,11 +127,6 @@ func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID
return exists
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) GetDeployStatusByHash(_ context.Context, _ valuer.UUID, _ string) (opamptypes.DeployStatus, error) {
return opamptypes.DeployStatusUnknown, nil
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
subscriberId := uuid.NewString()

View File

@@ -111,99 +111,90 @@ func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
return false
}
// agentDescriptionChanged returns true when the agent sends updated properties
// (e.g. capability flag, version) mid-connection, signalling the server to
// recompute and push a new RemoteConfig.
//
// On reconnect this always returns false: handleFirstStatus pre-copies
// AgentDescription into agent.Status so no diff is detected, avoiding a
// redundant config recompute.
func (agent *Agent) agentDescriptionChanged(newStatus *protobufs.AgentToServer) bool {
// nil AgentDescription means no change per OpAMP protocol.
if newStatus.AgentDescription == nil {
return false
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
prevStatus := agent.Status
if agent.Status == nil {
// First time this Agent reports a status, remember it.
agent.Status = newStatus
agentDescrChanged = true
} else {
// Not a new Agent. Update the Status.
agent.Status.SequenceNum = newStatus.SequenceNum
// Check what's changed in the AgentDescription.
if newStatus.AgentDescription != nil {
// If the AgentDescription field is set it means the Agent tells us
// something is changed in the field since the last status report
// (or this is the first report).
// Make full comparison of previous and new descriptions to see if it
// really is different.
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
// Agent description didn't change.
agentDescrChanged = false
} else {
// Yes, the description is different, update it.
agent.Status.AgentDescription = newStatus.AgentDescription
agentDescrChanged = true
}
} else {
// AgentDescription field is not set, which means description didn't change.
agentDescrChanged = false
}
// Update remote config status if it is included and is different from what we have.
if newStatus.RemoteConfigStatus != nil &&
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
// todo: need to address multiple agent scenario here
// for now, the first response will be sent back to the UI
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
}
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
}
}
}
if proto.Equal(agent.Status.AgentDescription, newStatus.AgentDescription) {
return false
if agentDescrChanged {
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
}
return agentDescrChanged
}
func (agent *Agent) updateHealth(newStatus *protobufs.AgentToServer) {
if newStatus.Health == nil {
return
}
agent.Status.Health = newStatus.Health
if agent.Status != nil && agent.Status.Health != nil && agent.Status.Health.Healthy {
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(agent.Status.Health.StartTimeUnixNano)).UTC()
}
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
return true
}
// updateRemoteConfigStatus updates the stored RemoteConfigStatus and notifies
// subscribers if the status has changed relative to what we have stored.
func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer) {
if newStatus.RemoteConfigStatus == nil ||
proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
return
}
// todo: need to address multiple agent scenario here
// for now, the first response will be sent back to the UI
hash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
switch newStatus.RemoteConfigStatus.Status {
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED:
onConfigSuccess(agent.OrgID, agent.AgentID, hash)
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
onConfigFailure(agent.OrgID, agent.AgentID, hash, newStatus.RemoteConfigStatus.ErrorMessage)
// Update remote config status if it is included and is different from what we have.
if newStatus.RemoteConfigStatus != nil {
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
}
}
// handleFirstStatus initializes agent.Status on the first message received from
// this agent instance. It is a no-op for all subsequent messages.
func (agent *Agent) handleFirstStatus(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) {
if agent.Status != nil {
return
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
if agent.Status == nil {
// First time this Agent reports a status, remember it.
agent.Status = newStatus
agentDescrChanged = true
}
// Initialize with a clean slate.
agent.Status = &protobufs.AgentToServer{
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
},
}
if newStatus.RemoteConfigStatus == nil ||
newStatus.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET {
// Agent just started fresh — no prior deployment to reconcile with the DB.
return
}
// Since the server's connection is restarted;
// copy the agent description; so no change is detected by agentDescriptionChanged
agent.Status.AgentDescription = newStatus.AgentDescription
// Server reconnected while the agent was already running.
// Reconcile deployment status with DB; DB is the source of truth.
// If DB says in_progress but agent now reports APPLIED/FAILED,
// updateRemoteConfigStatus will detect the transition and notify subscribers.
rawHash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
deployStatus, err := configProvider.GetDeployStatusByHash(context.Background(), agent.OrgID, agent.OrgID.String()+rawHash)
if err != nil {
return
}
agent.Status.RemoteConfigStatus.Status = opamptypes.DeployStatusToProtoStatus[deployStatus]
// If the deployment is still in-flight, rehydrate the subscriber so that
// updateRemoteConfigStatus can fire onConfigSuccess/onConfigFailure when
// the agent next reports a terminal status.
if deployStatus != opamptypes.Deployed && deployStatus != opamptypes.DeployFailed {
ListenToConfigUpdate(agent.OrgID, agent.AgentID, rawHash, configProvider.ReportConfigDeploymentStatus)
}
}
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) bool {
agent.handleFirstStatus(newStatus, configProvider)
agentDescrChanged := agent.agentDescriptionChanged(newStatus)
// record healthy timestamp
if newStatus.Health != nil && newStatus.Health.Healthy {
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(newStatus.Health.StartTimeUnixNano)).UTC()
}
// notify subscribers first; this will update the status in the DB
agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged
agent.updateRemoteConfigStatus(newStatus)
// update local reference in last.
agent.Status = newStatus
agent.updateHealth(newStatus)
return agentDescrChanged
}
@@ -246,7 +237,7 @@ func (agent *Agent) processStatusUpdate(
// current status is not up-to-date.
lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum)
agentDescrChanged := agent.updateStatusField(newStatus, configProvider)
agentDescrChanged := agent.updateStatusField(newStatus)
// Check if any fields were omitted in the status report.
effectiveConfigOmitted := newStatus.EffectiveConfig == nil &&

View File

@@ -1,11 +1,6 @@
package model
import (
"context"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
import "github.com/SigNoz/signoz/pkg/valuer"
// Interface for source of otel collector config recommendations.
type AgentConfigProvider interface {
@@ -25,10 +20,4 @@ type AgentConfigProvider interface {
configId string,
err error,
)
// GetDeployStatusByHash returns the DeployStatus for the given config hash
// (with orgId prefix as stored in the DB). Returns DeployStatusUnknown when
// no matching row exists. Used by the agent's first-connect handler to
// determine whether the reported RemoteConfigStatus resolves a pending deployment.
GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error)
}

View File

@@ -66,7 +66,6 @@ func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnC
defer coordinator.mutex.Unlock()
key := getSubscriberKey(orgId, hash)
if subs, ok := coordinator.subscribers[key]; ok {
subs = append(subs, ss)
coordinator.subscribers[key] = subs

View File

@@ -7,14 +7,12 @@ import (
"sync"
"time"
"log/slog"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
opentracing "github.com/opentracing/opentracing-go"
plabels "github.com/prometheus/prometheus/model/labels"
"log/slog"
)
// PromRuleTask is a promql rule executor
@@ -373,7 +371,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
comment := ctxtypes.CommentFromContext(ctx)
comment.Set("rule_id", rule.ID())
comment.Set("identn_provider", authtypes.IdentNProviderInternal.StringValue())
comment.Set("auth_type", "internal")
ctx = ctxtypes.NewContextWithComment(ctx, comment)
_, err := rule.Eval(ctx, ts)

View File

@@ -10,7 +10,6 @@ import (
"log/slog"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -359,7 +358,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
comment := ctxtypes.CommentFromContext(ctx)
comment.Set("rule_id", rule.ID())
comment.Set("identn_provider", authtypes.IdentNProviderInternal.StringValue())
comment.Set("auth_type", "internal")
ctx = ctxtypes.NewContextWithComment(ctx, comment)
_, err := rule.Eval(ctx, ts)

View File

@@ -175,6 +175,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewMigrateRulesV4ToV5Factory(sqlstore, telemetryStore),
sqlmigration.NewAddStatusUserFactory(sqlstore, sqlschema),
sqlmigration.NewDeprecateUserInviteFactory(sqlstore, sqlschema),
sqlmigration.NewFixCloudIntegrationUniqueIndexFactory(sqlstore, sqlschema),
)
}

View File

@@ -0,0 +1,264 @@
package sqlmigration
import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type fixCloudIntegrationUniqueIndex struct {
sqlstore sqlstore.SQLStore
sqlschema sqlschema.SQLSchema
}
func NewFixCloudIntegrationUniqueIndexFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(
factory.MustNewName("fix_cloud_integration_index"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &fixCloudIntegrationUniqueIndex{
sqlstore: sqlstore,
sqlschema: sqlschema,
}, nil
},
)
}
func (migration *fixCloudIntegrationUniqueIndex) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
type cloudIntegrationRow struct {
bun.BaseModel `bun:"table:cloud_integration"`
ID string `bun:"id"`
AccountID string `bun:"account_id"`
Provider string `bun:"provider"`
OrgID string `bun:"org_id"`
Config string `bun:"config"`
UpdatedAt time.Time `bun:"updated_at"`
}
type cloudIntegrationAccountConfig struct {
Regions []string `json:"regions"`
}
// duplicateGroup holds the keeper (first element) and losers (rest) for a duplicate (account_id, provider, org_id) group.
type duplicateGroup struct {
keeper *cloudIntegrationRow
losers []*cloudIntegrationRow
}
func (migration *fixCloudIntegrationUniqueIndex) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
// Step 1: Drop the wrong index on (id, provider, org_id)
dropSqls := migration.sqlschema.Operator().DropIndex(
(&sqlschema.UniqueIndex{
TableName: "cloud_integration",
ColumnNames: []sqlschema.ColumnName{"id", "provider", "org_id"},
}).Named("unique_cloud_integration"),
)
for _, sql := range dropSqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
// Step 2: Normalize empty-string account_id to NULL
// Older table structure could store "" instead of NULL for unconnected accounts.
// Empty strings would violate the partial unique index since '' = '' (unlike NULL != NULL).
_, err = tx.NewUpdate().
TableExpr("cloud_integration").
Set("account_id = NULL").
Where("account_id = ''").
Exec(ctx)
if err != nil {
return err
}
// Step 3: Fetch all active rows with non-null account_id, ordered for grouping
var activeRows []*cloudIntegrationRow
err = tx.NewSelect().
Model(&activeRows).
Where("removed_at IS NULL").
Where("account_id IS NOT NULL").
OrderExpr("account_id, provider, org_id, updated_at DESC").
Scan(ctx)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
}
// Group by (account_id, provider, org_id)
groups := groupCloudIntegrationRows(activeRows)
now := time.Now()
var loserIDs []string
for _, group := range groups {
if len(group.losers) == 0 {
continue
}
// Step 4: Merge config from losers into keeper
if err = mergeCloudIntegrationConfigs(ctx, tx, group); err != nil {
return err
}
// Step 5: Reassign orphaned cloud_integration_service rows
for _, loser := range group.losers {
// Delete services from loser that would conflict with keeper's services (same type)
_, err = tx.NewDelete().
TableExpr("cloud_integration_service").
Where("cloud_integration_id = ?", loser.ID).
Where("type IN (?)",
tx.NewSelect().
TableExpr("cloud_integration_service").
Column("type").
Where("cloud_integration_id = ?", group.keeper.ID),
).
Exec(ctx)
if err != nil {
return err
}
// Reassign remaining services to the keeper
_, err = tx.ExecContext(ctx,
"UPDATE cloud_integration_service SET cloud_integration_id = ? WHERE cloud_integration_id = ?",
group.keeper.ID, loser.ID,
)
if err != nil {
return err
}
loserIDs = append(loserIDs, loser.ID)
}
}
// Step 6: Soft-delete all loser rows
if len(loserIDs) > 0 {
_, err = tx.NewUpdate().
TableExpr("cloud_integration").
Set("removed_at = ?", now).
Set("updated_at = ?", now).
Where("id IN (?)", bun.In(loserIDs)).
Exec(ctx)
if err != nil {
return err
}
}
// Step 7: Create the correct partial unique index on (account_id, provider, org_id) WHERE removed_at IS NULL
createSqls := migration.sqlschema.Operator().CreateIndex(
&sqlschema.PartialUniqueIndex{
TableName: "cloud_integration",
ColumnNames: []sqlschema.ColumnName{"account_id", "provider", "org_id"},
Where: "removed_at IS NULL",
},
)
for _, sql := range createSqls {
if _, err = tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
return tx.Commit()
}
func (migration *fixCloudIntegrationUniqueIndex) Down(ctx context.Context, db *bun.DB) error {
return nil
}
// groupCloudIntegrationRows groups rows by (account_id, provider, org_id).
// Rows must be pre-sorted by account_id, provider, org_id, updated_at DESC
// so the first row in each group is the keeper (most recently updated).
func groupCloudIntegrationRows(rows []*cloudIntegrationRow) []duplicateGroup {
if len(rows) == 0 {
return nil
}
var groups []duplicateGroup
var current duplicateGroup
current.keeper = rows[0]
for i := 1; i < len(rows); i++ {
row := rows[i]
if row.AccountID == current.keeper.AccountID &&
row.Provider == current.keeper.Provider &&
row.OrgID == current.keeper.OrgID {
current.losers = append(current.losers, row)
} else {
groups = append(groups, current)
current = duplicateGroup{keeper: row}
}
}
groups = append(groups, current)
return groups
}
// mergeCloudIntegrationConfigs unions the EnabledRegions from all rows in the group into the keeper's config and updates
func mergeCloudIntegrationConfigs(ctx context.Context, tx bun.Tx, group duplicateGroup) error {
regionSet := make(map[string]struct{})
// Parse keeper's config
parseRegions(group.keeper.Config, regionSet)
// Parse each loser's config
for _, loser := range group.losers {
parseRegions(loser.Config, regionSet)
}
// Build merged config
mergedRegions := make([]string, 0, len(regionSet))
for region := range regionSet {
mergedRegions = append(mergedRegions, region)
}
merged := cloudIntegrationAccountConfig{Regions: mergedRegions}
mergedJSON, err := json.Marshal(merged)
if err != nil {
return err
}
// Update keeper's config
_, err = tx.NewUpdate().
TableExpr("cloud_integration").
Set("config = ?", string(mergedJSON)).
Where("id = ?", group.keeper.ID).
Exec(ctx)
return err
}
// parseRegions unmarshals a config JSON string and adds its regions to the set.
func parseRegions(configJSON string, regionSet map[string]struct{}) {
if configJSON == "" {
return
}
var config cloudIntegrationAccountConfig
if err := json.Unmarshal([]byte(configJSON), &config); err != nil {
return
}
for _, region := range config.Regions {
regionSet[region] = struct{}{}
}
}

View File

@@ -6,7 +6,6 @@ var (
IdentNProviderTokenizer = IdentNProvider{valuer.NewString("tokenizer")}
IdentNProviderAPIkey = IdentNProvider{valuer.NewString("api_key")}
IdentNProviderAnonymous = IdentNProvider{valuer.NewString("anonymous")}
IdentNProviderInternal = IdentNProvider{valuer.NewString("internal")}
)
type IdentNProvider struct{ valuer.String }

View File

@@ -1,43 +0,0 @@
package cloudintegrationtypes
import (
"time"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Account struct {
types.Identifiable
types.TimeAuditable
ProviderAccountId *string `json:"providerAccountID,omitempty"`
Provider CloudProviderType `json:"provider"`
RemovedAt *time.Time `json:"removedAt,omitempty"`
AgentReport *AgentReport `json:"agentReport,omitempty"`
OrgID valuer.UUID `json:"orgID"`
Config *AccountConfig `json:"config,omitempty"`
}
// AgentReport represents heartbeats sent by the agent.
type AgentReport struct {
TimestampMillis int64 `json:"timestampMillis"`
Data map[string]any `json:"data"`
}
type GettableAccounts struct {
Accounts []*Account `json:"accounts"`
}
type GettableAccount = Account
type UpdatableAccount struct {
Config *AccountConfig `json:"config"`
}
type AccountConfig struct {
AWS *AWSAccountConfig `json:"aws,omitempty"`
}
type AWSAccountConfig struct {
Regions []string `json:"regions"`
}

View File

@@ -1,80 +0,0 @@
package cloudintegrationtypes
import (
"database/sql/driver"
"encoding/json"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/uptrace/bun"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
ErrCodeCloudIntegrationNotFound = errors.MustNewCode("cloud_integration_not_found")
)
// StorableCloudIntegration represents a cloud integration stored in the database.
// This is also referred as "Account" in the context of cloud integrations.
type StorableCloudIntegration struct {
bun.BaseModel `bun:"table:cloud_integration"`
types.Identifiable
types.TimeAuditable
Provider CloudProviderType `bun:"provider,type:text"`
Config string `bun:"config,type:text"` // Config is provider-specific data in JSON string format
AccountID *string `bun:"account_id,type:text"`
LastAgentReport *StorableAgentReport `bun:"last_agent_report,type:text"`
RemovedAt *time.Time `bun:"removed_at,type:timestamp,nullzero"`
OrgID valuer.UUID `bun:"org_id,type:text"`
}
// StorableAgentReport represents the last heartbeat and arbitrary data sent by the agent
// as of now there is no use case for Data field, but keeping it for backwards compatibility with older structure.
type StorableAgentReport struct {
TimestampMillis int64 `json:"timestamp_millis"` // backward compatibility
Data map[string]any `json:"data"`
}
// StorableCloudIntegrationService is to store service config for a cloud integration, which is a cloud provider specific configuration.
type StorableCloudIntegrationService struct {
bun.BaseModel `bun:"table:cloud_integration_service,alias:cis"`
types.Identifiable
types.TimeAuditable
Type ServiceID `bun:"type,type:text,notnull"` // Keeping Type field name as is, but it is a service id
Config string `bun:"config,type:text"` // Config is cloud provider's service specific data in JSON string format
CloudIntegrationID valuer.UUID `bun:"cloud_integration_id,type:text"`
}
// Scan scans value from DB.
func (r *StorableAgentReport) Scan(src any) error {
var data []byte
switch v := src.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
}
return json.Unmarshal(data, r)
}
// Value creates value to be stored in DB.
func (r *StorableAgentReport) Value() (driver.Value, error) {
if r == nil {
return nil, errors.NewInternalf(errors.CodeInternal, "agent report is nil")
}
serialized, err := json.Marshal(r)
if err != nil {
return nil, errors.WrapInternalf(
err, errors.CodeInternal, "couldn't serialize agent report to JSON",
)
}
// Return as string instead of []byte to ensure PostgreSQL stores as text, not bytes
return string(serialized), nil
}

View File

@@ -1,41 +0,0 @@
package cloudintegrationtypes
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
)
// CloudProviderType type alias.
type CloudProviderType struct{ valuer.String }
var (
// cloud providers.
CloudProviderTypeAWS = CloudProviderType{valuer.NewString("aws")}
CloudProviderTypeAzure = CloudProviderType{valuer.NewString("azure")}
// errors.
ErrCodeCloudProviderInvalidInput = errors.MustNewCode("invalid_cloud_provider")
AWSIntegrationUserEmail = valuer.MustNewEmail("aws-integration@signoz.io")
AzureIntegrationUserEmail = valuer.MustNewEmail("azure-integration@signoz.io")
)
// CloudIntegrationUserEmails is the list of valid emails for Cloud One Click integrations.
// This is used for validation and restrictions in different contexts, across codebase.
var CloudIntegrationUserEmails = []valuer.Email{
AWSIntegrationUserEmail,
AzureIntegrationUserEmail,
}
// NewCloudProvider returns a new CloudProviderType from a string.
// It validates the input and returns an error if the input is not valid cloud provider.
func NewCloudProvider(provider string) (CloudProviderType, error) {
switch provider {
case CloudProviderTypeAWS.StringValue():
return CloudProviderTypeAWS, nil
case CloudProviderTypeAzure.StringValue():
return CloudProviderTypeAzure, nil
default:
return CloudProviderType{}, errors.NewInvalidInputf(ErrCodeCloudProviderInvalidInput, "invalid cloud provider: %s", provider)
}
}

View File

@@ -1,88 +0,0 @@
package cloudintegrationtypes
import "github.com/SigNoz/signoz/pkg/types/integrationtypes"
type ConnectionArtifactRequest struct {
Aws *AWSConnectionArtifactRequest `json:"aws"`
}
type AWSConnectionArtifactRequest struct {
DeploymentRegion string `json:"deploymentRegion"`
Regions []string `json:"regions"`
}
type PostableConnectionArtifact = ConnectionArtifactRequest
type ConnectionArtifact struct {
Aws *AWSConnectionArtifact `json:"aws"`
}
type AWSConnectionArtifact struct {
ConnectionUrl string `json:"connectionURL"`
}
type GettableConnectionArtifact = ConnectionArtifact
type AccountStatus struct {
Id string `json:"id"`
ProviderAccountId *string `json:"providerAccountID,omitempty"`
Status integrationtypes.AccountStatus `json:"status"`
}
type GettableAccountStatus = AccountStatus
type AgentCheckInRequest struct {
// older backward compatible fields are mapped to new fields
// CloudIntegrationId string `json:"cloudIntegrationId"`
// AccountId string `json:"accountId"`
// New fields
ProviderAccountId string `json:"providerAccountId"`
CloudAccountId string `json:"cloudAccountId"`
Data map[string]any `json:"data,omitempty"`
}
type PostableAgentCheckInRequest struct {
AgentCheckInRequest
// following are backward compatible fields for older running agents
// which gets mapped to new fields in AgentCheckInRequest
CloudIntegrationId string `json:"cloud_integration_id"`
CloudAccountId string `json:"cloud_account_id"`
}
type GettableAgentCheckInResponse struct {
AgentCheckInResponse
// For backward compatibility
CloudIntegrationId string `json:"cloud_integration_id"`
AccountId string `json:"account_id"`
}
type AgentCheckInResponse struct {
// Older fields for backward compatibility are mapped to new fields below
// CloudIntegrationId string `json:"cloud_integration_id"`
// AccountId string `json:"account_id"`
// New fields
ProviderAccountId string `json:"providerAccountId"`
CloudAccountId string `json:"cloudAccountId"`
// IntegrationConfig populates data related to integration that is required for an agent
// to start collecting telemetry data
// keeping JSON key snake_case for backward compatibility
IntegrationConfig *IntegrationConfig `json:"integration_config,omitempty"`
}
type IntegrationConfig struct {
EnabledRegions []string `json:"enabledRegions"` // backward compatible
Telemetry *AWSCollectionStrategy `json:"telemetry,omitempty"` // backward compatible
// new fields
AWS *AWSIntegrationConfig `json:"aws,omitempty"`
}
type AWSIntegrationConfig struct {
EnabledRegions []string `json:"enabledRegions"`
Telemetry *AWSCollectionStrategy `json:"telemetry,omitempty"`
}

View File

@@ -1,103 +0,0 @@
package cloudintegrationtypes
import (
"github.com/SigNoz/signoz/pkg/errors"
)
var (
ErrCodeInvalidCloudRegion = errors.MustNewCode("invalid_cloud_region")
ErrCodeMismatchCloudProvider = errors.MustNewCode("cloud_provider_mismatch")
)
// List of all valid cloud regions on Amazon Web Services.
var ValidAWSRegions = map[string]struct{}{
"af-south-1": {}, // Africa (Cape Town).
"ap-east-1": {}, // Asia Pacific (Hong Kong).
"ap-northeast-1": {}, // Asia Pacific (Tokyo).
"ap-northeast-2": {}, // Asia Pacific (Seoul).
"ap-northeast-3": {}, // Asia Pacific (Osaka).
"ap-south-1": {}, // Asia Pacific (Mumbai).
"ap-south-2": {}, // Asia Pacific (Hyderabad).
"ap-southeast-1": {}, // Asia Pacific (Singapore).
"ap-southeast-2": {}, // Asia Pacific (Sydney).
"ap-southeast-3": {}, // Asia Pacific (Jakarta).
"ap-southeast-4": {}, // Asia Pacific (Melbourne).
"ca-central-1": {}, // Canada (Central).
"ca-west-1": {}, // Canada West (Calgary).
"eu-central-1": {}, // Europe (Frankfurt).
"eu-central-2": {}, // Europe (Zurich).
"eu-north-1": {}, // Europe (Stockholm).
"eu-south-1": {}, // Europe (Milan).
"eu-south-2": {}, // Europe (Spain).
"eu-west-1": {}, // Europe (Ireland).
"eu-west-2": {}, // Europe (London).
"eu-west-3": {}, // Europe (Paris).
"il-central-1": {}, // Israel (Tel Aviv).
"me-central-1": {}, // Middle East (UAE).
"me-south-1": {}, // Middle East (Bahrain).
"sa-east-1": {}, // South America (Sao Paulo).
"us-east-1": {}, // US East (N. Virginia).
"us-east-2": {}, // US East (Ohio).
"us-west-1": {}, // US West (N. California).
"us-west-2": {}, // US West (Oregon).
}
// List of all valid cloud regions for Microsoft Azure.
var ValidAzureRegions = map[string]struct{}{
"australiacentral": {}, // Australia Central
"australiacentral2": {}, // Australia Central 2
"australiaeast": {}, // Australia East
"australiasoutheast": {}, // Australia Southeast
"austriaeast": {}, // Austria East
"belgiumcentral": {}, // Belgium Central
"brazilsouth": {}, // Brazil South
"brazilsoutheast": {}, // Brazil Southeast
"canadacentral": {}, // Canada Central
"canadaeast": {}, // Canada East
"centralindia": {}, // Central India
"centralus": {}, // Central US
"chilecentral": {}, // Chile Central
"denmarkeast": {}, // Denmark East
"eastasia": {}, // East Asia
"eastus": {}, // East US
"eastus2": {}, // East US 2
"francecentral": {}, // France Central
"francesouth": {}, // France South
"germanynorth": {}, // Germany North
"germanywestcentral": {}, // Germany West Central
"indonesiacentral": {}, // Indonesia Central
"israelcentral": {}, // Israel Central
"italynorth": {}, // Italy North
"japaneast": {}, // Japan East
"japanwest": {}, // Japan West
"koreacentral": {}, // Korea Central
"koreasouth": {}, // Korea South
"malaysiawest": {}, // Malaysia West
"mexicocentral": {}, // Mexico Central
"newzealandnorth": {}, // New Zealand North
"northcentralus": {}, // North Central US
"northeurope": {}, // North Europe
"norwayeast": {}, // Norway East
"norwaywest": {}, // Norway West
"polandcentral": {}, // Poland Central
"qatarcentral": {}, // Qatar Central
"southafricanorth": {}, // South Africa North
"southafricawest": {}, // South Africa West
"southcentralus": {}, // South Central US
"southindia": {}, // South India
"southeastasia": {}, // Southeast Asia
"spaincentral": {}, // Spain Central
"swedencentral": {}, // Sweden Central
"switzerlandnorth": {}, // Switzerland North
"switzerlandwest": {}, // Switzerland West
"uaecentral": {}, // UAE Central
"uaenorth": {}, // UAE North
"uksouth": {}, // UK South
"ukwest": {}, // UK West
"westcentralus": {}, // West Central US
"westeurope": {}, // West Europe
"westindia": {}, // West India
"westus": {}, // West US
"westus2": {}, // West US 2
"westus3": {}, // West US 3
}

View File

@@ -1,248 +0,0 @@
package cloudintegrationtypes
import (
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
S3Sync = valuer.NewString("s3sync")
// ErrCodeInvalidServiceID is the error code for invalid service id.
ErrCodeInvalidServiceID = errors.MustNewCode("invalid_service_id")
)
type ServiceID struct{ valuer.String }
type CloudIntegrationService struct {
types.Identifiable
types.TimeAuditable
Type ServiceID `json:"type"`
Config *ServiceConfig `json:"config"`
CloudIntegrationID valuer.UUID `json:"cloudIntegrationID"`
}
// ServiceMetadata helps to quickly list available services and whether it is enabled or not.
// As getting complete service definition is a heavy operation and the response is also large,
// initial integration page load can be very slow.
type ServiceMetadata struct {
ServiceDefinitionMetadata
// if the service is enabled for the account
Enabled bool `json:"enabled"`
}
type GettableServicesMetadata struct {
Services []*ServiceMetadata `json:"services"`
}
type Service struct {
ServiceDefinition
ServiceConfig *ServiceConfig `json:"serviceConfig"`
}
type GettableService = Service
type UpdatableService struct {
Config *ServiceConfig `json:"config"`
}
type ServiceConfig struct {
AWS *AWSServiceConfig `json:"aws,omitempty"`
}
type AWSServiceConfig struct {
Logs *AWSServiceLogsConfig `json:"logs"`
Metrics *AWSServiceMetricsConfig `json:"metrics"`
}
// AWSServiceLogsConfig is AWS specific logs config for a service
// NOTE: the JSON keys are snake case for backward compatibility with existing agents.
type AWSServiceLogsConfig struct {
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type AWSServiceMetricsConfig struct {
Enabled bool `json:"enabled"`
}
// ServiceDefinitionMetadata represents service definition metadata. This is useful for showing service tab in frontend.
type ServiceDefinitionMetadata struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
}
type ServiceDefinition struct {
ServiceDefinitionMetadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"dataCollected"`
Strategy *CollectionStrategy `json:"telemetryCollectionStrategy"`
}
// CollectionStrategy is cloud provider specific configuration for signal collection,
// this is used by agent to understand the nitty-gritty for collecting telemetry for the cloud provider.
type CollectionStrategy struct {
AWS *AWSCollectionStrategy `json:"aws,omitempty"`
}
// Assets represents the collection of dashboards.
type Assets struct {
Dashboards []Dashboard `json:"dashboards"`
}
// SupportedSignals for cloud provider's service.
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
// DataCollected is curated static list of metrics and logs, this is shown as part of service overview.
type DataCollected struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
// CollectedLogAttribute represents a log attribute that is present in all log entries for a service,
// this is shown as part of service overview.
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
// CollectedMetric represents a metric that is collected for a service, this is shown as part of service overview.
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
// AWSCollectionStrategy represents signal collection strategy for AWS services.
// this is AWS specific.
// NOTE: this structure is still using snake case, for backward compatibility,
// with existing agents.
type AWSCollectionStrategy struct {
Metrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
Logs *AWSLogsStrategy `json:"aws_logs,omitempty"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type in AWS
}
// AWSMetricsStrategy represents metrics collection strategy for AWS services.
// this is AWS specific.
// NOTE: this structure is still using snake case, for backward compatibility,
// with existing agents.
type AWSMetricsStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
StreamFilters []struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
} `json:"cloudwatch_metric_stream_filters"`
}
// AWSLogsStrategy represents logs collection strategy for AWS services.
// this is AWS specific.
// NOTE: this structure is still using snake case, for backward compatibility,
// with existing agents.
type AWSLogsStrategy struct {
Subscriptions []struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
} `json:"cloudwatch_logs_subscriptions"`
}
// Dashboard represents a dashboard definition for cloud integration.
// This is used to show available pre-made dashboards for a service,
// hence has additional fields like id, title and description
type Dashboard struct {
Id string `json:"id"`
Title string `json:"title"`
Description string `json:"description"`
Definition dashboardtypes.StorableDashboardData `json:"definition,omitempty"`
}
// SupportedServices is the map of supported services for each cloud provider.
var SupportedServices = map[CloudProviderType][]ServiceID{
CloudProviderTypeAWS: {
{valuer.NewString("alb")},
{valuer.NewString("api-gateway")},
{valuer.NewString("dynamodb")},
{valuer.NewString("ec2")},
{valuer.NewString("ecs")},
{valuer.NewString("eks")},
{valuer.NewString("elasticache")},
{valuer.NewString("lambda")},
{valuer.NewString("msk")},
{valuer.NewString("rds")},
{valuer.NewString("s3sync")},
{valuer.NewString("sns")},
{valuer.NewString("sqs")},
},
}
// NewServiceID returns a new ServiceID from a string, validated against the supported services for the given cloud provider.
func NewServiceID(provider CloudProviderType, service string) (ServiceID, error) {
services, ok := SupportedServices[provider]
if !ok {
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "no services defined for cloud provider: %s", provider)
}
for _, s := range services {
if s.StringValue() == service {
return s, nil
}
}
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "invalid service id %q for cloud provider %s", service, provider)
}
// UTILS
// GetCloudIntegrationDashboardID returns the dashboard id for a cloud integration, given the cloud provider, service id, and dashboard id.
// This is used to generate unique dashboard ids for cloud integration, and also to parse the dashboard id to get the cloud provider and service id when needed.
func GetCloudIntegrationDashboardID(cloudProvider CloudProviderType, svcId, dashboardId string) string {
return fmt.Sprintf("cloud-integration--%s--%s--%s", cloudProvider, svcId, dashboardId)
}
// GetDashboardsFromAssets returns the list of dashboards for the cloud provider service from definition.
func GetDashboardsFromAssets(
svcId string,
orgID valuer.UUID,
cloudProvider CloudProviderType,
createdAt time.Time,
assets Assets,
) []*dashboardtypes.Dashboard {
dashboards := make([]*dashboardtypes.Dashboard, 0)
for _, d := range assets.Dashboards {
author := fmt.Sprintf("%s-integration", cloudProvider)
dashboards = append(dashboards, &dashboardtypes.Dashboard{
ID: GetCloudIntegrationDashboardID(cloudProvider, svcId, d.Id),
Locked: true,
OrgID: orgID,
Data: d.Definition,
TimeAuditable: types.TimeAuditable{
CreatedAt: createdAt,
UpdatedAt: createdAt,
},
UserAuditable: types.UserAuditable{
CreatedBy: author,
UpdatedBy: author,
},
})
}
return dashboards
}

View File

@@ -1,41 +0,0 @@
package cloudintegrationtypes
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Store interface {
// GetAccountByID returns a cloud integration account by id
GetAccountByID(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) (*StorableCloudIntegration, error)
// CreateAccount creates a new cloud integration account
CreateAccount(ctx context.Context, account *StorableCloudIntegration) (*StorableCloudIntegration, error)
// UpdateAccount updates an existing cloud integration account
UpdateAccount(ctx context.Context, account *StorableCloudIntegration) error
// RemoveAccount marks a cloud integration account as removed by setting the RemovedAt field
RemoveAccount(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) error
// ListConnectedAccounts returns all the cloud integration accounts for the org and cloud provider
ListConnectedAccounts(ctx context.Context, orgID valuer.UUID, provider CloudProviderType) ([]*StorableCloudIntegration, error)
// GetConnectedAccount for a given provider
GetConnectedAccount(ctx context.Context, orgID valuer.UUID, provider CloudProviderType, providerAccountID string) (*StorableCloudIntegration, error)
// cloud_integration_service related methods
// GetServiceByServiceID returns the cloud integration service for the given cloud integration id and service id
GetServiceByServiceID(ctx context.Context, cloudIntegrationID valuer.UUID, serviceID ServiceID) (*StorableCloudIntegrationService, error)
// CreateService creates a new cloud integration service
CreateService(ctx context.Context, service *StorableCloudIntegrationService) (*StorableCloudIntegrationService, error)
// UpdateService updates an existing cloud integration service
UpdateService(ctx context.Context, service *StorableCloudIntegrationService) error
// ListServices returns all the cloud integration services for the given cloud integration id
ListServices(ctx context.Context, cloudIntegrationID valuer.UUID) ([]*StorableCloudIntegrationService, error)
}

View File

@@ -6,7 +6,6 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/uptrace/bun"
)
@@ -18,15 +17,6 @@ const (
AgentStatusDisconnected
)
var DeployStatusToProtoStatus = map[DeployStatus]protobufs.RemoteConfigStatuses{
PendingDeploy: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
Deploying: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
Deployed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
DeployInitiated: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
DeployFailed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
DeployStatusUnknown: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
}
type StorableAgent struct {
bun.BaseModel `bun:"table:agent"`
@@ -40,6 +30,16 @@ type StorableAgent struct {
Config string `bun:"config,type:text,notnull"`
}
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
return StorableAgent{
OrgID: orgID,
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
AgentID: agentID,
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
Status: status,
}
}
type ElementType struct{ valuer.String }
var (
@@ -49,6 +49,24 @@ var (
ElementTypeLbExporter = ElementType{valuer.NewString("lb_exporter")}
)
// NewElementType creates a new ElementType from a string value.
// Returns the corresponding ElementType constant if the string matches,
// otherwise returns an empty ElementType.
func NewElementType(value string) ElementType {
switch valuer.NewString(value) {
case ElementTypeSamplingRules.String:
return ElementTypeSamplingRules
case ElementTypeDropRules.String:
return ElementTypeDropRules
case ElementTypeLogPipelines.String:
return ElementTypeLogPipelines
case ElementTypeLbExporter.String:
return ElementTypeLbExporter
default:
return ElementType{valuer.NewString("")}
}
}
type DeployStatus struct{ valuer.String }
var (
@@ -80,26 +98,6 @@ type AgentConfigVersion struct {
Config string `json:"config" bun:"config,type:text"`
}
type AgentConfigElement struct {
bun.BaseModel `bun:"table:agent_config_element"`
types.Identifiable
types.TimeAuditable
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
}
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
return StorableAgent{
OrgID: orgID,
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
AgentID: agentID,
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
Status: status,
}
}
func NewAgentConfigVersion(orgId valuer.UUID, userId valuer.UUID, elementType ElementType) *AgentConfigVersion {
return &AgentConfigVersion{
TimeAuditable: types.TimeAuditable{
@@ -120,20 +118,12 @@ func (a *AgentConfigVersion) IncrementVersion(lastVersion int) {
a.Version = lastVersion + 1
}
// NewElementType creates a new ElementType from a string value.
// Returns the corresponding ElementType constant if the string matches,
// otherwise returns an empty ElementType.
func NewElementType(value string) ElementType {
switch valuer.NewString(value) {
case ElementTypeSamplingRules.String:
return ElementTypeSamplingRules
case ElementTypeDropRules.String:
return ElementTypeDropRules
case ElementTypeLogPipelines.String:
return ElementTypeLogPipelines
case ElementTypeLbExporter.String:
return ElementTypeLbExporter
default:
return ElementType{valuer.NewString("")}
}
type AgentConfigElement struct {
bun.BaseModel `bun:"table:agent_config_element"`
types.Identifiable
types.TimeAuditable
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
}