mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-18 18:52:15 +00:00
Compare commits
45 Commits
fix/order-
...
refactor/c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6caa4f2c7 | ||
|
|
f86371566d | ||
|
|
9115803084 | ||
|
|
0c14d8f966 | ||
|
|
7afb461af8 | ||
|
|
a21fbb4ee0 | ||
|
|
0369842f3d | ||
|
|
59cd96562a | ||
|
|
cc4475cab7 | ||
|
|
ac8c648420 | ||
|
|
bede6be4b8 | ||
|
|
dd3d60e6df | ||
|
|
538ab686d2 | ||
|
|
936a325cb9 | ||
|
|
c6cdcd0143 | ||
|
|
cd9211d718 | ||
|
|
0601c28782 | ||
|
|
580610dbfa | ||
|
|
2d2aa02a81 | ||
|
|
dd9723ad13 | ||
|
|
3651469416 | ||
|
|
febce75734 | ||
|
|
e1616f3487 | ||
|
|
4b94287ac7 | ||
|
|
1575c7c54c | ||
|
|
8def3f835b | ||
|
|
11ed15f4c5 | ||
|
|
f47877cca9 | ||
|
|
bb2b9215ba | ||
|
|
3111904223 | ||
|
|
003e2c30d8 | ||
|
|
00fe516d10 | ||
|
|
0305f4f7db | ||
|
|
c60019a6dc | ||
|
|
acde2a37fa | ||
|
|
945241a52a | ||
|
|
e967f80c86 | ||
|
|
a09dc325de | ||
|
|
379b4f7fc4 | ||
|
|
5e536ae077 | ||
|
|
234585e642 | ||
|
|
2cc14f1ad4 | ||
|
|
dc4ed4d239 | ||
|
|
7281c36873 | ||
|
|
40288776e8 |
134
pkg/modules/cloudintegration/implcloudintegration/store.go
Normal file
134
pkg/modules/cloudintegration/implcloudintegration/store.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package implcloudintegration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types/cloudintegrationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type store struct {
|
||||
store sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewStore(sqlStore sqlstore.SQLStore) cloudintegrationtypes.Store {
|
||||
return &store{store: sqlStore}
|
||||
}
|
||||
|
||||
func (s *store) GetAccountByID(ctx context.Context, orgID, id valuer.UUID, provider cloudintegrationtypes.CloudProviderType) (*cloudintegrationtypes.StorableCloudIntegration, error) {
|
||||
account := new(cloudintegrationtypes.StorableCloudIntegration)
|
||||
err := s.store.BunDBCtx(ctx).NewSelect().Model(account).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("provider = ?", provider).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, s.store.WrapNotFoundErrf(err, cloudintegrationtypes.ErrCodeCloudIntegrationNotFound, "cloud integration account with id %s not found", id)
|
||||
}
|
||||
return account, nil
|
||||
}
|
||||
|
||||
func (s *store) ListConnectedAccounts(ctx context.Context, orgID valuer.UUID, provider cloudintegrationtypes.CloudProviderType) ([]*cloudintegrationtypes.StorableCloudIntegration, error) {
|
||||
var accounts []*cloudintegrationtypes.StorableCloudIntegration
|
||||
err := s.store.BunDBCtx(ctx).NewSelect().Model(&accounts).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("provider = ?", provider).
|
||||
Where("removed_at IS NULL").
|
||||
Where("account_id IS NOT NULL").
|
||||
Where("last_agent_report IS NOT NULL").
|
||||
Order("created_at ASC").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return accounts, nil
|
||||
}
|
||||
|
||||
func (s *store) CreateAccount(ctx context.Context, account *cloudintegrationtypes.StorableCloudIntegration) (*cloudintegrationtypes.StorableCloudIntegration, error) {
|
||||
_, err := s.store.BunDBCtx(ctx).NewInsert().Model(account).Exec(ctx)
|
||||
if err != nil {
|
||||
return nil, s.store.WrapAlreadyExistsErrf(err, cloudintegrationtypes.ErrCodeCloudIntegrationAlreadyExists, "cloud integration account with id %s already exists", account.ID)
|
||||
}
|
||||
|
||||
return account, nil
|
||||
}
|
||||
|
||||
func (s *store) UpdateAccount(ctx context.Context, account *cloudintegrationtypes.StorableCloudIntegration) error {
|
||||
_, err := s.store.BunDBCtx(ctx).
|
||||
NewUpdate().
|
||||
Model(account).
|
||||
WherePK().
|
||||
Where("org_id = ?", account.OrgID).
|
||||
Where("provider = ?", account.Provider).
|
||||
Exec(ctx)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *store) RemoveAccount(ctx context.Context, orgID, id valuer.UUID, provider cloudintegrationtypes.CloudProviderType) error {
|
||||
_, err := s.store.BunDBCtx(ctx).NewUpdate().Model(new(cloudintegrationtypes.StorableCloudIntegration)).
|
||||
Set("removed_at = ?", time.Now()).
|
||||
Where("id = ?", id).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("provider = ?", provider).
|
||||
Exec(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *store) GetConnectedAccount(ctx context.Context, orgID valuer.UUID, provider cloudintegrationtypes.CloudProviderType, providerAccountID string) (*cloudintegrationtypes.StorableCloudIntegration, error) {
|
||||
account := new(cloudintegrationtypes.StorableCloudIntegration)
|
||||
err := s.store.BunDBCtx(ctx).NewSelect().Model(account).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("provider = ?", provider).
|
||||
Where("account_id = ?", providerAccountID).
|
||||
Where("last_agent_report IS NOT NULL").
|
||||
Where("removed_at IS NULL").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, s.store.WrapNotFoundErrf(err, cloudintegrationtypes.ErrCodeCloudIntegrationNotFound, "connected account with provider account id %s not found", providerAccountID)
|
||||
}
|
||||
return account, nil
|
||||
}
|
||||
|
||||
func (s *store) GetServiceByServiceID(ctx context.Context, cloudIntegrationID valuer.UUID, serviceID cloudintegrationtypes.ServiceID) (*cloudintegrationtypes.StorableCloudIntegrationService, error) {
|
||||
service := new(cloudintegrationtypes.StorableCloudIntegrationService)
|
||||
err := s.store.BunDBCtx(ctx).NewSelect().Model(service).
|
||||
Where("cloud_integration_id = ?", cloudIntegrationID).
|
||||
Where("type = ?", serviceID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, s.store.WrapNotFoundErrf(err, cloudintegrationtypes.ErrCodeCloudIntegrationNotFound, "cloud integration service with id %s not found", serviceID)
|
||||
}
|
||||
return service, nil
|
||||
}
|
||||
|
||||
func (s *store) ListServices(ctx context.Context, cloudIntegrationID valuer.UUID) ([]*cloudintegrationtypes.StorableCloudIntegrationService, error) {
|
||||
var services []*cloudintegrationtypes.StorableCloudIntegrationService
|
||||
err := s.store.BunDBCtx(ctx).NewSelect().Model(&services).
|
||||
Where("cloud_integration_id = ?", cloudIntegrationID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (s *store) CreateService(ctx context.Context, service *cloudintegrationtypes.StorableCloudIntegrationService) (*cloudintegrationtypes.StorableCloudIntegrationService, error) {
|
||||
_, err := s.store.BunDBCtx(ctx).NewInsert().Model(service).Exec(ctx)
|
||||
if err != nil {
|
||||
return nil, s.store.WrapAlreadyExistsErrf(err, cloudintegrationtypes.ErrCodeCloudIntegrationServiceAlreadyExists, "cloud integration service with id %s already exists for integration account", service.Type)
|
||||
}
|
||||
|
||||
return service, nil
|
||||
}
|
||||
|
||||
func (s *store) UpdateService(ctx context.Context, service *cloudintegrationtypes.StorableCloudIntegrationService) error {
|
||||
_, err := s.store.BunDBCtx(ctx).NewUpdate().Model(service).
|
||||
WherePK().
|
||||
Where("cloud_integration_id = ?", service.CloudIntegrationID).
|
||||
Where("type = ?", service.Type).
|
||||
Exec(ctx)
|
||||
return err
|
||||
}
|
||||
@@ -115,6 +115,7 @@ func (r *Repo) GetLatestVersion(
|
||||
func (r *Repo) insertConfig(
|
||||
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
|
||||
) error {
|
||||
|
||||
if c.ElementType.StringValue() == "" {
|
||||
return errors.NewInvalidInputf(CodeElementTypeRequired, "element type is required for creating agent config version")
|
||||
}
|
||||
@@ -228,25 +229,6 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (stored with orgId prefix). Returns DeployStatusUnknown when no matching row exists.
|
||||
func (r *Repo) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
var version opamptypes.AgentConfigVersion
|
||||
err := r.store.BunDB().NewSelect().
|
||||
Model(&version).
|
||||
ColumnExpr("deploy_status").
|
||||
Where("hash = ?", configHash).
|
||||
Where("org_id = ?", orgId).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
return opamptypes.DeployStatusUnknown, errors.WrapInternalf(err, errors.CodeInternal, "failed to query deploy status by hash")
|
||||
}
|
||||
return version.DeployStatus, nil
|
||||
}
|
||||
|
||||
func (r *Repo) updateDeployStatusByHash(
|
||||
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
|
||||
) error {
|
||||
|
||||
@@ -180,12 +180,6 @@ func (m *Manager) ReportConfigDeploymentStatus(
|
||||
}
|
||||
}
|
||||
|
||||
// Implements model.AgentConfigProvider
|
||||
func (m *Manager) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
return m.Repo.GetDeployStatusByHash(ctx, orgId, configHash)
|
||||
}
|
||||
|
||||
|
||||
func GetLatestVersion(
|
||||
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
|
||||
) (*opamptypes.AgentConfigVersion, error) {
|
||||
|
||||
@@ -131,32 +131,38 @@ func (ic *LogParsingPipelineController) ValidatePipelines(ctx context.Context,
|
||||
return err
|
||||
}
|
||||
|
||||
func (ic *LogParsingPipelineController) getNormalizePipeline() pipelinetypes.GettablePipeline {
|
||||
return pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: pipelinetypes.StoreablePipeline{
|
||||
Name: "Default Pipeline - PreProcessing Body",
|
||||
Alias: "NormalizeBodyDefault",
|
||||
Enabled: true,
|
||||
},
|
||||
Filter: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "body",
|
||||
func (ic *LogParsingPipelineController) getDefaultPipelines() ([]pipelinetypes.GettablePipeline, error) {
|
||||
defaultPipelines := []pipelinetypes.GettablePipeline{}
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
preprocessingPipeline := pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: pipelinetypes.StoreablePipeline{
|
||||
Name: "Default Pipeline - PreProcessing Body",
|
||||
Alias: "NormalizeBodyDefault",
|
||||
Enabled: true,
|
||||
},
|
||||
Filter: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "body",
|
||||
},
|
||||
Operator: v3.FilterOperatorExists,
|
||||
},
|
||||
Operator: v3.FilterOperatorExists,
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []pipelinetypes.PipelineOperator{
|
||||
{
|
||||
ID: uuid.NewString(),
|
||||
Type: "normalize",
|
||||
Enabled: true,
|
||||
If: "body != nil",
|
||||
Config: []pipelinetypes.PipelineOperator{
|
||||
{
|
||||
ID: uuid.NewString(),
|
||||
Type: "normalize",
|
||||
Enabled: true,
|
||||
If: "body != nil",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
defaultPipelines = append(defaultPipelines, preprocessingPipeline)
|
||||
}
|
||||
return defaultPipelines, nil
|
||||
}
|
||||
|
||||
// Returns effective list of pipelines including user created
|
||||
@@ -289,10 +295,12 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
// add default normalize pipeline at the beginning
|
||||
pipelinesResp.Pipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, pipelinesResp.Pipelines...)
|
||||
// recommend default pipelines along with user created pipelines
|
||||
defaultPipelines, err := pc.getDefaultPipelines()
|
||||
if err != nil {
|
||||
return nil, "", model.InternalError(fmt.Errorf("failed to get default pipelines: %w", err))
|
||||
}
|
||||
pipelinesResp.Pipelines = append(pipelinesResp.Pipelines, defaultPipelines...)
|
||||
|
||||
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, pipelinesResp.Pipelines)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package opamp
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
)
|
||||
import "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
|
||||
// Interface for a source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/google/uuid"
|
||||
"github.com/knadh/koanf"
|
||||
@@ -128,11 +127,6 @@ func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID
|
||||
return exists
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) GetDeployStatusByHash(_ context.Context, _ valuer.UUID, _ string) (opamptypes.DeployStatus, error) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
|
||||
subscriberId := uuid.NewString()
|
||||
|
||||
@@ -111,99 +111,90 @@ func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// agentDescriptionChanged returns true when the agent sends updated properties
|
||||
// (e.g. capability flag, version) mid-connection, signalling the server to
|
||||
// recompute and push a new RemoteConfig.
|
||||
//
|
||||
// On reconnect this always returns false: handleFirstStatus pre-copies
|
||||
// AgentDescription into agent.Status so no diff is detected, avoiding a
|
||||
// redundant config recompute.
|
||||
func (agent *Agent) agentDescriptionChanged(newStatus *protobufs.AgentToServer) bool {
|
||||
// nil AgentDescription means no change per OpAMP protocol.
|
||||
if newStatus.AgentDescription == nil {
|
||||
return false
|
||||
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
prevStatus := agent.Status
|
||||
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
} else {
|
||||
// Not a new Agent. Update the Status.
|
||||
agent.Status.SequenceNum = newStatus.SequenceNum
|
||||
|
||||
// Check what's changed in the AgentDescription.
|
||||
if newStatus.AgentDescription != nil {
|
||||
// If the AgentDescription field is set it means the Agent tells us
|
||||
// something is changed in the field since the last status report
|
||||
// (or this is the first report).
|
||||
// Make full comparison of previous and new descriptions to see if it
|
||||
// really is different.
|
||||
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
|
||||
// Agent description didn't change.
|
||||
agentDescrChanged = false
|
||||
} else {
|
||||
// Yes, the description is different, update it.
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
agentDescrChanged = true
|
||||
}
|
||||
} else {
|
||||
// AgentDescription field is not set, which means description didn't change.
|
||||
agentDescrChanged = false
|
||||
}
|
||||
|
||||
// Update remote config status if it is included and is different from what we have.
|
||||
if newStatus.RemoteConfigStatus != nil &&
|
||||
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
|
||||
}
|
||||
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
|
||||
}
|
||||
}
|
||||
}
|
||||
if proto.Equal(agent.Status.AgentDescription, newStatus.AgentDescription) {
|
||||
return false
|
||||
|
||||
if agentDescrChanged {
|
||||
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
|
||||
}
|
||||
|
||||
return agentDescrChanged
|
||||
}
|
||||
|
||||
func (agent *Agent) updateHealth(newStatus *protobufs.AgentToServer) {
|
||||
if newStatus.Health == nil {
|
||||
return
|
||||
}
|
||||
|
||||
agent.Status.Health = newStatus.Health
|
||||
|
||||
if agent.Status != nil && agent.Status.Health != nil && agent.Status.Health.Healthy {
|
||||
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(agent.Status.Health.StartTimeUnixNano)).UTC()
|
||||
}
|
||||
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
|
||||
return true
|
||||
}
|
||||
|
||||
// updateRemoteConfigStatus updates the stored RemoteConfigStatus and notifies
|
||||
// subscribers if the status has changed relative to what we have stored.
|
||||
func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer) {
|
||||
if newStatus.RemoteConfigStatus == nil ||
|
||||
proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
return
|
||||
}
|
||||
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
hash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
switch newStatus.RemoteConfigStatus.Status {
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED:
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, hash)
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, hash, newStatus.RemoteConfigStatus.ErrorMessage)
|
||||
// Update remote config status if it is included and is different from what we have.
|
||||
if newStatus.RemoteConfigStatus != nil {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
}
|
||||
}
|
||||
|
||||
// handleFirstStatus initializes agent.Status on the first message received from
|
||||
// this agent instance. It is a no-op for all subsequent messages.
|
||||
func (agent *Agent) handleFirstStatus(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) {
|
||||
if agent.Status != nil {
|
||||
return
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
}
|
||||
|
||||
// Initialize with a clean slate.
|
||||
agent.Status = &protobufs.AgentToServer{
|
||||
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
|
||||
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
},
|
||||
}
|
||||
|
||||
if newStatus.RemoteConfigStatus == nil ||
|
||||
newStatus.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET {
|
||||
// Agent just started fresh — no prior deployment to reconcile with the DB.
|
||||
return
|
||||
}
|
||||
|
||||
// Since the server's connection is restarted;
|
||||
// copy the agent description; so no change is detected by agentDescriptionChanged
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
|
||||
// Server reconnected while the agent was already running.
|
||||
// Reconcile deployment status with DB; DB is the source of truth.
|
||||
// If DB says in_progress but agent now reports APPLIED/FAILED,
|
||||
// updateRemoteConfigStatus will detect the transition and notify subscribers.
|
||||
rawHash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
deployStatus, err := configProvider.GetDeployStatusByHash(context.Background(), agent.OrgID, agent.OrgID.String()+rawHash)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
agent.Status.RemoteConfigStatus.Status = opamptypes.DeployStatusToProtoStatus[deployStatus]
|
||||
|
||||
// If the deployment is still in-flight, rehydrate the subscriber so that
|
||||
// updateRemoteConfigStatus can fire onConfigSuccess/onConfigFailure when
|
||||
// the agent next reports a terminal status.
|
||||
if deployStatus != opamptypes.Deployed && deployStatus != opamptypes.DeployFailed {
|
||||
ListenToConfigUpdate(agent.OrgID, agent.AgentID, rawHash, configProvider.ReportConfigDeploymentStatus)
|
||||
}
|
||||
}
|
||||
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) bool {
|
||||
agent.handleFirstStatus(newStatus, configProvider)
|
||||
agentDescrChanged := agent.agentDescriptionChanged(newStatus)
|
||||
// record healthy timestamp
|
||||
if newStatus.Health != nil && newStatus.Health.Healthy {
|
||||
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(newStatus.Health.StartTimeUnixNano)).UTC()
|
||||
}
|
||||
// notify subscribers first; this will update the status in the DB
|
||||
agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged
|
||||
agent.updateRemoteConfigStatus(newStatus)
|
||||
// update local reference in last.
|
||||
agent.Status = newStatus
|
||||
agent.updateHealth(newStatus)
|
||||
return agentDescrChanged
|
||||
}
|
||||
|
||||
@@ -246,7 +237,7 @@ func (agent *Agent) processStatusUpdate(
|
||||
// current status is not up-to-date.
|
||||
lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum)
|
||||
|
||||
agentDescrChanged := agent.updateStatusField(newStatus, configProvider)
|
||||
agentDescrChanged := agent.updateStatusField(newStatus)
|
||||
|
||||
// Check if any fields were omitted in the status report.
|
||||
effectiveConfigOmitted := newStatus.EffectiveConfig == nil &&
|
||||
|
||||
@@ -1,11 +1,6 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
import "github.com/SigNoz/signoz/pkg/valuer"
|
||||
|
||||
// Interface for source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
@@ -25,10 +20,4 @@ type AgentConfigProvider interface {
|
||||
configId string,
|
||||
err error,
|
||||
)
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (with orgId prefix as stored in the DB). Returns DeployStatusUnknown when
|
||||
// no matching row exists. Used by the agent's first-connect handler to
|
||||
// determine whether the reported RemoteConfigStatus resolves a pending deployment.
|
||||
GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error)
|
||||
}
|
||||
|
||||
@@ -66,7 +66,6 @@ func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnC
|
||||
defer coordinator.mutex.Unlock()
|
||||
|
||||
key := getSubscriberKey(orgId, hash)
|
||||
|
||||
if subs, ok := coordinator.subscribers[key]; ok {
|
||||
subs = append(subs, ss)
|
||||
coordinator.subscribers[key] = subs
|
||||
|
||||
@@ -13,7 +13,9 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeCloudIntegrationNotFound = errors.MustNewCode("cloud_integration_not_found")
|
||||
ErrCodeCloudIntegrationNotFound = errors.MustNewCode("cloud_integration_not_found")
|
||||
ErrCodeCloudIntegrationAlreadyExists = errors.MustNewCode("cloud_integration_already_exists")
|
||||
ErrCodeCloudIntegrationServiceAlreadyExists = errors.MustNewCode("cloud_integration_service_already_exists")
|
||||
)
|
||||
|
||||
// StorableCloudIntegration represents a cloud integration stored in the database.
|
||||
|
||||
@@ -10,6 +10,12 @@ type Store interface {
|
||||
// GetAccountByID returns a cloud integration account by id
|
||||
GetAccountByID(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) (*StorableCloudIntegration, error)
|
||||
|
||||
// GetConnectedAccount for a given provider
|
||||
GetConnectedAccount(ctx context.Context, orgID valuer.UUID, provider CloudProviderType, providerAccountID string) (*StorableCloudIntegration, error)
|
||||
|
||||
// ListConnectedAccounts returns all the cloud integration accounts for the org and cloud provider
|
||||
ListConnectedAccounts(ctx context.Context, orgID valuer.UUID, provider CloudProviderType) ([]*StorableCloudIntegration, error)
|
||||
|
||||
// CreateAccount creates a new cloud integration account
|
||||
CreateAccount(ctx context.Context, account *StorableCloudIntegration) (*StorableCloudIntegration, error)
|
||||
|
||||
@@ -19,23 +25,17 @@ type Store interface {
|
||||
// RemoveAccount marks a cloud integration account as removed by setting the RemovedAt field
|
||||
RemoveAccount(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) error
|
||||
|
||||
// ListConnectedAccounts returns all the cloud integration accounts for the org and cloud provider
|
||||
ListConnectedAccounts(ctx context.Context, orgID valuer.UUID, provider CloudProviderType) ([]*StorableCloudIntegration, error)
|
||||
|
||||
// GetConnectedAccount for a given provider
|
||||
GetConnectedAccount(ctx context.Context, orgID valuer.UUID, provider CloudProviderType, providerAccountID string) (*StorableCloudIntegration, error)
|
||||
|
||||
// cloud_integration_service related methods
|
||||
|
||||
// GetServiceByServiceID returns the cloud integration service for the given cloud integration id and service id
|
||||
GetServiceByServiceID(ctx context.Context, cloudIntegrationID valuer.UUID, serviceID ServiceID) (*StorableCloudIntegrationService, error)
|
||||
|
||||
// ListServices returns all the cloud integration services for the given cloud integration id
|
||||
ListServices(ctx context.Context, cloudIntegrationID valuer.UUID) ([]*StorableCloudIntegrationService, error)
|
||||
|
||||
// CreateService creates a new cloud integration service
|
||||
CreateService(ctx context.Context, service *StorableCloudIntegrationService) (*StorableCloudIntegrationService, error)
|
||||
|
||||
// UpdateService updates an existing cloud integration service
|
||||
UpdateService(ctx context.Context, service *StorableCloudIntegrationService) error
|
||||
|
||||
// ListServices returns all the cloud integration services for the given cloud integration id
|
||||
ListServices(ctx context.Context, cloudIntegrationID valuer.UUID) ([]*StorableCloudIntegrationService, error)
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/open-telemetry/opamp-go/protobufs"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
@@ -18,15 +17,6 @@ const (
|
||||
AgentStatusDisconnected
|
||||
)
|
||||
|
||||
var DeployStatusToProtoStatus = map[DeployStatus]protobufs.RemoteConfigStatuses{
|
||||
PendingDeploy: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
Deploying: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
Deployed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
|
||||
DeployInitiated: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
DeployFailed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
|
||||
DeployStatusUnknown: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
}
|
||||
|
||||
type StorableAgent struct {
|
||||
bun.BaseModel `bun:"table:agent"`
|
||||
|
||||
@@ -40,6 +30,16 @@ type StorableAgent struct {
|
||||
Config string `bun:"config,type:text,notnull"`
|
||||
}
|
||||
|
||||
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
|
||||
return StorableAgent{
|
||||
OrgID: orgID,
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
AgentID: agentID,
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
type ElementType struct{ valuer.String }
|
||||
|
||||
var (
|
||||
@@ -49,6 +49,24 @@ var (
|
||||
ElementTypeLbExporter = ElementType{valuer.NewString("lb_exporter")}
|
||||
)
|
||||
|
||||
// NewElementType creates a new ElementType from a string value.
|
||||
// Returns the corresponding ElementType constant if the string matches,
|
||||
// otherwise returns an empty ElementType.
|
||||
func NewElementType(value string) ElementType {
|
||||
switch valuer.NewString(value) {
|
||||
case ElementTypeSamplingRules.String:
|
||||
return ElementTypeSamplingRules
|
||||
case ElementTypeDropRules.String:
|
||||
return ElementTypeDropRules
|
||||
case ElementTypeLogPipelines.String:
|
||||
return ElementTypeLogPipelines
|
||||
case ElementTypeLbExporter.String:
|
||||
return ElementTypeLbExporter
|
||||
default:
|
||||
return ElementType{valuer.NewString("")}
|
||||
}
|
||||
}
|
||||
|
||||
type DeployStatus struct{ valuer.String }
|
||||
|
||||
var (
|
||||
@@ -80,26 +98,6 @@ type AgentConfigVersion struct {
|
||||
Config string `json:"config" bun:"config,type:text"`
|
||||
}
|
||||
|
||||
type AgentConfigElement struct {
|
||||
bun.BaseModel `bun:"table:agent_config_element"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
|
||||
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
}
|
||||
|
||||
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
|
||||
return StorableAgent{
|
||||
OrgID: orgID,
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
AgentID: agentID,
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
func NewAgentConfigVersion(orgId valuer.UUID, userId valuer.UUID, elementType ElementType) *AgentConfigVersion {
|
||||
return &AgentConfigVersion{
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
@@ -120,20 +118,12 @@ func (a *AgentConfigVersion) IncrementVersion(lastVersion int) {
|
||||
a.Version = lastVersion + 1
|
||||
}
|
||||
|
||||
// NewElementType creates a new ElementType from a string value.
|
||||
// Returns the corresponding ElementType constant if the string matches,
|
||||
// otherwise returns an empty ElementType.
|
||||
func NewElementType(value string) ElementType {
|
||||
switch valuer.NewString(value) {
|
||||
case ElementTypeSamplingRules.String:
|
||||
return ElementTypeSamplingRules
|
||||
case ElementTypeDropRules.String:
|
||||
return ElementTypeDropRules
|
||||
case ElementTypeLogPipelines.String:
|
||||
return ElementTypeLogPipelines
|
||||
case ElementTypeLbExporter.String:
|
||||
return ElementTypeLbExporter
|
||||
default:
|
||||
return ElementType{valuer.NewString("")}
|
||||
}
|
||||
type AgentConfigElement struct {
|
||||
bun.BaseModel `bun:"table:agent_config_element"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
|
||||
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user