Compare commits

...

2 Commits

Author SHA1 Message Date
Piyush Singariya
154560086f fix: stuck in progress logs pipeline status 2026-03-03 11:56:30 +05:30
Piyush Singariya
f95a5432e3 fix: in progress status stuck in logs pipelines 2026-02-27 12:27:59 +05:30
9 changed files with 200 additions and 52 deletions

View File

@@ -114,7 +114,6 @@ func (r *Repo) GetLatestVersion(
func (r *Repo) insertConfig(
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
) error {
if c.ElementType.StringValue() == "" {
return errors.NewInvalidInputf(CodeElementTypeRequired, "element type is required for creating agent config version")
}
@@ -228,6 +227,55 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
return nil
}
// GetDeployStatusByHash returns the DeployStatus for the given config hash
// (stored with orgId prefix). Returns DeployStatusUnknown when no matching row exists.
func (r *Repo) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
var version opamptypes.AgentConfigVersion
err := r.store.BunDB().NewSelect().
Model(&version).
ColumnExpr("deploy_status").
Where("hash = ?", configHash).
Where("org_id = ?", orgId).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return opamptypes.DeployStatusUnknown, nil
}
return opamptypes.DeployStatusUnknown, errors.WrapInternalf(err, errors.CodeInternal, "failed to query deploy status by hash")
}
return version.DeployStatus, nil
}
// GetPendingDeployments returns all config versions with in_progress deploy status.
// Used on server startup to rehydrate coordinator subscribers that were lost on crash/restart.
func (r *Repo) GetPendingDeployments(ctx context.Context) ([]opamptypes.AgentConfigVersion, error) {
var versions []opamptypes.AgentConfigVersion
err := r.store.BunDB().NewSelect().
Model(&versions).
ColumnExpr("org_id, hash").
// Only consider non-terminal deployment states (i.e. anything except failed or deployed).
Where("deploy_status NOT IN (?, ?)",
opamptypes.DeployFailed.StringValue(),
opamptypes.Deployed.StringValue(),
).
// For each org, keep only the latest pending version.
Where("version = (SELECT MAX(version) FROM agent_config_version WHERE org_id = acv.org_id AND deploy_status NOT IN (?, ?))",
opamptypes.DeployFailed.StringValue(),
opamptypes.Deployed.StringValue(),
).
// Exclude any pending version that is before a terminal (deployed/failed) version for the same org.
Where("NOT EXISTS (SELECT 1 FROM agent_config_version acv2 WHERE acv2.org_id = acv.org_id AND acv2.version > acv.version AND acv2.deploy_status IN (?, ?))",
opamptypes.Deployed.StringValue(),
opamptypes.DeployFailed.StringValue(),
).
Where("hash IS NOT NULL AND hash != ''").
Scan(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to query pending deployments")
}
return versions, nil
}
func (r *Repo) updateDeployStatusByHash(
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
) error {

View File

@@ -178,6 +178,31 @@ func (m *Manager) ReportConfigDeploymentStatus(
}
}
// Implements model.AgentConfigProvider
func (m *Manager) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
return m.Repo.GetDeployStatusByHash(ctx, orgId, configHash)
}
// Implements opamp.AgentConfigProvider
func (m *Manager) GetPendingDeployments(ctx context.Context) ([]opamp.PendingDeployment, error) {
versions, err := m.Repo.GetPendingDeployments(ctx)
if err != nil {
return nil, err
}
result := make([]opamp.PendingDeployment, 0, len(versions))
for _, v := range versions {
rawHash := strings.TrimPrefix(v.Hash, v.OrgID.String())
if rawHash == "" {
continue
}
result = append(result, opamp.PendingDeployment{
OrgID: v.OrgID,
RawConfigHash: rawHash,
})
}
return result, nil
}
func GetLatestVersion(
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
) (*opamptypes.AgentConfigVersion, error) {

View File

@@ -1,6 +1,21 @@
package opamp
import "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
import (
"context"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/valuer"
)
// PendingDeployment is an agent config deployment still in the in_progress state.
// These are re-registered in the coordinator on server startup so that
// notifySubscribers can find them when the agent reconnects after a crash.
type PendingDeployment struct {
OrgID valuer.UUID
// RawConfigHash is the hash without the orgId prefix, matching the
// ConfigHash sent in AgentRemoteConfig and reported back by the agent.
RawConfigHash string
}
// Interface for a source of otel collector config recommendations.
type AgentConfigProvider interface {
@@ -9,4 +24,9 @@ type AgentConfigProvider interface {
// Subscribe to be notified on changes in config provided by this source.
// Used for rolling out latest config recommendation to all connected agents when settings change
SubscribeToConfigUpdates(callback func()) (unsubscribe func())
// GetPendingDeployments returns all config deployments currently in_progress.
// Called on server startup to re-register coordinator subscribers that were
// lost when the server previously crashed or restarted.
GetPendingDeployments(ctx context.Context) ([]PendingDeployment, error)
}

View File

@@ -5,6 +5,7 @@ import (
"log"
"net"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"github.com/knadh/koanf"
@@ -127,6 +128,16 @@ func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID
return exists
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) GetDeployStatusByHash(_ context.Context, _ valuer.UUID, _ string) (opamptypes.DeployStatus, error) {
return opamptypes.DeployStatusUnknown, nil
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) GetPendingDeployments(_ context.Context) ([]PendingDeployment, error) {
return nil, nil
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
subscriberId := uuid.NewString()

View File

@@ -112,53 +112,71 @@ func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
return false
}
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
prevStatus := agent.Status
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) (agentDescrChanged bool) {
if agent.Status == nil {
// First time this Agent reports a status, remember it.
agent.Status = newStatus
agentDescrChanged = true
} else {
// Not a new Agent. Update the Status.
agent.Status.SequenceNum = newStatus.SequenceNum
// Check what's changed in the AgentDescription.
if newStatus.AgentDescription != nil {
// If the AgentDescription field is set it means the Agent tells us
// something is changed in the field since the last status report
// (or this is the first report).
// Make full comparison of previous and new descriptions to see if it
// really is different.
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
// Agent description didn't change.
agentDescrChanged = false
} else {
// Yes, the description is different, update it.
agent.Status.AgentDescription = newStatus.AgentDescription
agentDescrChanged = true
}
} else {
// AgentDescription field is not set, which means description didn't change.
agentDescrChanged = false
// initialize the remote config status to unset
agent.Status = &protobufs.AgentToServer{
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
},
}
// Update remote config status if it is included and is different from what we have.
if newStatus.RemoteConfigStatus != nil &&
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
rawHash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
dbHash := agent.OrgID.String() + rawHash
deployStatus, err := configProvider.GetDeployStatusByHash(context.Background(), agent.OrgID, dbHash)
if err == nil {
// Set the agent config status to the status from the database
agent.Status.RemoteConfigStatus.Status = opamptypes.DeployStatusToProtoStatus[deployStatus]
}
// todo: need to address multiple agent scenario here
// for now, the first response will be sent back to the UI
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
}
// First message from this agent instance (new connect or server restart).
// If the agent brings a RemoteConfigStatus, consult the DB to decide
// whether this resolves a pending deployment. This is the authoritative
// answer: if DB says in_progress and agent says APPLIED/FAILED, we notify.
// No mock status, no guessing — the DB IS the source of truth.
if newStatus.RemoteConfigStatus != nil {
// Agent just started, i.e. it doesn't have a remote config status yet.
if newStatus.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET {
agentDescrChanged = true
agent.Status.RemoteConfigStatus.Status = protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET
} else {
// else Agent was already running, Server just reconnected.
agent.Status.AgentDescription = newStatus.AgentDescription
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
// database has already recorded the final status of the deployment, So here we don't need to prepare status for the agent
// Instead we directly Copy it from newStatus
switch agent.Status.RemoteConfigStatus.Status {
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
agent.Status.RemoteConfigStatus.Status = newStatus.RemoteConfigStatus.Status
}
}
}
}
// Subsequent message — update sequence number and diff fields.
agent.Status.SequenceNum = newStatus.SequenceNum
if newStatus.AgentDescription != nil {
if proto.Equal(agent.Status.AgentDescription, newStatus.AgentDescription) {
agentDescrChanged = false
} else {
agent.Status.AgentDescription = newStatus.AgentDescription
agentDescrChanged = true
}
}
// Notify subscribers when RemoteConfigStatus changes.
if newStatus.RemoteConfigStatus != nil &&
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
hash := string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash)
switch agent.Status.RemoteConfigStatus.Status {
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED:
onConfigSuccess(agent.OrgID, agent.AgentID, hash)
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
onConfigFailure(agent.OrgID, agent.AgentID, hash, agent.Status.RemoteConfigStatus.ErrorMessage)
}
}
if agentDescrChanged {
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
@@ -186,14 +204,8 @@ func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer)
}
}
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
if agent.Status == nil {
// First time this Agent reports a status, remember it.
agent.Status = newStatus
agentDescrChanged = true
}
agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) (agentDescrChanged bool) {
agentDescrChanged = agent.updateAgentDescription(newStatus, configProvider)
agent.updateRemoteConfigStatus(newStatus)
agent.updateHealth(newStatus)
return agentDescrChanged
@@ -238,7 +250,7 @@ func (agent *Agent) processStatusUpdate(
// current status is not up-to-date.
lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum)
agentDescrChanged := agent.updateStatusField(newStatus)
agentDescrChanged := agent.updateStatusField(newStatus, configProvider)
// Check if any fields were omitted in the status report.
effectiveConfigOmitted := newStatus.EffectiveConfig == nil &&

View File

@@ -1,6 +1,11 @@
package model
import "github.com/SigNoz/signoz/pkg/valuer"
import (
"context"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Interface for source of otel collector config recommendations.
type AgentConfigProvider interface {
@@ -20,4 +25,10 @@ type AgentConfigProvider interface {
configId string,
err error,
)
// GetDeployStatusByHash returns the DeployStatus for the given config hash
// (with orgId prefix as stored in the DB). Returns DeployStatusUnknown when
// no matching row exists. Used by the agent's first-connect handler to
// determine whether the reported RemoteConfigStatus resolves a pending deployment.
GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error)
}

View File

@@ -41,7 +41,6 @@ func onConfigFailure(orgId valuer.UUID, agentId string, hash string, errorMessag
notifySubscribers(orgId, agentId, key, errors.New(errorMessage))
}
// OnSuccess listens to config changes and notifies subscribers
func notifySubscribers(orgId valuer.UUID, agentId string, key string, err error) {
// this method currently does not handle multi-agent scenario.
// as soon as a message is delivered, we release all the subscribers
@@ -66,6 +65,7 @@ func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnC
defer coordinator.mutex.Unlock()
key := getSubscriberKey(orgId, hash)
if subs, ok := coordinator.subscribers[key]; ok {
subs = append(subs, ss)
coordinator.subscribers[key] = subs

View File

@@ -66,6 +66,17 @@ func (srv *Server) Start(listener string) error {
ListenEndpoint: listener,
}
// Re-register coordinator subscribers for any deployments that were in_progress
// when the server last shut down or crashed. Without this, notifySubscribers
// would find an empty map and the deployment status would stay stuck in_progress.
if pending, err := srv.agentConfigProvider.GetPendingDeployments(context.Background()); err != nil {
return err
} else {
for _, dep := range pending {
model.ListenToConfigUpdate(dep.OrgID, "", dep.RawConfigHash, srv.agentConfigProvider.ReportConfigDeploymentStatus)
}
}
// This will have to send request to all the agents of all tenants
unsubscribe := srv.agentConfigProvider.SubscribeToConfigUpdates(func() {
err := srv.agents.RecommendLatestConfigToAll(srv.agentConfigProvider)

View File

@@ -6,6 +6,7 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/uptrace/bun"
)
@@ -78,6 +79,15 @@ var (
DeployStatusUnknown = DeployStatus{valuer.NewString("unknown")}
)
var DeployStatusToProtoStatus = map[DeployStatus]protobufs.RemoteConfigStatuses{
PendingDeploy: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
Deploying: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
Deployed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
DeployInitiated: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
DeployFailed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
DeployStatusUnknown: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
}
type AgentConfigVersion struct {
bun.BaseModel `bun:"table:agent_config_version,alias:acv"`