mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-18 10:42:14 +00:00
Compare commits
5 Commits
service-ac
...
fix/order-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
099e7a98c5 | ||
|
|
fd19ff8e5e | ||
|
|
7b9e93162f | ||
|
|
f106f57097 | ||
|
|
5bafdeb373 |
@@ -308,6 +308,9 @@ user:
|
||||
allow_self: true
|
||||
# The duration within which a user can reset their password.
|
||||
max_token_lifetime: 6h
|
||||
invite:
|
||||
# The duration within which a user can accept their invite.
|
||||
max_token_lifetime: 48h
|
||||
root:
|
||||
# Whether to enable the root user. When enabled, a root user is provisioned
|
||||
# on startup using the email and password below. The root user cannot be
|
||||
|
||||
@@ -190,7 +190,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:v0.115.0
|
||||
image: signoz/signoz:v0.116.0
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
# - "6060:6060" # pprof port
|
||||
|
||||
@@ -117,7 +117,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:v0.115.0
|
||||
image: signoz/signoz:v0.116.0
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
volumes:
|
||||
|
||||
@@ -181,7 +181,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:${VERSION:-v0.115.0}
|
||||
image: signoz/signoz:${VERSION:-v0.116.0}
|
||||
container_name: signoz
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
|
||||
@@ -109,7 +109,7 @@ services:
|
||||
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
|
||||
signoz:
|
||||
!!merge <<: *db-depend
|
||||
image: signoz/signoz:${VERSION:-v0.115.0}
|
||||
image: signoz/signoz:${VERSION:-v0.116.0}
|
||||
container_name: signoz
|
||||
ports:
|
||||
- "8080:8080" # signoz port
|
||||
|
||||
65
pkg/modules/cloudintegration/cloudintegration.go
Normal file
65
pkg/modules/cloudintegration/cloudintegration.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package cloudintegration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
citypes "github.com/SigNoz/signoz/pkg/types/cloudintegrationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Module interface {
|
||||
CreateAccount(ctx context.Context, account *citypes.Account) error
|
||||
|
||||
// GetAccount returns cloud integration account
|
||||
GetAccount(ctx context.Context, orgID, accountID valuer.UUID) (*citypes.Account, error)
|
||||
|
||||
// ListAccounts lists accounts where agent is connected
|
||||
ListAccounts(ctx context.Context, orgID valuer.UUID) ([]*citypes.Account, error)
|
||||
|
||||
// UpdateAccount updates the cloud integration account for a specific organization.
|
||||
UpdateAccount(ctx context.Context, account *citypes.Account) error
|
||||
|
||||
// DisconnectAccount soft deletes/removes a cloud integration account.
|
||||
DisconnectAccount(ctx context.Context, orgID, accountID valuer.UUID) error
|
||||
|
||||
// GetConnectionArtifact returns cloud provider specific connection information,
|
||||
// client side handles how this information is shown
|
||||
GetConnectionArtifact(ctx context.Context, account *citypes.Account, req *citypes.ConnectionArtifactRequest) (*citypes.ConnectionArtifact, error)
|
||||
|
||||
// ListServicesMetadata returns the list of services metadata for a cloud provider attached with the integrationID.
|
||||
// This just returns a summary of the service and not the whole service definition
|
||||
ListServicesMetadata(ctx context.Context, orgID valuer.UUID, integrationID *valuer.UUID) ([]*citypes.ServiceMetadata, error)
|
||||
|
||||
// GetService returns service definition details for a serviceID. This returns config and
|
||||
// other details required to show in service details page on web client.
|
||||
GetService(ctx context.Context, orgID valuer.UUID, integrationID *valuer.UUID, serviceID string) (*citypes.Service, error)
|
||||
|
||||
// UpdateService updates cloud integration service
|
||||
UpdateService(ctx context.Context, orgID valuer.UUID, service *citypes.CloudIntegrationService) error
|
||||
|
||||
// AgentCheckIn is called by agent to heartbeat and get latest config in response.
|
||||
AgentCheckIn(ctx context.Context, orgID valuer.UUID, req *citypes.AgentCheckInRequest) (*citypes.AgentCheckInResponse, error)
|
||||
|
||||
// GetDashboardByID returns dashboard JSON for a given dashboard id.
|
||||
// this only returns the dashboard when the service (embedded in dashboard id) is enabled
|
||||
// in the org for any cloud integration account
|
||||
GetDashboardByID(ctx context.Context, orgID valuer.UUID, id string) (*dashboardtypes.Dashboard, error)
|
||||
|
||||
// ListDashboards returns list of dashboards across all connected cloud integration accounts
|
||||
// for enabled services in the org. This list gets added to dashboard list page
|
||||
ListDashboards(ctx context.Context, orgID valuer.UUID) ([]*dashboardtypes.Dashboard, error)
|
||||
}
|
||||
|
||||
type Handler interface {
|
||||
GetConnectionArtifact(http.ResponseWriter, *http.Request)
|
||||
ListAccounts(http.ResponseWriter, *http.Request)
|
||||
GetAccount(http.ResponseWriter, *http.Request)
|
||||
UpdateAccount(http.ResponseWriter, *http.Request)
|
||||
DisconnectAccount(http.ResponseWriter, *http.Request)
|
||||
ListServicesMetadata(http.ResponseWriter, *http.Request)
|
||||
GetService(http.ResponseWriter, *http.Request)
|
||||
UpdateService(http.ResponseWriter, *http.Request)
|
||||
AgentCheckIn(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
@@ -27,7 +27,12 @@ type OrgConfig struct {
|
||||
}
|
||||
|
||||
type PasswordConfig struct {
|
||||
Reset ResetConfig `mapstructure:"reset"`
|
||||
Invite InviteConfig `mapstructure:"invite"`
|
||||
Reset ResetConfig `mapstructure:"reset"`
|
||||
}
|
||||
|
||||
type InviteConfig struct {
|
||||
MaxTokenLifetime time.Duration `mapstructure:"max_token_lifetime"`
|
||||
}
|
||||
|
||||
type ResetConfig struct {
|
||||
@@ -46,6 +51,9 @@ func newConfig() factory.Config {
|
||||
AllowSelf: false,
|
||||
MaxTokenLifetime: 6 * time.Hour,
|
||||
},
|
||||
Invite: InviteConfig{
|
||||
MaxTokenLifetime: 48 * time.Hour,
|
||||
},
|
||||
},
|
||||
Root: RootConfig{
|
||||
Enabled: false,
|
||||
@@ -61,6 +69,10 @@ func (c Config) Validate() error {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "user::password::reset::max_token_lifetime must be positive")
|
||||
}
|
||||
|
||||
if c.Password.Invite.MaxTokenLifetime <= 0 {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "user::password::invite::max_token_lifetime must be positive")
|
||||
}
|
||||
|
||||
if c.Root.Enabled {
|
||||
if c.Root.Email.IsZero() {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "user::root::email is required when root user is enabled")
|
||||
|
||||
@@ -203,7 +203,7 @@ func (m *Module) CreateBulkInvite(ctx context.Context, orgID valuer.UUID, userID
|
||||
|
||||
resetLink := userWithToken.ResetPasswordToken.FactorPasswordResetLink(frontendBaseUrl)
|
||||
|
||||
tokenLifetime := m.config.Password.Reset.MaxTokenLifetime
|
||||
tokenLifetime := m.config.Password.Invite.MaxTokenLifetime
|
||||
humanizedTokenLifetime := strings.TrimSpace(humanize.RelTime(time.Now(), time.Now().Add(tokenLifetime), "", ""))
|
||||
|
||||
if err := m.emailing.SendHTML(ctx, userWithToken.User.Email.String(), "You're Invited to Join SigNoz", emailtypes.TemplateNameInvitationEmail, map[string]any{
|
||||
@@ -460,7 +460,11 @@ func (module *Module) GetOrCreateResetPasswordToken(ctx context.Context, userID
|
||||
}
|
||||
|
||||
// create a new token
|
||||
resetPasswordToken, err := types.NewResetPasswordToken(password.ID, time.Now().Add(module.config.Password.Reset.MaxTokenLifetime))
|
||||
tokenLifetime := module.config.Password.Reset.MaxTokenLifetime
|
||||
if user.Status == types.UserStatusPendingInvite {
|
||||
tokenLifetime = module.config.Password.Invite.MaxTokenLifetime
|
||||
}
|
||||
resetPasswordToken, err := types.NewResetPasswordToken(password.ID, time.Now().Add(tokenLifetime))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -500,6 +504,9 @@ func (module *Module) ForgotPassword(ctx context.Context, orgID valuer.UUID, ema
|
||||
resetLink := token.FactorPasswordResetLink(frontendBaseURL)
|
||||
|
||||
tokenLifetime := module.config.Password.Reset.MaxTokenLifetime
|
||||
if user.Status == types.UserStatusPendingInvite {
|
||||
tokenLifetime = module.config.Password.Invite.MaxTokenLifetime
|
||||
}
|
||||
humanizedTokenLifetime := strings.TrimSpace(humanize.RelTime(time.Now(), time.Now().Add(tokenLifetime), "", ""))
|
||||
|
||||
if err := module.emailing.SendHTML(
|
||||
|
||||
@@ -115,7 +115,6 @@ func (r *Repo) GetLatestVersion(
|
||||
func (r *Repo) insertConfig(
|
||||
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
|
||||
) error {
|
||||
|
||||
if c.ElementType.StringValue() == "" {
|
||||
return errors.NewInvalidInputf(CodeElementTypeRequired, "element type is required for creating agent config version")
|
||||
}
|
||||
@@ -229,6 +228,25 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (stored with orgId prefix). Returns DeployStatusUnknown when no matching row exists.
|
||||
func (r *Repo) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
var version opamptypes.AgentConfigVersion
|
||||
err := r.store.BunDB().NewSelect().
|
||||
Model(&version).
|
||||
ColumnExpr("deploy_status").
|
||||
Where("hash = ?", configHash).
|
||||
Where("org_id = ?", orgId).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
return opamptypes.DeployStatusUnknown, errors.WrapInternalf(err, errors.CodeInternal, "failed to query deploy status by hash")
|
||||
}
|
||||
return version.DeployStatus, nil
|
||||
}
|
||||
|
||||
func (r *Repo) updateDeployStatusByHash(
|
||||
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
|
||||
) error {
|
||||
|
||||
@@ -180,6 +180,12 @@ func (m *Manager) ReportConfigDeploymentStatus(
|
||||
}
|
||||
}
|
||||
|
||||
// Implements model.AgentConfigProvider
|
||||
func (m *Manager) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
return m.Repo.GetDeployStatusByHash(ctx, orgId, configHash)
|
||||
}
|
||||
|
||||
|
||||
func GetLatestVersion(
|
||||
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
|
||||
) (*opamptypes.AgentConfigVersion, error) {
|
||||
|
||||
@@ -131,38 +131,32 @@ func (ic *LogParsingPipelineController) ValidatePipelines(ctx context.Context,
|
||||
return err
|
||||
}
|
||||
|
||||
func (ic *LogParsingPipelineController) getDefaultPipelines() ([]pipelinetypes.GettablePipeline, error) {
|
||||
defaultPipelines := []pipelinetypes.GettablePipeline{}
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
preprocessingPipeline := pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: pipelinetypes.StoreablePipeline{
|
||||
Name: "Default Pipeline - PreProcessing Body",
|
||||
Alias: "NormalizeBodyDefault",
|
||||
Enabled: true,
|
||||
},
|
||||
Filter: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "body",
|
||||
},
|
||||
Operator: v3.FilterOperatorExists,
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []pipelinetypes.PipelineOperator{
|
||||
func (ic *LogParsingPipelineController) getNormalizePipeline() pipelinetypes.GettablePipeline {
|
||||
return pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: pipelinetypes.StoreablePipeline{
|
||||
Name: "Default Pipeline - PreProcessing Body",
|
||||
Alias: "NormalizeBodyDefault",
|
||||
Enabled: true,
|
||||
},
|
||||
Filter: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
ID: uuid.NewString(),
|
||||
Type: "normalize",
|
||||
Enabled: true,
|
||||
If: "body != nil",
|
||||
Key: v3.AttributeKey{
|
||||
Key: "body",
|
||||
},
|
||||
Operator: v3.FilterOperatorExists,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
defaultPipelines = append(defaultPipelines, preprocessingPipeline)
|
||||
},
|
||||
Config: []pipelinetypes.PipelineOperator{
|
||||
{
|
||||
ID: uuid.NewString(),
|
||||
Type: "normalize",
|
||||
Enabled: true,
|
||||
If: "body != nil",
|
||||
},
|
||||
},
|
||||
}
|
||||
return defaultPipelines, nil
|
||||
}
|
||||
|
||||
// Returns effective list of pipelines including user created
|
||||
@@ -295,12 +289,10 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
// recommend default pipelines along with user created pipelines
|
||||
defaultPipelines, err := pc.getDefaultPipelines()
|
||||
if err != nil {
|
||||
return nil, "", model.InternalError(fmt.Errorf("failed to get default pipelines: %w", err))
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
// add default normalize pipeline at the beginning
|
||||
pipelinesResp.Pipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, pipelinesResp.Pipelines...)
|
||||
}
|
||||
pipelinesResp.Pipelines = append(pipelinesResp.Pipelines, defaultPipelines...)
|
||||
|
||||
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, pipelinesResp.Pipelines)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package opamp
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
)
|
||||
|
||||
// Interface for a source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/google/uuid"
|
||||
"github.com/knadh/koanf"
|
||||
@@ -127,6 +128,11 @@ func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID
|
||||
return exists
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) GetDeployStatusByHash(_ context.Context, _ valuer.UUID, _ string) (opamptypes.DeployStatus, error) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
|
||||
subscriberId := uuid.NewString()
|
||||
|
||||
@@ -111,90 +111,99 @@ func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
prevStatus := agent.Status
|
||||
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
} else {
|
||||
// Not a new Agent. Update the Status.
|
||||
agent.Status.SequenceNum = newStatus.SequenceNum
|
||||
|
||||
// Check what's changed in the AgentDescription.
|
||||
if newStatus.AgentDescription != nil {
|
||||
// If the AgentDescription field is set it means the Agent tells us
|
||||
// something is changed in the field since the last status report
|
||||
// (or this is the first report).
|
||||
// Make full comparison of previous and new descriptions to see if it
|
||||
// really is different.
|
||||
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
|
||||
// Agent description didn't change.
|
||||
agentDescrChanged = false
|
||||
} else {
|
||||
// Yes, the description is different, update it.
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
agentDescrChanged = true
|
||||
}
|
||||
} else {
|
||||
// AgentDescription field is not set, which means description didn't change.
|
||||
agentDescrChanged = false
|
||||
}
|
||||
|
||||
// Update remote config status if it is included and is different from what we have.
|
||||
if newStatus.RemoteConfigStatus != nil &&
|
||||
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
|
||||
}
|
||||
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
|
||||
}
|
||||
}
|
||||
// agentDescriptionChanged returns true when the agent sends updated properties
|
||||
// (e.g. capability flag, version) mid-connection, signalling the server to
|
||||
// recompute and push a new RemoteConfig.
|
||||
//
|
||||
// On reconnect this always returns false: handleFirstStatus pre-copies
|
||||
// AgentDescription into agent.Status so no diff is detected, avoiding a
|
||||
// redundant config recompute.
|
||||
func (agent *Agent) agentDescriptionChanged(newStatus *protobufs.AgentToServer) bool {
|
||||
// nil AgentDescription means no change per OpAMP protocol.
|
||||
if newStatus.AgentDescription == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if agentDescrChanged {
|
||||
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
|
||||
if proto.Equal(agent.Status.AgentDescription, newStatus.AgentDescription) {
|
||||
return false
|
||||
}
|
||||
|
||||
return agentDescrChanged
|
||||
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
|
||||
return true
|
||||
}
|
||||
|
||||
func (agent *Agent) updateHealth(newStatus *protobufs.AgentToServer) {
|
||||
if newStatus.Health == nil {
|
||||
// updateRemoteConfigStatus updates the stored RemoteConfigStatus and notifies
|
||||
// subscribers if the status has changed relative to what we have stored.
|
||||
func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer) {
|
||||
if newStatus.RemoteConfigStatus == nil ||
|
||||
proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
return
|
||||
}
|
||||
|
||||
agent.Status.Health = newStatus.Health
|
||||
|
||||
if agent.Status != nil && agent.Status.Health != nil && agent.Status.Health.Healthy {
|
||||
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(agent.Status.Health.StartTimeUnixNano)).UTC()
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
hash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
switch newStatus.RemoteConfigStatus.Status {
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED:
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, hash)
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, hash, newStatus.RemoteConfigStatus.ErrorMessage)
|
||||
}
|
||||
}
|
||||
|
||||
func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer) {
|
||||
// Update remote config status if it is included and is different from what we have.
|
||||
if newStatus.RemoteConfigStatus != nil {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
// handleFirstStatus initializes agent.Status on the first message received from
|
||||
// this agent instance. It is a no-op for all subsequent messages.
|
||||
func (agent *Agent) handleFirstStatus(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) {
|
||||
if agent.Status != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Initialize with a clean slate.
|
||||
agent.Status = &protobufs.AgentToServer{
|
||||
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
|
||||
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
},
|
||||
}
|
||||
|
||||
if newStatus.RemoteConfigStatus == nil ||
|
||||
newStatus.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET {
|
||||
// Agent just started fresh — no prior deployment to reconcile with the DB.
|
||||
return
|
||||
}
|
||||
|
||||
// Since the server's connection is restarted;
|
||||
// copy the agent description; so no change is detected by agentDescriptionChanged
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
|
||||
// Server reconnected while the agent was already running.
|
||||
// Reconcile deployment status with DB; DB is the source of truth.
|
||||
// If DB says in_progress but agent now reports APPLIED/FAILED,
|
||||
// updateRemoteConfigStatus will detect the transition and notify subscribers.
|
||||
rawHash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
deployStatus, err := configProvider.GetDeployStatusByHash(context.Background(), agent.OrgID, agent.OrgID.String()+rawHash)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
agent.Status.RemoteConfigStatus.Status = opamptypes.DeployStatusToProtoStatus[deployStatus]
|
||||
|
||||
// If the deployment is still in-flight, rehydrate the subscriber so that
|
||||
// updateRemoteConfigStatus can fire onConfigSuccess/onConfigFailure when
|
||||
// the agent next reports a terminal status.
|
||||
if deployStatus != opamptypes.Deployed && deployStatus != opamptypes.DeployFailed {
|
||||
ListenToConfigUpdate(agent.OrgID, agent.AgentID, rawHash, configProvider.ReportConfigDeploymentStatus)
|
||||
}
|
||||
}
|
||||
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) bool {
|
||||
agent.handleFirstStatus(newStatus, configProvider)
|
||||
agentDescrChanged := agent.agentDescriptionChanged(newStatus)
|
||||
// record healthy timestamp
|
||||
if newStatus.Health != nil && newStatus.Health.Healthy {
|
||||
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(newStatus.Health.StartTimeUnixNano)).UTC()
|
||||
}
|
||||
|
||||
agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged
|
||||
// notify subscribers first; this will update the status in the DB
|
||||
agent.updateRemoteConfigStatus(newStatus)
|
||||
agent.updateHealth(newStatus)
|
||||
// update local reference in last.
|
||||
agent.Status = newStatus
|
||||
return agentDescrChanged
|
||||
}
|
||||
|
||||
@@ -237,7 +246,7 @@ func (agent *Agent) processStatusUpdate(
|
||||
// current status is not up-to-date.
|
||||
lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum)
|
||||
|
||||
agentDescrChanged := agent.updateStatusField(newStatus)
|
||||
agentDescrChanged := agent.updateStatusField(newStatus, configProvider)
|
||||
|
||||
// Check if any fields were omitted in the status report.
|
||||
effectiveConfigOmitted := newStatus.EffectiveConfig == nil &&
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
package model
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/valuer"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Interface for source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
@@ -20,4 +25,10 @@ type AgentConfigProvider interface {
|
||||
configId string,
|
||||
err error,
|
||||
)
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (with orgId prefix as stored in the DB). Returns DeployStatusUnknown when
|
||||
// no matching row exists. Used by the agent's first-connect handler to
|
||||
// determine whether the reported RemoteConfigStatus resolves a pending deployment.
|
||||
GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error)
|
||||
}
|
||||
|
||||
@@ -66,6 +66,7 @@ func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnC
|
||||
defer coordinator.mutex.Unlock()
|
||||
|
||||
key := getSubscriberKey(orgId, hash)
|
||||
|
||||
if subs, ok := coordinator.subscribers[key]; ok {
|
||||
subs = append(subs, ss)
|
||||
coordinator.subscribers[key] = subs
|
||||
|
||||
43
pkg/types/cloudintegrationtypes/account.go
Normal file
43
pkg/types/cloudintegrationtypes/account.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Account struct {
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ProviderAccountId *string `json:"providerAccountID,omitempty"`
|
||||
Provider CloudProviderType `json:"provider"`
|
||||
RemovedAt *time.Time `json:"removedAt,omitempty"`
|
||||
AgentReport *AgentReport `json:"agentReport,omitempty"`
|
||||
OrgID valuer.UUID `json:"orgID"`
|
||||
Config *AccountConfig `json:"config,omitempty"`
|
||||
}
|
||||
|
||||
// AgentReport represents heartbeats sent by the agent.
|
||||
type AgentReport struct {
|
||||
TimestampMillis int64 `json:"timestampMillis"`
|
||||
Data map[string]any `json:"data"`
|
||||
}
|
||||
|
||||
type GettableAccounts struct {
|
||||
Accounts []*Account `json:"accounts"`
|
||||
}
|
||||
|
||||
type GettableAccount = Account
|
||||
|
||||
type UpdatableAccount struct {
|
||||
Config *AccountConfig `json:"config"`
|
||||
}
|
||||
|
||||
type AccountConfig struct {
|
||||
AWS *AWSAccountConfig `json:"aws,omitempty"`
|
||||
}
|
||||
|
||||
type AWSAccountConfig struct {
|
||||
Regions []string `json:"regions"`
|
||||
}
|
||||
80
pkg/types/cloudintegrationtypes/cloudintegration.go
Normal file
80
pkg/types/cloudintegrationtypes/cloudintegration.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/uptrace/bun"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeCloudIntegrationNotFound = errors.MustNewCode("cloud_integration_not_found")
|
||||
)
|
||||
|
||||
// StorableCloudIntegration represents a cloud integration stored in the database.
|
||||
// This is also referred as "Account" in the context of cloud integrations.
|
||||
type StorableCloudIntegration struct {
|
||||
bun.BaseModel `bun:"table:cloud_integration"`
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
|
||||
Provider CloudProviderType `bun:"provider,type:text"`
|
||||
Config string `bun:"config,type:text"` // Config is provider-specific data in JSON string format
|
||||
AccountID *string `bun:"account_id,type:text"`
|
||||
LastAgentReport *StorableAgentReport `bun:"last_agent_report,type:text"`
|
||||
RemovedAt *time.Time `bun:"removed_at,type:timestamp,nullzero"`
|
||||
OrgID valuer.UUID `bun:"org_id,type:text"`
|
||||
}
|
||||
|
||||
// StorableAgentReport represents the last heartbeat and arbitrary data sent by the agent
|
||||
// as of now there is no use case for Data field, but keeping it for backwards compatibility with older structure.
|
||||
type StorableAgentReport struct {
|
||||
TimestampMillis int64 `json:"timestamp_millis"` // backward compatibility
|
||||
Data map[string]any `json:"data"`
|
||||
}
|
||||
|
||||
// StorableCloudIntegrationService is to store service config for a cloud integration, which is a cloud provider specific configuration.
|
||||
type StorableCloudIntegrationService struct {
|
||||
bun.BaseModel `bun:"table:cloud_integration_service,alias:cis"`
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
|
||||
Type ServiceID `bun:"type,type:text,notnull"` // Keeping Type field name as is, but it is a service id
|
||||
Config string `bun:"config,type:text"` // Config is cloud provider's service specific data in JSON string format
|
||||
CloudIntegrationID valuer.UUID `bun:"cloud_integration_id,type:text"`
|
||||
}
|
||||
|
||||
// Scan scans value from DB.
|
||||
func (r *StorableAgentReport) Scan(src any) error {
|
||||
var data []byte
|
||||
switch v := src.(type) {
|
||||
case []byte:
|
||||
data = v
|
||||
case string:
|
||||
data = []byte(v)
|
||||
default:
|
||||
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
|
||||
}
|
||||
return json.Unmarshal(data, r)
|
||||
}
|
||||
|
||||
// Value creates value to be stored in DB.
|
||||
func (r *StorableAgentReport) Value() (driver.Value, error) {
|
||||
if r == nil {
|
||||
return nil, errors.NewInternalf(errors.CodeInternal, "agent report is nil")
|
||||
}
|
||||
|
||||
serialized, err := json.Marshal(r)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(
|
||||
err, errors.CodeInternal, "couldn't serialize agent report to JSON",
|
||||
)
|
||||
}
|
||||
// Return as string instead of []byte to ensure PostgreSQL stores as text, not bytes
|
||||
return string(serialized), nil
|
||||
}
|
||||
41
pkg/types/cloudintegrationtypes/cloudprovider.go
Normal file
41
pkg/types/cloudintegrationtypes/cloudprovider.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// CloudProviderType type alias.
|
||||
type CloudProviderType struct{ valuer.String }
|
||||
|
||||
var (
|
||||
// cloud providers.
|
||||
CloudProviderTypeAWS = CloudProviderType{valuer.NewString("aws")}
|
||||
CloudProviderTypeAzure = CloudProviderType{valuer.NewString("azure")}
|
||||
|
||||
// errors.
|
||||
ErrCodeCloudProviderInvalidInput = errors.MustNewCode("invalid_cloud_provider")
|
||||
|
||||
AWSIntegrationUserEmail = valuer.MustNewEmail("aws-integration@signoz.io")
|
||||
AzureIntegrationUserEmail = valuer.MustNewEmail("azure-integration@signoz.io")
|
||||
)
|
||||
|
||||
// CloudIntegrationUserEmails is the list of valid emails for Cloud One Click integrations.
|
||||
// This is used for validation and restrictions in different contexts, across codebase.
|
||||
var CloudIntegrationUserEmails = []valuer.Email{
|
||||
AWSIntegrationUserEmail,
|
||||
AzureIntegrationUserEmail,
|
||||
}
|
||||
|
||||
// NewCloudProvider returns a new CloudProviderType from a string.
|
||||
// It validates the input and returns an error if the input is not valid cloud provider.
|
||||
func NewCloudProvider(provider string) (CloudProviderType, error) {
|
||||
switch provider {
|
||||
case CloudProviderTypeAWS.StringValue():
|
||||
return CloudProviderTypeAWS, nil
|
||||
case CloudProviderTypeAzure.StringValue():
|
||||
return CloudProviderTypeAzure, nil
|
||||
default:
|
||||
return CloudProviderType{}, errors.NewInvalidInputf(ErrCodeCloudProviderInvalidInput, "invalid cloud provider: %s", provider)
|
||||
}
|
||||
}
|
||||
88
pkg/types/cloudintegrationtypes/connection.go
Normal file
88
pkg/types/cloudintegrationtypes/connection.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/types/integrationtypes"
|
||||
|
||||
type ConnectionArtifactRequest struct {
|
||||
Aws *AWSConnectionArtifactRequest `json:"aws"`
|
||||
}
|
||||
|
||||
type AWSConnectionArtifactRequest struct {
|
||||
DeploymentRegion string `json:"deploymentRegion"`
|
||||
Regions []string `json:"regions"`
|
||||
}
|
||||
|
||||
type PostableConnectionArtifact = ConnectionArtifactRequest
|
||||
|
||||
type ConnectionArtifact struct {
|
||||
Aws *AWSConnectionArtifact `json:"aws"`
|
||||
}
|
||||
|
||||
type AWSConnectionArtifact struct {
|
||||
ConnectionUrl string `json:"connectionURL"`
|
||||
}
|
||||
|
||||
type GettableConnectionArtifact = ConnectionArtifact
|
||||
|
||||
type AccountStatus struct {
|
||||
Id string `json:"id"`
|
||||
ProviderAccountId *string `json:"providerAccountID,omitempty"`
|
||||
Status integrationtypes.AccountStatus `json:"status"`
|
||||
}
|
||||
|
||||
type GettableAccountStatus = AccountStatus
|
||||
|
||||
type AgentCheckInRequest struct {
|
||||
// older backward compatible fields are mapped to new fields
|
||||
// CloudIntegrationId string `json:"cloudIntegrationId"`
|
||||
// AccountId string `json:"accountId"`
|
||||
|
||||
// New fields
|
||||
ProviderAccountId string `json:"providerAccountId"`
|
||||
CloudAccountId string `json:"cloudAccountId"`
|
||||
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
}
|
||||
|
||||
type PostableAgentCheckInRequest struct {
|
||||
AgentCheckInRequest
|
||||
// following are backward compatible fields for older running agents
|
||||
// which gets mapped to new fields in AgentCheckInRequest
|
||||
CloudIntegrationId string `json:"cloud_integration_id"`
|
||||
CloudAccountId string `json:"cloud_account_id"`
|
||||
}
|
||||
|
||||
type GettableAgentCheckInResponse struct {
|
||||
AgentCheckInResponse
|
||||
|
||||
// For backward compatibility
|
||||
CloudIntegrationId string `json:"cloud_integration_id"`
|
||||
AccountId string `json:"account_id"`
|
||||
}
|
||||
|
||||
type AgentCheckInResponse struct {
|
||||
// Older fields for backward compatibility are mapped to new fields below
|
||||
// CloudIntegrationId string `json:"cloud_integration_id"`
|
||||
// AccountId string `json:"account_id"`
|
||||
|
||||
// New fields
|
||||
ProviderAccountId string `json:"providerAccountId"`
|
||||
CloudAccountId string `json:"cloudAccountId"`
|
||||
|
||||
// IntegrationConfig populates data related to integration that is required for an agent
|
||||
// to start collecting telemetry data
|
||||
// keeping JSON key snake_case for backward compatibility
|
||||
IntegrationConfig *IntegrationConfig `json:"integration_config,omitempty"`
|
||||
}
|
||||
|
||||
type IntegrationConfig struct {
|
||||
EnabledRegions []string `json:"enabledRegions"` // backward compatible
|
||||
Telemetry *AWSCollectionStrategy `json:"telemetry,omitempty"` // backward compatible
|
||||
|
||||
// new fields
|
||||
AWS *AWSIntegrationConfig `json:"aws,omitempty"`
|
||||
}
|
||||
|
||||
type AWSIntegrationConfig struct {
|
||||
EnabledRegions []string `json:"enabledRegions"`
|
||||
Telemetry *AWSCollectionStrategy `json:"telemetry,omitempty"`
|
||||
}
|
||||
103
pkg/types/cloudintegrationtypes/regions.go
Normal file
103
pkg/types/cloudintegrationtypes/regions.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeInvalidCloudRegion = errors.MustNewCode("invalid_cloud_region")
|
||||
ErrCodeMismatchCloudProvider = errors.MustNewCode("cloud_provider_mismatch")
|
||||
)
|
||||
|
||||
// List of all valid cloud regions on Amazon Web Services.
|
||||
var ValidAWSRegions = map[string]struct{}{
|
||||
"af-south-1": {}, // Africa (Cape Town).
|
||||
"ap-east-1": {}, // Asia Pacific (Hong Kong).
|
||||
"ap-northeast-1": {}, // Asia Pacific (Tokyo).
|
||||
"ap-northeast-2": {}, // Asia Pacific (Seoul).
|
||||
"ap-northeast-3": {}, // Asia Pacific (Osaka).
|
||||
"ap-south-1": {}, // Asia Pacific (Mumbai).
|
||||
"ap-south-2": {}, // Asia Pacific (Hyderabad).
|
||||
"ap-southeast-1": {}, // Asia Pacific (Singapore).
|
||||
"ap-southeast-2": {}, // Asia Pacific (Sydney).
|
||||
"ap-southeast-3": {}, // Asia Pacific (Jakarta).
|
||||
"ap-southeast-4": {}, // Asia Pacific (Melbourne).
|
||||
"ca-central-1": {}, // Canada (Central).
|
||||
"ca-west-1": {}, // Canada West (Calgary).
|
||||
"eu-central-1": {}, // Europe (Frankfurt).
|
||||
"eu-central-2": {}, // Europe (Zurich).
|
||||
"eu-north-1": {}, // Europe (Stockholm).
|
||||
"eu-south-1": {}, // Europe (Milan).
|
||||
"eu-south-2": {}, // Europe (Spain).
|
||||
"eu-west-1": {}, // Europe (Ireland).
|
||||
"eu-west-2": {}, // Europe (London).
|
||||
"eu-west-3": {}, // Europe (Paris).
|
||||
"il-central-1": {}, // Israel (Tel Aviv).
|
||||
"me-central-1": {}, // Middle East (UAE).
|
||||
"me-south-1": {}, // Middle East (Bahrain).
|
||||
"sa-east-1": {}, // South America (Sao Paulo).
|
||||
"us-east-1": {}, // US East (N. Virginia).
|
||||
"us-east-2": {}, // US East (Ohio).
|
||||
"us-west-1": {}, // US West (N. California).
|
||||
"us-west-2": {}, // US West (Oregon).
|
||||
}
|
||||
|
||||
// List of all valid cloud regions for Microsoft Azure.
|
||||
var ValidAzureRegions = map[string]struct{}{
|
||||
"australiacentral": {}, // Australia Central
|
||||
"australiacentral2": {}, // Australia Central 2
|
||||
"australiaeast": {}, // Australia East
|
||||
"australiasoutheast": {}, // Australia Southeast
|
||||
"austriaeast": {}, // Austria East
|
||||
"belgiumcentral": {}, // Belgium Central
|
||||
"brazilsouth": {}, // Brazil South
|
||||
"brazilsoutheast": {}, // Brazil Southeast
|
||||
"canadacentral": {}, // Canada Central
|
||||
"canadaeast": {}, // Canada East
|
||||
"centralindia": {}, // Central India
|
||||
"centralus": {}, // Central US
|
||||
"chilecentral": {}, // Chile Central
|
||||
"denmarkeast": {}, // Denmark East
|
||||
"eastasia": {}, // East Asia
|
||||
"eastus": {}, // East US
|
||||
"eastus2": {}, // East US 2
|
||||
"francecentral": {}, // France Central
|
||||
"francesouth": {}, // France South
|
||||
"germanynorth": {}, // Germany North
|
||||
"germanywestcentral": {}, // Germany West Central
|
||||
"indonesiacentral": {}, // Indonesia Central
|
||||
"israelcentral": {}, // Israel Central
|
||||
"italynorth": {}, // Italy North
|
||||
"japaneast": {}, // Japan East
|
||||
"japanwest": {}, // Japan West
|
||||
"koreacentral": {}, // Korea Central
|
||||
"koreasouth": {}, // Korea South
|
||||
"malaysiawest": {}, // Malaysia West
|
||||
"mexicocentral": {}, // Mexico Central
|
||||
"newzealandnorth": {}, // New Zealand North
|
||||
"northcentralus": {}, // North Central US
|
||||
"northeurope": {}, // North Europe
|
||||
"norwayeast": {}, // Norway East
|
||||
"norwaywest": {}, // Norway West
|
||||
"polandcentral": {}, // Poland Central
|
||||
"qatarcentral": {}, // Qatar Central
|
||||
"southafricanorth": {}, // South Africa North
|
||||
"southafricawest": {}, // South Africa West
|
||||
"southcentralus": {}, // South Central US
|
||||
"southindia": {}, // South India
|
||||
"southeastasia": {}, // Southeast Asia
|
||||
"spaincentral": {}, // Spain Central
|
||||
"swedencentral": {}, // Sweden Central
|
||||
"switzerlandnorth": {}, // Switzerland North
|
||||
"switzerlandwest": {}, // Switzerland West
|
||||
"uaecentral": {}, // UAE Central
|
||||
"uaenorth": {}, // UAE North
|
||||
"uksouth": {}, // UK South
|
||||
"ukwest": {}, // UK West
|
||||
"westcentralus": {}, // West Central US
|
||||
"westeurope": {}, // West Europe
|
||||
"westindia": {}, // West India
|
||||
"westus": {}, // West US
|
||||
"westus2": {}, // West US 2
|
||||
"westus3": {}, // West US 3
|
||||
}
|
||||
248
pkg/types/cloudintegrationtypes/service.go
Normal file
248
pkg/types/cloudintegrationtypes/service.go
Normal file
@@ -0,0 +1,248 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
var (
|
||||
S3Sync = valuer.NewString("s3sync")
|
||||
// ErrCodeInvalidServiceID is the error code for invalid service id.
|
||||
ErrCodeInvalidServiceID = errors.MustNewCode("invalid_service_id")
|
||||
)
|
||||
|
||||
type ServiceID struct{ valuer.String }
|
||||
|
||||
type CloudIntegrationService struct {
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
Type ServiceID `json:"type"`
|
||||
Config *ServiceConfig `json:"config"`
|
||||
CloudIntegrationID valuer.UUID `json:"cloudIntegrationID"`
|
||||
}
|
||||
|
||||
// ServiceMetadata helps to quickly list available services and whether it is enabled or not.
|
||||
// As getting complete service definition is a heavy operation and the response is also large,
|
||||
// initial integration page load can be very slow.
|
||||
type ServiceMetadata struct {
|
||||
ServiceDefinitionMetadata
|
||||
// if the service is enabled for the account
|
||||
Enabled bool `json:"enabled"`
|
||||
}
|
||||
|
||||
type GettableServicesMetadata struct {
|
||||
Services []*ServiceMetadata `json:"services"`
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
ServiceDefinition
|
||||
ServiceConfig *ServiceConfig `json:"serviceConfig"`
|
||||
}
|
||||
|
||||
type GettableService = Service
|
||||
|
||||
type UpdatableService struct {
|
||||
Config *ServiceConfig `json:"config"`
|
||||
}
|
||||
|
||||
type ServiceConfig struct {
|
||||
AWS *AWSServiceConfig `json:"aws,omitempty"`
|
||||
}
|
||||
|
||||
type AWSServiceConfig struct {
|
||||
Logs *AWSServiceLogsConfig `json:"logs"`
|
||||
Metrics *AWSServiceMetricsConfig `json:"metrics"`
|
||||
}
|
||||
|
||||
// AWSServiceLogsConfig is AWS specific logs config for a service
|
||||
// NOTE: the JSON keys are snake case for backward compatibility with existing agents.
|
||||
type AWSServiceLogsConfig struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
|
||||
}
|
||||
|
||||
type AWSServiceMetricsConfig struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
}
|
||||
|
||||
// ServiceDefinitionMetadata represents service definition metadata. This is useful for showing service tab in frontend.
|
||||
type ServiceDefinitionMetadata struct {
|
||||
Id string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Icon string `json:"icon"`
|
||||
}
|
||||
|
||||
type ServiceDefinition struct {
|
||||
ServiceDefinitionMetadata
|
||||
Overview string `json:"overview"` // markdown
|
||||
Assets Assets `json:"assets"`
|
||||
SupportedSignals SupportedSignals `json:"supported_signals"`
|
||||
DataCollected DataCollected `json:"dataCollected"`
|
||||
Strategy *CollectionStrategy `json:"telemetryCollectionStrategy"`
|
||||
}
|
||||
|
||||
// CollectionStrategy is cloud provider specific configuration for signal collection,
|
||||
// this is used by agent to understand the nitty-gritty for collecting telemetry for the cloud provider.
|
||||
type CollectionStrategy struct {
|
||||
AWS *AWSCollectionStrategy `json:"aws,omitempty"`
|
||||
}
|
||||
|
||||
// Assets represents the collection of dashboards.
|
||||
type Assets struct {
|
||||
Dashboards []Dashboard `json:"dashboards"`
|
||||
}
|
||||
|
||||
// SupportedSignals for cloud provider's service.
|
||||
type SupportedSignals struct {
|
||||
Logs bool `json:"logs"`
|
||||
Metrics bool `json:"metrics"`
|
||||
}
|
||||
|
||||
// DataCollected is curated static list of metrics and logs, this is shown as part of service overview.
|
||||
type DataCollected struct {
|
||||
Logs []CollectedLogAttribute `json:"logs"`
|
||||
Metrics []CollectedMetric `json:"metrics"`
|
||||
}
|
||||
|
||||
// CollectedLogAttribute represents a log attribute that is present in all log entries for a service,
|
||||
// this is shown as part of service overview.
|
||||
type CollectedLogAttribute struct {
|
||||
Name string `json:"name"`
|
||||
Path string `json:"path"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// CollectedMetric represents a metric that is collected for a service, this is shown as part of service overview.
|
||||
type CollectedMetric struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Unit string `json:"unit"`
|
||||
Description string `json:"description"`
|
||||
}
|
||||
|
||||
// AWSCollectionStrategy represents signal collection strategy for AWS services.
|
||||
// this is AWS specific.
|
||||
// NOTE: this structure is still using snake case, for backward compatibility,
|
||||
// with existing agents.
|
||||
type AWSCollectionStrategy struct {
|
||||
Metrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
|
||||
Logs *AWSLogsStrategy `json:"aws_logs,omitempty"`
|
||||
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type in AWS
|
||||
}
|
||||
|
||||
// AWSMetricsStrategy represents metrics collection strategy for AWS services.
|
||||
// this is AWS specific.
|
||||
// NOTE: this structure is still using snake case, for backward compatibility,
|
||||
// with existing agents.
|
||||
type AWSMetricsStrategy struct {
|
||||
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
|
||||
StreamFilters []struct {
|
||||
// json tags here are in the shape expected by AWS API as detailed at
|
||||
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
|
||||
Namespace string `json:"Namespace"`
|
||||
MetricNames []string `json:"MetricNames,omitempty"`
|
||||
} `json:"cloudwatch_metric_stream_filters"`
|
||||
}
|
||||
|
||||
// AWSLogsStrategy represents logs collection strategy for AWS services.
|
||||
// this is AWS specific.
|
||||
// NOTE: this structure is still using snake case, for backward compatibility,
|
||||
// with existing agents.
|
||||
type AWSLogsStrategy struct {
|
||||
Subscriptions []struct {
|
||||
// subscribe to all logs groups with specified prefix.
|
||||
// eg: `/aws/rds/`
|
||||
LogGroupNamePrefix string `json:"log_group_name_prefix"`
|
||||
|
||||
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
|
||||
// "" implies no filtering is required.
|
||||
FilterPattern string `json:"filter_pattern"`
|
||||
} `json:"cloudwatch_logs_subscriptions"`
|
||||
}
|
||||
|
||||
// Dashboard represents a dashboard definition for cloud integration.
|
||||
// This is used to show available pre-made dashboards for a service,
|
||||
// hence has additional fields like id, title and description
|
||||
type Dashboard struct {
|
||||
Id string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Description string `json:"description"`
|
||||
Definition dashboardtypes.StorableDashboardData `json:"definition,omitempty"`
|
||||
}
|
||||
|
||||
// SupportedServices is the map of supported services for each cloud provider.
|
||||
var SupportedServices = map[CloudProviderType][]ServiceID{
|
||||
CloudProviderTypeAWS: {
|
||||
{valuer.NewString("alb")},
|
||||
{valuer.NewString("api-gateway")},
|
||||
{valuer.NewString("dynamodb")},
|
||||
{valuer.NewString("ec2")},
|
||||
{valuer.NewString("ecs")},
|
||||
{valuer.NewString("eks")},
|
||||
{valuer.NewString("elasticache")},
|
||||
{valuer.NewString("lambda")},
|
||||
{valuer.NewString("msk")},
|
||||
{valuer.NewString("rds")},
|
||||
{valuer.NewString("s3sync")},
|
||||
{valuer.NewString("sns")},
|
||||
{valuer.NewString("sqs")},
|
||||
},
|
||||
}
|
||||
|
||||
// NewServiceID returns a new ServiceID from a string, validated against the supported services for the given cloud provider.
|
||||
func NewServiceID(provider CloudProviderType, service string) (ServiceID, error) {
|
||||
services, ok := SupportedServices[provider]
|
||||
if !ok {
|
||||
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "no services defined for cloud provider: %s", provider)
|
||||
}
|
||||
for _, s := range services {
|
||||
if s.StringValue() == service {
|
||||
return s, nil
|
||||
}
|
||||
}
|
||||
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "invalid service id %q for cloud provider %s", service, provider)
|
||||
}
|
||||
|
||||
// UTILS
|
||||
|
||||
// GetCloudIntegrationDashboardID returns the dashboard id for a cloud integration, given the cloud provider, service id, and dashboard id.
|
||||
// This is used to generate unique dashboard ids for cloud integration, and also to parse the dashboard id to get the cloud provider and service id when needed.
|
||||
func GetCloudIntegrationDashboardID(cloudProvider CloudProviderType, svcId, dashboardId string) string {
|
||||
return fmt.Sprintf("cloud-integration--%s--%s--%s", cloudProvider, svcId, dashboardId)
|
||||
}
|
||||
|
||||
// GetDashboardsFromAssets returns the list of dashboards for the cloud provider service from definition.
|
||||
func GetDashboardsFromAssets(
|
||||
svcId string,
|
||||
orgID valuer.UUID,
|
||||
cloudProvider CloudProviderType,
|
||||
createdAt time.Time,
|
||||
assets Assets,
|
||||
) []*dashboardtypes.Dashboard {
|
||||
dashboards := make([]*dashboardtypes.Dashboard, 0)
|
||||
|
||||
for _, d := range assets.Dashboards {
|
||||
author := fmt.Sprintf("%s-integration", cloudProvider)
|
||||
dashboards = append(dashboards, &dashboardtypes.Dashboard{
|
||||
ID: GetCloudIntegrationDashboardID(cloudProvider, svcId, d.Id),
|
||||
Locked: true,
|
||||
OrgID: orgID,
|
||||
Data: d.Definition,
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: createdAt,
|
||||
},
|
||||
UserAuditable: types.UserAuditable{
|
||||
CreatedBy: author,
|
||||
UpdatedBy: author,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return dashboards
|
||||
}
|
||||
41
pkg/types/cloudintegrationtypes/store.go
Normal file
41
pkg/types/cloudintegrationtypes/store.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Store interface {
|
||||
// GetAccountByID returns a cloud integration account by id
|
||||
GetAccountByID(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) (*StorableCloudIntegration, error)
|
||||
|
||||
// CreateAccount creates a new cloud integration account
|
||||
CreateAccount(ctx context.Context, account *StorableCloudIntegration) (*StorableCloudIntegration, error)
|
||||
|
||||
// UpdateAccount updates an existing cloud integration account
|
||||
UpdateAccount(ctx context.Context, account *StorableCloudIntegration) error
|
||||
|
||||
// RemoveAccount marks a cloud integration account as removed by setting the RemovedAt field
|
||||
RemoveAccount(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) error
|
||||
|
||||
// ListConnectedAccounts returns all the cloud integration accounts for the org and cloud provider
|
||||
ListConnectedAccounts(ctx context.Context, orgID valuer.UUID, provider CloudProviderType) ([]*StorableCloudIntegration, error)
|
||||
|
||||
// GetConnectedAccount for a given provider
|
||||
GetConnectedAccount(ctx context.Context, orgID valuer.UUID, provider CloudProviderType, providerAccountID string) (*StorableCloudIntegration, error)
|
||||
|
||||
// cloud_integration_service related methods
|
||||
|
||||
// GetServiceByServiceID returns the cloud integration service for the given cloud integration id and service id
|
||||
GetServiceByServiceID(ctx context.Context, cloudIntegrationID valuer.UUID, serviceID ServiceID) (*StorableCloudIntegrationService, error)
|
||||
|
||||
// CreateService creates a new cloud integration service
|
||||
CreateService(ctx context.Context, service *StorableCloudIntegrationService) (*StorableCloudIntegrationService, error)
|
||||
|
||||
// UpdateService updates an existing cloud integration service
|
||||
UpdateService(ctx context.Context, service *StorableCloudIntegrationService) error
|
||||
|
||||
// ListServices returns all the cloud integration services for the given cloud integration id
|
||||
ListServices(ctx context.Context, cloudIntegrationID valuer.UUID) ([]*StorableCloudIntegrationService, error)
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/open-telemetry/opamp-go/protobufs"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
@@ -17,6 +18,15 @@ const (
|
||||
AgentStatusDisconnected
|
||||
)
|
||||
|
||||
var DeployStatusToProtoStatus = map[DeployStatus]protobufs.RemoteConfigStatuses{
|
||||
PendingDeploy: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
Deploying: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
Deployed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
|
||||
DeployInitiated: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
DeployFailed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
|
||||
DeployStatusUnknown: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
}
|
||||
|
||||
type StorableAgent struct {
|
||||
bun.BaseModel `bun:"table:agent"`
|
||||
|
||||
@@ -30,16 +40,6 @@ type StorableAgent struct {
|
||||
Config string `bun:"config,type:text,notnull"`
|
||||
}
|
||||
|
||||
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
|
||||
return StorableAgent{
|
||||
OrgID: orgID,
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
AgentID: agentID,
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
type ElementType struct{ valuer.String }
|
||||
|
||||
var (
|
||||
@@ -49,24 +49,6 @@ var (
|
||||
ElementTypeLbExporter = ElementType{valuer.NewString("lb_exporter")}
|
||||
)
|
||||
|
||||
// NewElementType creates a new ElementType from a string value.
|
||||
// Returns the corresponding ElementType constant if the string matches,
|
||||
// otherwise returns an empty ElementType.
|
||||
func NewElementType(value string) ElementType {
|
||||
switch valuer.NewString(value) {
|
||||
case ElementTypeSamplingRules.String:
|
||||
return ElementTypeSamplingRules
|
||||
case ElementTypeDropRules.String:
|
||||
return ElementTypeDropRules
|
||||
case ElementTypeLogPipelines.String:
|
||||
return ElementTypeLogPipelines
|
||||
case ElementTypeLbExporter.String:
|
||||
return ElementTypeLbExporter
|
||||
default:
|
||||
return ElementType{valuer.NewString("")}
|
||||
}
|
||||
}
|
||||
|
||||
type DeployStatus struct{ valuer.String }
|
||||
|
||||
var (
|
||||
@@ -98,6 +80,26 @@ type AgentConfigVersion struct {
|
||||
Config string `json:"config" bun:"config,type:text"`
|
||||
}
|
||||
|
||||
type AgentConfigElement struct {
|
||||
bun.BaseModel `bun:"table:agent_config_element"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
|
||||
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
}
|
||||
|
||||
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
|
||||
return StorableAgent{
|
||||
OrgID: orgID,
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
AgentID: agentID,
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
func NewAgentConfigVersion(orgId valuer.UUID, userId valuer.UUID, elementType ElementType) *AgentConfigVersion {
|
||||
return &AgentConfigVersion{
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
@@ -118,12 +120,20 @@ func (a *AgentConfigVersion) IncrementVersion(lastVersion int) {
|
||||
a.Version = lastVersion + 1
|
||||
}
|
||||
|
||||
type AgentConfigElement struct {
|
||||
bun.BaseModel `bun:"table:agent_config_element"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
|
||||
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
// NewElementType creates a new ElementType from a string value.
|
||||
// Returns the corresponding ElementType constant if the string matches,
|
||||
// otherwise returns an empty ElementType.
|
||||
func NewElementType(value string) ElementType {
|
||||
switch valuer.NewString(value) {
|
||||
case ElementTypeSamplingRules.String:
|
||||
return ElementTypeSamplingRules
|
||||
case ElementTypeDropRules.String:
|
||||
return ElementTypeDropRules
|
||||
case ElementTypeLogPipelines.String:
|
||||
return ElementTypeLogPipelines
|
||||
case ElementTypeLbExporter.String:
|
||||
return ElementTypeLbExporter
|
||||
default:
|
||||
return ElementType{valuer.NewString("")}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user