Compare commits

...

5 Commits

Author SHA1 Message Date
Piyush Singariya
4c22ce2cd4 revert: comment change 2026-03-09 16:41:58 +05:30
Piyush Singariya
803fee0fdd fix: changes based on review 2026-03-09 16:36:51 +05:30
Piyush Singariya
fc321f8acc Merge branch 'main' into in-progress-agent-config 2026-03-05 11:20:43 +05:30
Piyush Singariya
154560086f fix: stuck in progress logs pipeline status 2026-03-03 11:56:30 +05:30
Piyush Singariya
f95a5432e3 fix: in progress status stuck in logs pipelines 2026-02-27 12:27:59 +05:30
8 changed files with 161 additions and 86 deletions

View File

@@ -114,7 +114,6 @@ func (r *Repo) GetLatestVersion(
func (r *Repo) insertConfig(
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
) error {
if c.ElementType.StringValue() == "" {
return errors.NewInvalidInputf(CodeElementTypeRequired, "element type is required for creating agent config version")
}
@@ -228,6 +227,25 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
return nil
}
// GetDeployStatusByHash returns the DeployStatus for the given config hash
// (stored with orgId prefix). Returns DeployStatusUnknown when no matching row exists.
func (r *Repo) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
var version opamptypes.AgentConfigVersion
err := r.store.BunDB().NewSelect().
Model(&version).
ColumnExpr("deploy_status").
Where("hash = ?", configHash).
Where("org_id = ?", orgId).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return opamptypes.DeployStatusUnknown, nil
}
return opamptypes.DeployStatusUnknown, errors.WrapInternalf(err, errors.CodeInternal, "failed to query deploy status by hash")
}
return version.DeployStatus, nil
}
func (r *Repo) updateDeployStatusByHash(
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
) error {

View File

@@ -178,6 +178,12 @@ func (m *Manager) ReportConfigDeploymentStatus(
}
}
// Implements model.AgentConfigProvider
func (m *Manager) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
return m.Repo.GetDeployStatusByHash(ctx, orgId, configHash)
}
func GetLatestVersion(
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
) (*opamptypes.AgentConfigVersion, error) {

View File

@@ -1,6 +1,8 @@
package opamp
import "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
import (
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
)
// Interface for a source of otel collector config recommendations.
type AgentConfigProvider interface {

View File

@@ -5,6 +5,7 @@ import (
"log"
"net"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"github.com/knadh/koanf"
@@ -127,6 +128,11 @@ func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID
return exists
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) GetDeployStatusByHash(_ context.Context, _ valuer.UUID, _ string) (opamptypes.DeployStatus, error) {
return opamptypes.DeployStatusUnknown, nil
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
subscriberId := uuid.NewString()

View File

@@ -112,53 +112,80 @@ func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
return false
}
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
prevStatus := agent.Status
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) (agentDescrChanged bool) {
if agent.Status == nil {
// First time this Agent reports a status, remember it.
agent.Status = newStatus
agentDescrChanged = true
} else {
// Not a new Agent. Update the Status.
agent.Status.SequenceNum = newStatus.SequenceNum
// Check what's changed in the AgentDescription.
if newStatus.AgentDescription != nil {
// If the AgentDescription field is set it means the Agent tells us
// something is changed in the field since the last status report
// (or this is the first report).
// Make full comparison of previous and new descriptions to see if it
// really is different.
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
// Agent description didn't change.
agentDescrChanged = false
} else {
// Yes, the description is different, update it.
agent.Status.AgentDescription = newStatus.AgentDescription
agentDescrChanged = true
}
} else {
// AgentDescription field is not set, which means description didn't change.
agentDescrChanged = false
// initialize the remote config status to unset
agent.Status = &protobufs.AgentToServer{
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
},
}
// Update remote config status if it is included and is different from what we have.
if newStatus.RemoteConfigStatus != nil &&
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
rawHash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
dbHash := agent.OrgID.String() + rawHash
deployStatus, err := configProvider.GetDeployStatusByHash(context.Background(), agent.OrgID, dbHash)
if err == nil {
// Set the agent config status to the status from the database
agent.Status.RemoteConfigStatus.Status = opamptypes.DeployStatusToProtoStatus[deployStatus]
// todo: need to address multiple agent scenario here
// for now, the first response will be sent back to the UI
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
// If the deployment is still pending, register a subscriber now so that
// when the status-change check below fires onConfigSuccess/onConfigFailure,
// notifySubscribers finds the callback and updates the DB.
// we rehydrate lazily, per-agent, for exactly the hash the agent reports.
if deployStatus != opamptypes.Deployed &&
deployStatus != opamptypes.DeployFailed {
ListenToConfigUpdate(agent.OrgID, agent.AgentID, rawHash, configProvider.ReportConfigDeploymentStatus)
}
}
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
// First message from this agent instance (new connect or server restart).
// If the agent brings a RemoteConfigStatus, consult the DB to decide
// whether this resolves a pending deployment. This is the authoritative
// answer: if DB says in_progress and agent says APPLIED/FAILED, we notify.
// No mock status, no guessing — the DB IS the source of truth.
if newStatus.RemoteConfigStatus != nil {
// Agent just started, i.e. it doesn't have a remote config status yet.
if newStatus.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET {
agentDescrChanged = true
agent.Status.RemoteConfigStatus.Status = protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET
} else {
// else Agent was already running, Server just reconnected.
agent.Status.AgentDescription = newStatus.AgentDescription
// database has already recorded the final status of the deployment, So here we don't need to prepare status for the agent
// Instead we directly Copy it from newStatus
switch agent.Status.RemoteConfigStatus.Status {
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
agent.Status.RemoteConfigStatus.Status = newStatus.RemoteConfigStatus.Status
}
}
}
}
// Subsequent message — update sequence number and diff fields.
agent.Status.SequenceNum = newStatus.SequenceNum
if newStatus.AgentDescription != nil {
if proto.Equal(agent.Status.AgentDescription, newStatus.AgentDescription) {
agentDescrChanged = false
} else {
agent.Status.AgentDescription = newStatus.AgentDescription
agentDescrChanged = true
}
}
// Notify subscribers when RemoteConfigStatus changes.
if newStatus.RemoteConfigStatus != nil &&
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
hash := string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash)
switch agent.Status.RemoteConfigStatus.Status {
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED:
onConfigSuccess(agent.OrgID, agent.AgentID, hash)
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
onConfigFailure(agent.OrgID, agent.AgentID, hash, agent.Status.RemoteConfigStatus.ErrorMessage)
}
}
if agentDescrChanged {
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
@@ -186,14 +213,8 @@ func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer)
}
}
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
if agent.Status == nil {
// First time this Agent reports a status, remember it.
agent.Status = newStatus
agentDescrChanged = true
}
agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) (agentDescrChanged bool) {
agentDescrChanged = agent.updateAgentDescription(newStatus, configProvider)
agent.updateRemoteConfigStatus(newStatus)
agent.updateHealth(newStatus)
return agentDescrChanged
@@ -238,7 +259,7 @@ func (agent *Agent) processStatusUpdate(
// current status is not up-to-date.
lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum)
agentDescrChanged := agent.updateStatusField(newStatus)
agentDescrChanged := agent.updateStatusField(newStatus, configProvider)
// Check if any fields were omitted in the status report.
effectiveConfigOmitted := newStatus.EffectiveConfig == nil &&

View File

@@ -1,6 +1,11 @@
package model
import "github.com/SigNoz/signoz/pkg/valuer"
import (
"context"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Interface for source of otel collector config recommendations.
type AgentConfigProvider interface {
@@ -20,4 +25,10 @@ type AgentConfigProvider interface {
configId string,
err error,
)
// GetDeployStatusByHash returns the DeployStatus for the given config hash
// (with orgId prefix as stored in the DB). Returns DeployStatusUnknown when
// no matching row exists. Used by the agent's first-connect handler to
// determine whether the reported RemoteConfigStatus resolves a pending deployment.
GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error)
}

View File

@@ -66,6 +66,7 @@ func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnC
defer coordinator.mutex.Unlock()
key := getSubscriberKey(orgId, hash)
if subs, ok := coordinator.subscribers[key]; ok {
subs = append(subs, ss)
coordinator.subscribers[key] = subs

View File

@@ -6,6 +6,7 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/uptrace/bun"
)
@@ -17,6 +18,15 @@ const (
AgentStatusDisconnected
)
var DeployStatusToProtoStatus = map[DeployStatus]protobufs.RemoteConfigStatuses{
PendingDeploy: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
Deploying: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
Deployed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
DeployInitiated: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
DeployFailed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
DeployStatusUnknown: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
}
type StorableAgent struct {
bun.BaseModel `bun:"table:agent"`
@@ -30,16 +40,6 @@ type StorableAgent struct {
Config string `bun:"config,type:text,notnull"`
}
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
return StorableAgent{
OrgID: orgID,
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
AgentID: agentID,
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
Status: status,
}
}
type ElementType struct{ valuer.String }
var (
@@ -49,24 +49,6 @@ var (
ElementTypeLbExporter = ElementType{valuer.NewString("lb_exporter")}
)
// NewElementType creates a new ElementType from a string value.
// Returns the corresponding ElementType constant if the string matches,
// otherwise returns an empty ElementType.
func NewElementType(value string) ElementType {
switch valuer.NewString(value) {
case ElementTypeSamplingRules.String:
return ElementTypeSamplingRules
case ElementTypeDropRules.String:
return ElementTypeDropRules
case ElementTypeLogPipelines.String:
return ElementTypeLogPipelines
case ElementTypeLbExporter.String:
return ElementTypeLbExporter
default:
return ElementType{valuer.NewString("")}
}
}
type DeployStatus struct{ valuer.String }
var (
@@ -98,6 +80,26 @@ type AgentConfigVersion struct {
Config string `json:"config" bun:"config,type:text"`
}
type AgentConfigElement struct {
bun.BaseModel `bun:"table:agent_config_element"`
types.Identifiable
types.TimeAuditable
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
}
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
return StorableAgent{
OrgID: orgID,
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
AgentID: agentID,
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
Status: status,
}
}
func NewAgentConfigVersion(orgId valuer.UUID, userId valuer.UUID, elementType ElementType) *AgentConfigVersion {
return &AgentConfigVersion{
TimeAuditable: types.TimeAuditable{
@@ -118,12 +120,20 @@ func (a *AgentConfigVersion) IncrementVersion(lastVersion int) {
a.Version = lastVersion + 1
}
type AgentConfigElement struct {
bun.BaseModel `bun:"table:agent_config_element"`
types.Identifiable
types.TimeAuditable
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
// NewElementType creates a new ElementType from a string value.
// Returns the corresponding ElementType constant if the string matches,
// otherwise returns an empty ElementType.
func NewElementType(value string) ElementType {
switch valuer.NewString(value) {
case ElementTypeSamplingRules.String:
return ElementTypeSamplingRules
case ElementTypeDropRules.String:
return ElementTypeDropRules
case ElementTypeLogPipelines.String:
return ElementTypeLogPipelines
case ElementTypeLbExporter.String:
return ElementTypeLbExporter
default:
return ElementType{valuer.NewString("")}
}
}