mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-03 12:32:02 +00:00
Compare commits
2 Commits
light-mode
...
in-progres
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
154560086f | ||
|
|
f95a5432e3 |
@@ -114,7 +114,6 @@ func (r *Repo) GetLatestVersion(
|
||||
func (r *Repo) insertConfig(
|
||||
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
|
||||
) error {
|
||||
|
||||
if c.ElementType.StringValue() == "" {
|
||||
return errors.NewInvalidInputf(CodeElementTypeRequired, "element type is required for creating agent config version")
|
||||
}
|
||||
@@ -228,6 +227,55 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (stored with orgId prefix). Returns DeployStatusUnknown when no matching row exists.
|
||||
func (r *Repo) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
var version opamptypes.AgentConfigVersion
|
||||
err := r.store.BunDB().NewSelect().
|
||||
Model(&version).
|
||||
ColumnExpr("deploy_status").
|
||||
Where("hash = ?", configHash).
|
||||
Where("org_id = ?", orgId).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
return opamptypes.DeployStatusUnknown, errors.WrapInternalf(err, errors.CodeInternal, "failed to query deploy status by hash")
|
||||
}
|
||||
return version.DeployStatus, nil
|
||||
}
|
||||
|
||||
// GetPendingDeployments returns all config versions with in_progress deploy status.
|
||||
// Used on server startup to rehydrate coordinator subscribers that were lost on crash/restart.
|
||||
func (r *Repo) GetPendingDeployments(ctx context.Context) ([]opamptypes.AgentConfigVersion, error) {
|
||||
var versions []opamptypes.AgentConfigVersion
|
||||
err := r.store.BunDB().NewSelect().
|
||||
Model(&versions).
|
||||
ColumnExpr("org_id, hash").
|
||||
// Only consider non-terminal deployment states (i.e. anything except failed or deployed).
|
||||
Where("deploy_status NOT IN (?, ?)",
|
||||
opamptypes.DeployFailed.StringValue(),
|
||||
opamptypes.Deployed.StringValue(),
|
||||
).
|
||||
// For each org, keep only the latest pending version.
|
||||
Where("version = (SELECT MAX(version) FROM agent_config_version WHERE org_id = acv.org_id AND deploy_status NOT IN (?, ?))",
|
||||
opamptypes.DeployFailed.StringValue(),
|
||||
opamptypes.Deployed.StringValue(),
|
||||
).
|
||||
// Exclude any pending version that is before a terminal (deployed/failed) version for the same org.
|
||||
Where("NOT EXISTS (SELECT 1 FROM agent_config_version acv2 WHERE acv2.org_id = acv.org_id AND acv2.version > acv.version AND acv2.deploy_status IN (?, ?))",
|
||||
opamptypes.Deployed.StringValue(),
|
||||
opamptypes.DeployFailed.StringValue(),
|
||||
).
|
||||
Where("hash IS NOT NULL AND hash != ''").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to query pending deployments")
|
||||
}
|
||||
return versions, nil
|
||||
}
|
||||
|
||||
func (r *Repo) updateDeployStatusByHash(
|
||||
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
|
||||
) error {
|
||||
|
||||
@@ -178,6 +178,31 @@ func (m *Manager) ReportConfigDeploymentStatus(
|
||||
}
|
||||
}
|
||||
|
||||
// Implements model.AgentConfigProvider
|
||||
func (m *Manager) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
return m.Repo.GetDeployStatusByHash(ctx, orgId, configHash)
|
||||
}
|
||||
|
||||
// Implements opamp.AgentConfigProvider
|
||||
func (m *Manager) GetPendingDeployments(ctx context.Context) ([]opamp.PendingDeployment, error) {
|
||||
versions, err := m.Repo.GetPendingDeployments(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := make([]opamp.PendingDeployment, 0, len(versions))
|
||||
for _, v := range versions {
|
||||
rawHash := strings.TrimPrefix(v.Hash, v.OrgID.String())
|
||||
if rawHash == "" {
|
||||
continue
|
||||
}
|
||||
result = append(result, opamp.PendingDeployment{
|
||||
OrgID: v.OrgID,
|
||||
RawConfigHash: rawHash,
|
||||
})
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func GetLatestVersion(
|
||||
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
|
||||
) (*opamptypes.AgentConfigVersion, error) {
|
||||
|
||||
@@ -1,6 +1,21 @@
|
||||
package opamp
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// PendingDeployment is an agent config deployment still in the in_progress state.
|
||||
// These are re-registered in the coordinator on server startup so that
|
||||
// notifySubscribers can find them when the agent reconnects after a crash.
|
||||
type PendingDeployment struct {
|
||||
OrgID valuer.UUID
|
||||
// RawConfigHash is the hash without the orgId prefix, matching the
|
||||
// ConfigHash sent in AgentRemoteConfig and reported back by the agent.
|
||||
RawConfigHash string
|
||||
}
|
||||
|
||||
// Interface for a source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
@@ -9,4 +24,9 @@ type AgentConfigProvider interface {
|
||||
// Subscribe to be notified on changes in config provided by this source.
|
||||
// Used for rolling out latest config recommendation to all connected agents when settings change
|
||||
SubscribeToConfigUpdates(callback func()) (unsubscribe func())
|
||||
|
||||
// GetPendingDeployments returns all config deployments currently in_progress.
|
||||
// Called on server startup to re-register coordinator subscribers that were
|
||||
// lost when the server previously crashed or restarted.
|
||||
GetPendingDeployments(ctx context.Context) ([]PendingDeployment, error)
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/google/uuid"
|
||||
"github.com/knadh/koanf"
|
||||
@@ -127,6 +128,16 @@ func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID
|
||||
return exists
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) GetDeployStatusByHash(_ context.Context, _ valuer.UUID, _ string) (opamptypes.DeployStatus, error) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) GetPendingDeployments(_ context.Context) ([]PendingDeployment, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
|
||||
subscriberId := uuid.NewString()
|
||||
|
||||
@@ -112,53 +112,71 @@ func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
prevStatus := agent.Status
|
||||
|
||||
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) (agentDescrChanged bool) {
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
} else {
|
||||
// Not a new Agent. Update the Status.
|
||||
agent.Status.SequenceNum = newStatus.SequenceNum
|
||||
|
||||
// Check what's changed in the AgentDescription.
|
||||
if newStatus.AgentDescription != nil {
|
||||
// If the AgentDescription field is set it means the Agent tells us
|
||||
// something is changed in the field since the last status report
|
||||
// (or this is the first report).
|
||||
// Make full comparison of previous and new descriptions to see if it
|
||||
// really is different.
|
||||
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
|
||||
// Agent description didn't change.
|
||||
agentDescrChanged = false
|
||||
} else {
|
||||
// Yes, the description is different, update it.
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
agentDescrChanged = true
|
||||
}
|
||||
} else {
|
||||
// AgentDescription field is not set, which means description didn't change.
|
||||
agentDescrChanged = false
|
||||
// initialize the remote config status to unset
|
||||
agent.Status = &protobufs.AgentToServer{
|
||||
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
|
||||
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
},
|
||||
}
|
||||
|
||||
// Update remote config status if it is included and is different from what we have.
|
||||
if newStatus.RemoteConfigStatus != nil &&
|
||||
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
rawHash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
dbHash := agent.OrgID.String() + rawHash
|
||||
deployStatus, err := configProvider.GetDeployStatusByHash(context.Background(), agent.OrgID, dbHash)
|
||||
if err == nil {
|
||||
// Set the agent config status to the status from the database
|
||||
agent.Status.RemoteConfigStatus.Status = opamptypes.DeployStatusToProtoStatus[deployStatus]
|
||||
}
|
||||
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
|
||||
}
|
||||
// First message from this agent instance (new connect or server restart).
|
||||
// If the agent brings a RemoteConfigStatus, consult the DB to decide
|
||||
// whether this resolves a pending deployment. This is the authoritative
|
||||
// answer: if DB says in_progress and agent says APPLIED/FAILED, we notify.
|
||||
// No mock status, no guessing — the DB IS the source of truth.
|
||||
if newStatus.RemoteConfigStatus != nil {
|
||||
// Agent just started, i.e. it doesn't have a remote config status yet.
|
||||
if newStatus.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET {
|
||||
agentDescrChanged = true
|
||||
agent.Status.RemoteConfigStatus.Status = protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET
|
||||
} else {
|
||||
// else Agent was already running, Server just reconnected.
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
|
||||
// database has already recorded the final status of the deployment, So here we don't need to prepare status for the agent
|
||||
// Instead we directly Copy it from newStatus
|
||||
switch agent.Status.RemoteConfigStatus.Status {
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
|
||||
protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
|
||||
agent.Status.RemoteConfigStatus.Status = newStatus.RemoteConfigStatus.Status
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Subsequent message — update sequence number and diff fields.
|
||||
agent.Status.SequenceNum = newStatus.SequenceNum
|
||||
|
||||
if newStatus.AgentDescription != nil {
|
||||
if proto.Equal(agent.Status.AgentDescription, newStatus.AgentDescription) {
|
||||
agentDescrChanged = false
|
||||
} else {
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
agentDescrChanged = true
|
||||
}
|
||||
}
|
||||
|
||||
// Notify subscribers when RemoteConfigStatus changes.
|
||||
if newStatus.RemoteConfigStatus != nil &&
|
||||
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
hash := string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
switch agent.Status.RemoteConfigStatus.Status {
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED:
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, hash)
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, hash, agent.Status.RemoteConfigStatus.ErrorMessage)
|
||||
}
|
||||
}
|
||||
|
||||
if agentDescrChanged {
|
||||
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
|
||||
@@ -186,14 +204,8 @@ func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer)
|
||||
}
|
||||
}
|
||||
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
}
|
||||
|
||||
agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) (agentDescrChanged bool) {
|
||||
agentDescrChanged = agent.updateAgentDescription(newStatus, configProvider)
|
||||
agent.updateRemoteConfigStatus(newStatus)
|
||||
agent.updateHealth(newStatus)
|
||||
return agentDescrChanged
|
||||
@@ -238,7 +250,7 @@ func (agent *Agent) processStatusUpdate(
|
||||
// current status is not up-to-date.
|
||||
lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum)
|
||||
|
||||
agentDescrChanged := agent.updateStatusField(newStatus)
|
||||
agentDescrChanged := agent.updateStatusField(newStatus, configProvider)
|
||||
|
||||
// Check if any fields were omitted in the status report.
|
||||
effectiveConfigOmitted := newStatus.EffectiveConfig == nil &&
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
package model
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/valuer"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Interface for source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
@@ -20,4 +25,10 @@ type AgentConfigProvider interface {
|
||||
configId string,
|
||||
err error,
|
||||
)
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (with orgId prefix as stored in the DB). Returns DeployStatusUnknown when
|
||||
// no matching row exists. Used by the agent's first-connect handler to
|
||||
// determine whether the reported RemoteConfigStatus resolves a pending deployment.
|
||||
GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error)
|
||||
}
|
||||
|
||||
@@ -41,7 +41,6 @@ func onConfigFailure(orgId valuer.UUID, agentId string, hash string, errorMessag
|
||||
notifySubscribers(orgId, agentId, key, errors.New(errorMessage))
|
||||
}
|
||||
|
||||
// OnSuccess listens to config changes and notifies subscribers
|
||||
func notifySubscribers(orgId valuer.UUID, agentId string, key string, err error) {
|
||||
// this method currently does not handle multi-agent scenario.
|
||||
// as soon as a message is delivered, we release all the subscribers
|
||||
@@ -66,6 +65,7 @@ func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnC
|
||||
defer coordinator.mutex.Unlock()
|
||||
|
||||
key := getSubscriberKey(orgId, hash)
|
||||
|
||||
if subs, ok := coordinator.subscribers[key]; ok {
|
||||
subs = append(subs, ss)
|
||||
coordinator.subscribers[key] = subs
|
||||
|
||||
@@ -66,6 +66,17 @@ func (srv *Server) Start(listener string) error {
|
||||
ListenEndpoint: listener,
|
||||
}
|
||||
|
||||
// Re-register coordinator subscribers for any deployments that were in_progress
|
||||
// when the server last shut down or crashed. Without this, notifySubscribers
|
||||
// would find an empty map and the deployment status would stay stuck in_progress.
|
||||
if pending, err := srv.agentConfigProvider.GetPendingDeployments(context.Background()); err != nil {
|
||||
return err
|
||||
} else {
|
||||
for _, dep := range pending {
|
||||
model.ListenToConfigUpdate(dep.OrgID, "", dep.RawConfigHash, srv.agentConfigProvider.ReportConfigDeploymentStatus)
|
||||
}
|
||||
}
|
||||
|
||||
// This will have to send request to all the agents of all tenants
|
||||
unsubscribe := srv.agentConfigProvider.SubscribeToConfigUpdates(func() {
|
||||
err := srv.agents.RecommendLatestConfigToAll(srv.agentConfigProvider)
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/open-telemetry/opamp-go/protobufs"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
@@ -78,6 +79,15 @@ var (
|
||||
DeployStatusUnknown = DeployStatus{valuer.NewString("unknown")}
|
||||
)
|
||||
|
||||
var DeployStatusToProtoStatus = map[DeployStatus]protobufs.RemoteConfigStatuses{
|
||||
PendingDeploy: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
Deploying: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
Deployed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
|
||||
DeployInitiated: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
DeployFailed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
|
||||
DeployStatusUnknown: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
}
|
||||
|
||||
type AgentConfigVersion struct {
|
||||
bun.BaseModel `bun:"table:agent_config_version,alias:acv"`
|
||||
|
||||
|
||||
Reference in New Issue
Block a user