fix: initial commit for agents

This commit is contained in:
nityanandagohain
2025-03-17 16:56:42 +05:30
parent e92e0b6e29
commit cfb226df4d
18 changed files with 454 additions and 325 deletions

View File

@@ -253,5 +253,6 @@
"http-proxy-middleware": "3.0.3",
"cross-spawn": "7.0.5",
"cookie": "^0.7.1"
}
},
"packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
}

View File

@@ -1,6 +1,9 @@
package agentConf
import "go.signoz.io/signoz/pkg/query-service/model"
import (
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/types"
)
// Interface for features implemented via agent config.
// Eg: ingestion side signal pre-processing features like log processing pipelines etc
@@ -12,7 +15,7 @@ type AgentFeature interface {
// `configVersion` for the feature's settings
RecommendAgentConfig(
currentConfYaml []byte,
configVersion *ConfigVersion,
configVersion *types.AgentConfigVersion,
) (
recommendedConfYaml []byte,

View File

@@ -6,51 +6,43 @@ import (
"fmt"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/types"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
// Repo handles DDL and DML ops on ingestion rules
type Repo struct {
db *sqlx.DB
db *bun.DB
}
func (r *Repo) GetConfigHistory(
ctx context.Context, typ ElementTypeDef, limit int,
) ([]ConfigVersion, *model.ApiError) {
var c []ConfigVersion
err := r.db.SelectContext(ctx, &c, fmt.Sprintf(`SELECT
version,
id,
element_type,
COALESCE(created_by, -1) as created_by,
created_at,
COALESCE((SELECT NAME FROM users
WHERE id = v.created_by), "unknown") created_by_name,
active,
is_valid,
disabled,
deploy_status,
deploy_result,
coalesce(last_hash, '') as last_hash,
coalesce(last_config, '{}') as last_config
FROM agent_config_versions AS v
WHERE element_type = $1
ORDER BY created_at desc, version desc
limit %v`, limit),
typ)
ctx context.Context, typ types.ElementTypeDef, limit int,
) ([]types.AgentConfigVersion, *model.ApiError) {
var c []types.AgentConfigVersion
err := r.db.NewSelect().
Model(&c).
ColumnExpr("version, id, element_type, COALESCE(created_by, -1) as created_by, created_at").
ColumnExpr(`COALESCE((SELECT NAME FROM users WHERE id = v.created_by), 'unknown') as created_by_name`).
ColumnExpr("active, is_valid, disabled, deploy_status, deploy_result").
ColumnExpr("coalesce(last_hash, '') as last_hash, coalesce(last_config, '{}') as last_config").
TableExpr("agent_config_versions AS v").
Where("element_type = ?", typ).
OrderExpr("created_at DESC, version DESC").
Limit(limit).
Scan(ctx)
if err != nil {
return nil, model.InternalError(err)
}
incompleteStatuses := []DeployStatus{DeployInitiated, Deploying}
incompleteStatuses := []types.DeployStatus{types.DeployInitiated, types.Deploying}
for idx := 1; idx < len(c); idx++ {
if slices.Contains(incompleteStatuses, c[idx].DeployStatus) {
c[idx].DeployStatus = DeployStatusUnknown
c[idx].DeployStatus = types.DeployStatusUnknown
}
}
@@ -58,32 +50,27 @@ func (r *Repo) GetConfigHistory(
}
func (r *Repo) GetConfigVersion(
ctx context.Context, typ ElementTypeDef, v int,
) (*ConfigVersion, *model.ApiError) {
var c ConfigVersion
err := r.db.GetContext(ctx, &c, `SELECT
id,
version,
element_type,
COALESCE(created_by, -1) as created_by,
created_at,
COALESCE((SELECT NAME FROM users
WHERE id = v.created_by), "unknown") created_by_name,
active,
is_valid,
disabled,
deploy_status,
deploy_result,
coalesce(last_hash, '') as last_hash,
coalesce(last_config, '{}') as last_config
FROM agent_config_versions v
WHERE element_type = $1
AND version = $2`, typ, v)
ctx context.Context, typ types.ElementTypeDef, v int,
) (*types.AgentConfigVersion, *model.ApiError) {
var c types.AgentConfigVersion
err := r.db.NewSelect().
Model(&c).
ColumnExpr("id, version, element_type").
ColumnExpr("COALESCE(created_by, -1) as created_by").
ColumnExpr("created_at").
ColumnExpr(`COALESCE((SELECT NAME FROM users WHERE id = v.created_by), 'unknown') as created_by_name`).
ColumnExpr("active, is_valid, disabled, deploy_status, deploy_result").
ColumnExpr("coalesce(last_hash, '') as last_hash").
ColumnExpr("coalesce(last_config, '{}') as last_config").
TableExpr("agent_config_versions AS v").
Where("element_type = ?", typ).
Where("version = ?", v).
Scan(ctx)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(err)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, model.NotFoundError(err)
}
return nil, model.InternalError(err)
}
@@ -91,33 +78,25 @@ func (r *Repo) GetConfigVersion(
}
func (r *Repo) GetLatestVersion(
ctx context.Context, typ ElementTypeDef,
) (*ConfigVersion, *model.ApiError) {
var c ConfigVersion
err := r.db.GetContext(ctx, &c, `SELECT
id,
version,
element_type,
COALESCE(created_by, -1) as created_by,
created_at,
COALESCE((SELECT NAME FROM users
WHERE id = v.created_by), "unknown") created_by_name,
active,
is_valid,
disabled,
deploy_status,
deploy_result
FROM agent_config_versions AS v
WHERE element_type = $1
AND version = (
SELECT MAX(version)
FROM agent_config_versions
WHERE element_type=$2)`, typ, typ)
ctx context.Context, typ types.ElementTypeDef,
) (*types.AgentConfigVersion, *model.ApiError) {
var c types.AgentConfigVersion
err := r.db.NewSelect().
Model(&c).
ColumnExpr("id, version, element_type").
ColumnExpr("COALESCE(created_by, -1) as created_by").
ColumnExpr("created_at").
ColumnExpr(`COALESCE((SELECT NAME FROM users WHERE id = v.created_by), 'unknown') as created_by_name`).
ColumnExpr("active, is_valid, disabled, deploy_status, deploy_result").
TableExpr("agent_config_versions AS v").
Where("element_type = ?", typ).
Where("version = (SELECT MAX(version) FROM agent_config_versions WHERE element_type = ?)", typ).
Scan(ctx)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(err)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, model.NotFoundError(err)
}
return nil, model.InternalError(err)
}
@@ -125,7 +104,7 @@ func (r *Repo) GetLatestVersion(
}
func (r *Repo) insertConfig(
ctx context.Context, userId string, c *ConfigVersion, elements []string,
ctx context.Context, userId string, c *types.AgentConfigVersion, elements []string,
) (fnerr *model.ApiError) {
if string(c.ElementType) == "" {
@@ -135,7 +114,7 @@ func (r *Repo) insertConfig(
}
// allowing empty elements for logs - use case is deleting all pipelines
if len(elements) == 0 && c.ElementType != ElementTypeLogPipelines {
if len(elements) == 0 && c.ElementType != types.ElementTypeLogPipelines {
zap.L().Error("insert config called with no elements ", zap.String("ElementType", string(c.ElementType)))
return model.BadRequest(fmt.Errorf("config must have atleast one element"))
}
@@ -157,7 +136,7 @@ func (r *Repo) insertConfig(
}
if configVersion != nil {
c.Version = updateVersion(configVersion.Version)
c.Version = types.UpdateVersion(configVersion.Version)
} else {
// first version
c.Version = 1
@@ -166,57 +145,41 @@ func (r *Repo) insertConfig(
defer func() {
if fnerr != nil {
// remove all the damage (invalid rows from db)
r.db.Exec("DELETE FROM agent_config_versions WHERE id = $1", c.ID)
r.db.Exec("DELETE FROM agent_config_elements WHERE version_id=$1", c.ID)
r.db.NewDelete().Model((*types.AgentConfigVersion)(nil)).Where("id = ?", c.ID).Exec(ctx)
r.db.NewDelete().Model((*types.AgentConfigElement)(nil)).Where("version_id = ?", c.ID).Exec(ctx)
}
}()
// insert config
configQuery := `INSERT INTO agent_config_versions(
id,
version,
created_by,
element_type,
active,
is_valid,
disabled,
deploy_status,
deploy_result)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`
_, dbErr := r.db.ExecContext(ctx,
configQuery,
c.ID,
c.Version,
userId,
c.ElementType,
false,
false,
false,
c.DeployStatus,
c.DeployResult)
_, dbErr := r.db.NewInsert().
Model(&types.AgentConfigVersion{
ID: c.ID,
Version: c.Version,
UserAuditable: types.UserAuditable{
CreatedBy: userId,
},
ElementType: c.ElementType,
Active: false, // default value
IsValid: false, // default value
Disabled: false, // default value
DeployStatus: c.DeployStatus,
DeployResult: c.DeployResult,
}).
Exec(ctx)
if dbErr != nil {
zap.L().Error("error in inserting config version: ", zap.Error(dbErr))
return model.InternalError(errors.Wrap(dbErr, "failed to insert ingestion rule"))
}
elementsQuery := `INSERT INTO agent_config_elements(
id,
version_id,
element_type,
element_id)
VALUES ($1, $2, $3, $4)`
for _, e := range elements {
_, dbErr = r.db.ExecContext(
ctx,
elementsQuery,
uuid.NewString(),
c.ID,
c.ElementType,
e,
)
agentConfigElement := &types.AgentConfigElement{
ID: uuid.NewString(),
VersionID: c.ID,
ElementType: string(c.ElementType),
ElementID: e,
}
_, dbErr = r.db.NewInsert().Model(agentConfigElement).Exec(ctx)
if dbErr != nil {
return model.InternalError(dbErr)
}
@@ -226,25 +189,25 @@ func (r *Repo) insertConfig(
}
func (r *Repo) updateDeployStatus(ctx context.Context,
elementType ElementTypeDef,
elementType types.ElementTypeDef,
version int,
status string,
result string,
lastHash string,
lastconf string) *model.ApiError {
updateQuery := `UPDATE agent_config_versions
set deploy_status = $1,
deploy_result = $2,
last_hash = COALESCE($3, last_hash),
last_config = $4
WHERE version=$5
AND element_type = $6`
_, err := r.db.ExecContext(ctx, updateQuery, status, result, lastHash, lastconf, version, string(elementType))
_, err := r.db.NewUpdate().
Model((*types.AgentConfigVersion)(nil)).
Set("deploy_status = ?", status).
Set("deploy_result = ?", result).
Set("last_hash = COALESCE(?, last_hash)", lastHash).
Set("last_config = ?", lastconf).
Where("version = ?", version).
Where("element_type = ?", elementType).
Exec(ctx)
if err != nil {
zap.L().Error("failed to update deploy status", zap.Error(err))
return model.BadRequest(fmt.Errorf("failed to update deploy status"))
return model.BadRequest(fmt.Errorf("failed to update deploy status"))
}
return nil
@@ -254,12 +217,12 @@ func (r *Repo) updateDeployStatusByHash(
ctx context.Context, confighash string, status string, result string,
) *model.ApiError {
updateQuery := `UPDATE agent_config_versions
set deploy_status = $1,
deploy_result = $2
WHERE last_hash=$4`
_, err := r.db.ExecContext(ctx, updateQuery, status, result, confighash)
_, err := r.db.NewUpdate().
Model((*types.AgentConfigVersion)(nil)).
Set("deploy_status = ?", status).
Set("deploy_result = ?", result).
Where("last_hash = ?", confighash).
Exec(ctx)
if err != nil {
zap.L().Error("failed to update deploy status", zap.Error(err))
return model.InternalError(errors.Wrap(err, "failed to update deploy status"))

View File

@@ -9,12 +9,13 @@ import (
"sync/atomic"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"go.signoz.io/signoz/pkg/query-service/app/opamp"
filterprocessor "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor"
tsp "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/types"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v3"
)
@@ -39,7 +40,7 @@ type Manager struct {
}
type ManagerOptions struct {
DB *sqlx.DB
DB *bun.DB
// When acting as opamp.AgentConfigProvider, agent conf recommendations are
// applied to the base conf in the order the features have been specified here.
@@ -100,7 +101,7 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
settingVersionsUsed := []string{}
for _, feature := range m.agentFeatures {
featureType := ElementTypeDef(feature.AgentFeatureType())
featureType := types.ElementTypeDef(feature.AgentFeatureType())
latestConfig, apiErr := GetLatestVersion(context.Background(), featureType)
if apiErr != nil && apiErr.Type() != model.ErrorNotFound {
return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version")
@@ -133,7 +134,7 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
context.Background(),
featureType,
configVersion,
string(DeployInitiated),
string(types.DeployInitiated),
"Deployment has started",
configId,
serializedSettingsUsed,
@@ -162,10 +163,10 @@ func (m *Manager) ReportConfigDeploymentStatus(
) {
featureConfigIds := strings.Split(configId, ",")
for _, featureConfId := range featureConfigIds {
newStatus := string(Deployed)
newStatus := string(types.Deployed)
message := "Deployment was successful"
if err != nil {
newStatus = string(DeployFailed)
newStatus = string(types.DeployFailed)
message = fmt.Sprintf("%s: %s", agentId, err.Error())
}
m.updateDeployStatusByHash(
@@ -175,30 +176,30 @@ func (m *Manager) ReportConfigDeploymentStatus(
}
func GetLatestVersion(
ctx context.Context, elementType ElementTypeDef,
) (*ConfigVersion, *model.ApiError) {
ctx context.Context, elementType types.ElementTypeDef,
) (*types.AgentConfigVersion, *model.ApiError) {
return m.GetLatestVersion(ctx, elementType)
}
func GetConfigVersion(
ctx context.Context, elementType ElementTypeDef, version int,
) (*ConfigVersion, *model.ApiError) {
ctx context.Context, elementType types.ElementTypeDef, version int,
) (*types.AgentConfigVersion, *model.ApiError) {
return m.GetConfigVersion(ctx, elementType, version)
}
func GetConfigHistory(
ctx context.Context, typ ElementTypeDef, limit int,
) ([]ConfigVersion, *model.ApiError) {
ctx context.Context, typ types.ElementTypeDef, limit int,
) ([]types.AgentConfigVersion, *model.ApiError) {
return m.GetConfigHistory(ctx, typ, limit)
}
// StartNewVersion launches a new config version for given set of elements
func StartNewVersion(
ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string,
) (*ConfigVersion, *model.ApiError) {
ctx context.Context, userId string, eleType types.ElementTypeDef, elementIds []string,
) (*types.AgentConfigVersion, *model.ApiError) {
// create a new version
cfg := NewConfigVersion(eleType)
cfg := types.NewAgentConfigVersion(eleType)
// insert new config and elements into database
err := m.insertConfig(ctx, userId, cfg, elementIds)
@@ -215,7 +216,7 @@ func NotifyConfigUpdate(ctx context.Context) {
m.notifyConfigUpdateSubscribers()
}
func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiError {
func Redeploy(ctx context.Context, typ types.ElementTypeDef, version int) *model.ApiError {
configVersion, err := GetConfigVersion(ctx, typ, version)
if err != nil {
@@ -223,14 +224,14 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
return model.WrapApiError(err, "failed to fetch details of the config version")
}
if configVersion == nil || (configVersion != nil && configVersion.LastConf == "") {
if configVersion == nil || (configVersion != nil && configVersion.LastConfig == "") {
zap.L().Debug("config version has no conf yaml", zap.Any("configVersion", configVersion))
return model.BadRequest(fmt.Errorf("the config version can not be redeployed"))
}
switch typ {
case ElementTypeSamplingRules:
case types.ElementTypeSamplingRules:
var config *tsp.Config
if err := yaml.Unmarshal([]byte(configVersion.LastConf), &config); err != nil {
if err := yaml.Unmarshal([]byte(configVersion.LastConfig), &config); err != nil {
zap.L().Debug("failed to read last conf correctly", zap.Error(err))
return model.BadRequest(fmt.Errorf("failed to read the stored config correctly"))
}
@@ -247,10 +248,10 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
return model.InternalError(fmt.Errorf("failed to deploy the config"))
}
m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
case ElementTypeDropRules:
m.updateDeployStatus(ctx, types.ElementTypeSamplingRules, version, string(types.DeployInitiated), "Deployment started", configHash, configVersion.LastConfig)
case types.ElementTypeDropRules:
var filterConfig *filterprocessor.Config
if err := yaml.Unmarshal([]byte(configVersion.LastConf), &filterConfig); err != nil {
if err := yaml.Unmarshal([]byte(configVersion.LastConfig), &filterConfig); err != nil {
zap.L().Error("failed to read last conf correctly", zap.Error(err))
return model.InternalError(fmt.Errorf("failed to read the stored config correctly"))
}
@@ -265,7 +266,7 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
return err
}
m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
m.updateDeployStatus(ctx, types.ElementTypeSamplingRules, version, string(types.DeployInitiated), "Deployment started", configHash, configVersion.LastConfig)
}
return nil
@@ -296,7 +297,7 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce
zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
}
m.updateDeployStatus(ctx, ElementTypeDropRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
m.updateDeployStatus(ctx, types.ElementTypeDropRules, version, string(types.DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
return nil
}
@@ -307,7 +308,7 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce
// but can be improved in future to accept continuous request status updates from opamp
func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) {
status := string(Deployed)
status := string(types.Deployed)
message := "Deployment was successful"
@@ -316,7 +317,7 @@ func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) {
}()
if err != nil {
status = string(DeployFailed)
status = string(types.DeployFailed)
message = fmt.Sprintf("%s: %s", agentId, err.Error())
}
@@ -347,6 +348,6 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi
zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
}
m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
m.updateDeployStatus(ctx, types.ElementTypeSamplingRules, version, string(types.DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
return nil
}

View File

@@ -1,72 +1,10 @@
package agentConf
import (
"time"
"github.com/google/uuid"
)
type ElementTypeDef string
const (
ElementTypeSamplingRules ElementTypeDef = "sampling_rules"
ElementTypeDropRules ElementTypeDef = "drop_rules"
ElementTypeLogPipelines ElementTypeDef = "log_pipelines"
ElementTypeLbExporter ElementTypeDef = "lb_exporter"
)
type DeployStatus string
const (
PendingDeploy DeployStatus = "DIRTY"
Deploying DeployStatus = "DEPLOYING"
Deployed DeployStatus = "DEPLOYED"
DeployInitiated DeployStatus = "IN_PROGRESS"
DeployFailed DeployStatus = "FAILED"
DeployStatusUnknown DeployStatus = "UNKNOWN"
)
type ConfigVersion struct {
ID string `json:"id" db:"id"`
Version int `json:"version" db:"version"`
ElementType ElementTypeDef `json:"elementType" db:"element_type"`
Active bool `json:"active" db:"active"`
IsValid bool `json:"is_valid" db:"is_valid"`
Disabled bool `json:"disabled" db:"disabled"`
DeployStatus DeployStatus `json:"deployStatus" db:"deploy_status"`
DeployResult string `json:"deployResult" db:"deploy_result"`
LastHash string `json:"lastHash" db:"last_hash"`
LastConf string `json:"lastConf" db:"last_config"`
CreatedBy string `json:"createdBy" db:"created_by"`
CreatedByName string `json:"createdByName" db:"created_by_name"`
CreatedAt time.Time `json:"createdAt" db:"created_at"`
}
func NewConfigVersion(typeDef ElementTypeDef) *ConfigVersion {
return &ConfigVersion{
ID: uuid.NewString(),
ElementType: typeDef,
Active: false,
IsValid: false,
Disabled: false,
DeployStatus: PendingDeploy,
LastHash: "",
LastConf: "{}",
// todo: get user id from context?
// CreatedBy
}
}
func updateVersion(v int) int {
return v + 1
}
import "go.signoz.io/signoz/pkg/types"
type ConfigElements struct {
VersionID string
Version int
ElementType ElementTypeDef
ElementType types.ElementTypeDef
ElementId string
}

View File

@@ -14,6 +14,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/utils"
"go.signoz.io/signoz/pkg/types"
"go.signoz.io/signoz/pkg/types/authtypes"
"go.uber.org/zap"
)
@@ -38,10 +39,10 @@ func NewLogParsingPipelinesController(
// PipelinesResponse is used to prepare http response for pipelines config related requests
type PipelinesResponse struct {
*agentConf.ConfigVersion
*types.AgentConfigVersion
Pipelines []Pipeline `json:"pipelines"`
History []agentConf.ConfigVersion `json:"history"`
Pipelines []Pipeline `json:"pipelines"`
History []types.AgentConfigVersion `json:"history"`
}
// ApplyPipelines stores new or changed pipelines and initiates a new config update
@@ -84,7 +85,7 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
}
// prepare config by calling gen func
cfg, err := agentConf.StartNewVersion(ctx, claims.UserID, agentConf.ElementTypeLogPipelines, elements)
cfg, err := agentConf.StartNewVersion(ctx, claims.UserID, types.ElementTypeLogPipelines, elements)
if err != nil || cfg == nil {
return nil, err
}
@@ -199,9 +200,9 @@ func (ic *LogParsingPipelineController) GetPipelinesByVersion(
return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version"))
}
var configVersion *agentConf.ConfigVersion
var configVersion *types.AgentConfigVersion
if version >= 0 {
cv, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version)
cv, err := agentConf.GetConfigVersion(ctx, types.ElementTypeLogPipelines, version)
if err != nil {
zap.L().Error("failed to get config for version", zap.Int("version", version), zap.Error(err))
return nil, model.WrapApiError(err, "failed to get config for given version")
@@ -210,8 +211,8 @@ func (ic *LogParsingPipelineController) GetPipelinesByVersion(
}
return &PipelinesResponse{
ConfigVersion: configVersion,
Pipelines: pipelines,
AgentConfigVersion: configVersion,
Pipelines: pipelines,
}, nil
}
@@ -251,7 +252,7 @@ func (pc *LogParsingPipelineController) AgentFeatureType() agentConf.AgentFeatur
// Implements agentConf.AgentFeature interface.
func (pc *LogParsingPipelineController) RecommendAgentConfig(
currentConfYaml []byte,
configVersion *agentConf.ConfigVersion,
configVersion *types.AgentConfigVersion,
) (
recommendedConfYaml []byte,
serializedSettingsUsed string,

View File

@@ -166,7 +166,7 @@ type testbed struct {
func newTestbed(t *testing.T) *testbed {
testDB := utils.NewQueryServiceDBForTests(t)
_, err := model.InitDB(testDB.SQLxDB())
_, err := model.InitDB(testDB.BunDB())
if err != nil {
t.Fatalf("could not init opamp model: %v", err)
}

View File

@@ -60,13 +60,13 @@ func UpsertControlProcessors(
agenthash, err := addIngestionControlToAgent(agent, signal, processors, false)
if err != nil {
zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.ID), zap.Error(err))
zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.AgentID), zap.Error(err))
continue
}
if agenthash != "" {
// subscribe callback
model.ListenToConfigUpdate(agent.ID, agenthash, callback)
model.ListenToConfigUpdate(agent.AgentID, agenthash, callback)
}
hash = agenthash
@@ -89,7 +89,7 @@ func addIngestionControlToAgent(agent *model.Agent, signal string, processors ma
// add ingestion control spec
err = makeIngestionControlSpec(agentConf, Signal(signal), processors)
if err != nil {
zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.ID), zap.Error(err))
zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.AgentID), zap.Error(err))
return confHash, err
}

View File

@@ -7,33 +7,18 @@ import (
"sync"
"time"
"go.signoz.io/signoz/pkg/types"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
opampTypes "github.com/open-telemetry/opamp-go/server/types"
)
type AgentStatus int
const (
AgentStatusUnknown AgentStatus = iota
AgentStatusConnected
AgentStatusDisconnected
)
// set in agent description when agent is capable of supporting
// lb exporter configuration. values: 1 (true) or 0 (false)
const lbExporterFlag = "capabilities.lbexporter"
type Agent struct {
ID string `json:"agentId" yaml:"agentId" db:"agent_id"`
StartedAt time.Time `json:"startedAt" yaml:"startedAt" db:"started_at"`
TerminatedAt time.Time `json:"terminatedAt" yaml:"terminatedAt" db:"terminated_at"`
EffectiveConfig string `json:"effectiveConfig" yaml:"effectiveConfig" db:"effective_config"`
CurrentStatus AgentStatus `json:"currentStatus" yaml:"currentStatus" db:"current_status"`
remoteConfig *protobufs.AgentRemoteConfig
Status *protobufs.AgentToServer
types.StorableAgent
remoteConfig *protobufs.AgentRemoteConfig
Status *protobufs.AgentToServer
// can this agent be load balancer
CanLB bool
@@ -41,13 +26,17 @@ type Agent struct {
// is this agent setup as load balancer
IsLb bool
conn types.Connection
conn opampTypes.Connection
connMutex sync.Mutex
mux sync.RWMutex
}
func New(ID string, conn types.Connection) *Agent {
return &Agent{ID: ID, StartedAt: time.Now(), CurrentStatus: AgentStatusConnected, conn: conn}
// set in agent description when agent is capable of supporting
// lb exporter configuration. values: 1 (true) or 0 (false)
const lbExporterFlag = "capabilities.lbexporter"
func New(orgID string, ID string, conn opampTypes.Connection) *Agent {
return &Agent{StorableAgent: types.StorableAgent{OrgID: orgID, AgentID: ID, StartedAt: time.Now(), CurrentStatus: types.AgentStatusConnected}, conn: conn}
}
// Upsert inserts or updates the agent in the database.
@@ -55,17 +44,13 @@ func (agent *Agent) Upsert() error {
agent.mux.Lock()
defer agent.mux.Unlock()
_, err := db.NamedExec(`INSERT OR REPLACE INTO agents (
agent_id,
started_at,
effective_config,
current_status
) VALUES (
:agent_id,
:started_at,
:effective_config,
:current_status
)`, agent)
_, err := db.NewInsert().
Model(&agent.StorableAgent).
On("CONFLICT (org_id, agent_id) DO UPDATE").
Set("started_at = EXCLUDED.started_at").
Set("effective_config = EXCLUDED.effective_config").
Set("current_status = EXCLUDED.current_status").
Exec(context.Background())
if err != nil {
return err
}
@@ -135,11 +120,11 @@ func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (
// todo: need to address multiple agent scenario here
// for now, the first response will be sent back to the UI
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
onConfigSuccess(agent.ID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
onConfigSuccess(agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
}
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
onConfigFailure(agent.ID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
onConfigFailure(agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
}
}
}
@@ -266,7 +251,7 @@ func (agent *Agent) processStatusUpdate(
agent.SendToAgent(response)
ListenToConfigUpdate(
agent.ID,
agent.AgentID,
string(response.RemoteConfig.ConfigHash),
configProvider.ReportConfigDeploymentStatus,
)
@@ -276,7 +261,7 @@ func (agent *Agent) processStatusUpdate(
func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool {
recommendedConfig, confId, err := configProvider.RecommendAgentConfig([]byte(agent.EffectiveConfig))
if err != nil {
zap.L().Error("could not generate config recommendation for agent", zap.String("agentID", agent.ID), zap.Error(err))
zap.L().Error("could not generate config recommendation for agent", zap.String("agentID", agent.AgentID), zap.Error(err))
return false
}

View File

@@ -1,18 +1,20 @@
package model
import (
"context"
"fmt"
"sync"
"time"
"github.com/jmoiron/sqlx"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
"github.com/pkg/errors"
"github.com/uptrace/bun"
signozTypes "go.signoz.io/signoz/pkg/types"
"go.uber.org/zap"
)
var db *sqlx.DB
var db *bun.DB
var AllAgents = Agents{
agentsById: map[string]*Agent{},
@@ -30,7 +32,7 @@ func (a *Agents) Count() int {
}
// Initialize the database and create schema if needed
func InitDB(qsDB *sqlx.DB) (*sqlx.DB, error) {
func InitDB(qsDB *bun.DB) (*bun.DB, error) {
db = qsDB
AllAgents = Agents{
@@ -49,7 +51,7 @@ func (agents *Agents) RemoveConnection(conn types.Connection) {
for instanceId := range agents.connections[conn] {
agent := agents.agentsById[instanceId]
agent.CurrentStatus = AgentStatusDisconnected
agent.CurrentStatus = signozTypes.AgentStatusDisconnected
agent.TerminatedAt = time.Now()
agent.Upsert()
delete(agents.agentsById, instanceId)
@@ -64,6 +66,24 @@ func (agents *Agents) FindAgent(agentID string) *Agent {
return agents.agentsById[agentID]
}
func (agents *Agents) getDefaultOrgID() (string, error) {
var orgID []string
err := db.NewSelect().Model((*signozTypes.Organization)(nil)).Column("id").Scan(context.Background(), &orgID)
if err != nil {
return "", err
}
if len(orgID) == 0 {
return "", nil
}
if len(orgID) != 1 {
return "", fmt.Errorf("expected exactly one organization, but found %d", len(orgID))
}
return orgID[0], nil
}
// FindOrCreateAgent returns the Agent instance associated with the given agentID.
// If the Agent instance does not exist, it is created and added to the list of
// Agent instances.
@@ -72,9 +92,12 @@ func (agents *Agents) FindOrCreateAgent(agentID string, conn types.Connection) (
defer agents.mux.Unlock()
var created bool
agent, ok := agents.agentsById[agentID]
var err error
if !ok || agent == nil {
agent = New(agentID, conn)
orgID, err := agents.getDefaultOrgID()
if err != nil {
return nil, created, err
}
agent = New(orgID, agentID, conn)
err = agent.Upsert()
if err != nil {
return nil, created, err
@@ -112,14 +135,14 @@ func (agents *Agents) RecommendLatestConfigToAll(
)
if err != nil {
return errors.Wrap(err, fmt.Sprintf(
"could not generate conf recommendation for %v", agent.ID,
"could not generate conf recommendation for %v", agent.AgentID,
))
}
// Recommendation is same as current config
if string(newConfig) == agent.EffectiveConfig {
zap.L().Info(
"Recommended config same as current effective config for agent", zap.String("agentID", agent.ID),
"Recommended config same as current effective config for agent", zap.String("agentID", agent.AgentID),
)
return nil
}
@@ -144,7 +167,7 @@ func (agents *Agents) RecommendLatestConfigToAll(
RemoteConfig: newRemoteConfig,
})
ListenToConfigUpdate(agent.ID, confId, provider.ReportConfigDeploymentStatus)
ListenToConfigUpdate(agent.AgentID, confId, provider.ReportConfigDeploymentStatus)
}
return nil
}

View File

@@ -91,7 +91,7 @@ func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer
agent.CanLB = model.ExtractLbFlag(msg.AgentDescription)
zap.L().Debug(
"New agent added", zap.Bool("canLb", agent.CanLB),
zap.String("ID", agent.ID),
zap.String("ID", agent.AgentID),
zap.Any("status", agent.CurrentStatus),
)
}

View File

@@ -235,13 +235,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
s.privateHTTP = privateServer
_, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB())
_, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.BunDB())
if err != nil {
return nil, err
}
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: serverOptions.SigNoz.SQLStore.SQLxDB(),
DB: serverOptions.SigNoz.SQLStore.BunDB(),
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
},

View File

@@ -20,7 +20,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/integrations"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/app/opamp"
opampModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/dao"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -120,7 +119,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
"pipelines config history should not be empty after 1st configuration",
)
require.Equal(
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
types.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
"pipelines deployment should be in progress after 1st configuration",
)
@@ -132,7 +131,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
t, postablePipelines, getPipelinesResp,
)
require.Equal(
agentConf.Deployed,
types.Deployed,
getPipelinesResp.History[0].DeployStatus,
"pipeline deployment should be complete after acknowledgment from opamp client",
)
@@ -152,7 +151,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
"there should be 2 history entries after posting pipelines config for the 2nd time",
)
require.Equal(
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
types.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
"deployment should be in progress for latest pipeline config",
)
@@ -164,7 +163,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
t, postablePipelines, getPipelinesResp,
)
require.Equal(
agentConf.Deployed,
types.Deployed,
getPipelinesResp.History[0].DeployStatus,
"deployment for latest pipeline config should be complete after acknowledgment from opamp client",
)
@@ -218,7 +217,7 @@ func TestLogPipelinesHistory(t *testing.T) {
testbed.PostPipelinesToQS(postablePipelines)
getPipelinesResp = testbed.GetPipelinesFromQS()
require.Equal(1, len(getPipelinesResp.History))
require.Equal(agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
require.Equal(types.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
postablePipelines.Pipelines[0].Config = append(
postablePipelines.Pipelines[0].Config,
@@ -237,8 +236,8 @@ func TestLogPipelinesHistory(t *testing.T) {
getPipelinesResp = testbed.GetPipelinesFromQS()
require.Equal(2, len(getPipelinesResp.History))
require.Equal(agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
require.Equal(agentConf.DeployStatusUnknown, getPipelinesResp.History[1].DeployStatus)
require.Equal(types.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
require.Equal(types.DeployStatusUnknown, getPipelinesResp.History[1].DeployStatus)
}
func TestLogPipelinesValidation(t *testing.T) {
@@ -481,11 +480,11 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
}
// Mock an available opamp agent
testDB, err := opampModel.InitDB(sqlStore.SQLxDB())
// testDB, err := opampModel.InitDB(sqlStore.SQLxDB())
require.Nil(t, err, "failed to init opamp model")
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: testDB,
DB: sqlStore.BunDB(), // todo(nitya): fix this
AgentFeatures: []agentConf.AgentFeature{
apiHandler.LogsParsingPipelineController,
}})

View File

@@ -48,6 +48,8 @@ func NewTestSqliteDB(t *testing.T) (sqlStore sqlstore.SQLStore, testDBFilePath s
sqlmigration.NewModifyDatetimeFactory(),
sqlmigration.NewModifyOrgDomainFactory(),
sqlmigration.NewUpdateOrganizationFactory(sqlStore),
sqlmigration.NewUpdateDashboardAndSavedViewsFactory(sqlStore),
sqlmigration.NewUpdateAgentsFactory(sqlStore),
),
)
if err != nil {

View File

@@ -59,6 +59,8 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
sqlmigration.NewModifyOrgDomainFactory(),
sqlmigration.NewUpdateOrganizationFactory(sqlstore),
sqlmigration.NewAddAlertmanagerFactory(),
sqlmigration.NewUpdateDashboardAndSavedViewsFactory(sqlstore),
sqlmigration.NewUpdateAgentsFactory(sqlstore),
)
}

View File

@@ -0,0 +1,80 @@
package sqlmigration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types"
)
type updateDashboardAndSavedViews struct {
store sqlstore.SQLStore
}
func NewUpdateDashboardAndSavedViewsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("update_dashboards_savedviews"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return newUpdateDashboardAndSavedViews(ctx, ps, c, sqlstore)
})
}
func newUpdateDashboardAndSavedViews(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
return &updateDashboardAndSavedViews{
store: store,
}, nil
}
func (migration *updateDashboardAndSavedViews) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *updateDashboardAndSavedViews) Up(ctx context.Context, db *bun.DB) error {
// begin transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// get all org ids
var orgIDs []string
if err := migration.store.BunDB().NewSelect().Model((*types.Organization)(nil)).Column("id").Scan(ctx, &orgIDs); err != nil {
return err
}
// add org id to dashboards table
for _, table := range []string{"dashboards", "saved_views"} {
if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, table, "org_id"); err != nil {
return err
} else if !exists {
if _, err := tx.NewAddColumn().Table(table).ColumnExpr("org_id TEXT REFERENCES organizations(id) ON DELETE CASCADE").Exec(ctx); err != nil {
return err
}
// check if there is one org ID if yes then set it to all dashboards.
if len(orgIDs) == 1 {
orgID := orgIDs[0]
if _, err := tx.NewUpdate().Table(table).Set("org_id = ?", orgID).Where("org_id IS NULL").Exec(ctx); err != nil {
return err
}
}
}
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (migration *updateDashboardAndSavedViews) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@@ -0,0 +1,80 @@
package sqlmigration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types"
)
type updateAgents struct {
store sqlstore.SQLStore
}
func NewUpdateAgentsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("update_agents"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return newUpdateAgents(ctx, ps, c, sqlstore)
})
}
func newUpdateAgents(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
return &updateAgents{
store: store,
}, nil
}
func (migration *updateAgents) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *updateAgents) Up(ctx context.Context, db *bun.DB) error {
// begin transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// get all org ids
var orgIDs []string
if err := migration.store.BunDB().NewSelect().Model((*types.Organization)(nil)).Column("id").Scan(ctx, &orgIDs); err != nil {
return err
}
// add org id to dashboards table
for _, table := range []string{"agents", "agent_config_versions", "agent_config_elements"} {
if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, table, "org_id"); err != nil {
return err
} else if !exists {
if _, err := tx.NewAddColumn().Table(table).ColumnExpr("org_id TEXT REFERENCES organizations(id) ON DELETE CASCADE").Exec(ctx); err != nil {
return err
}
// check if there is one org ID if yes then set it to all dashboards.
if len(orgIDs) == 1 {
orgID := orgIDs[0]
if _, err := tx.NewUpdate().Table(table).Set("org_id = ?", orgID).Where("org_id IS NULL").Exec(ctx); err != nil {
return err
}
}
}
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (migration *updateAgents) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@@ -3,41 +3,92 @@ package types
import (
"time"
"github.com/google/uuid"
"github.com/uptrace/bun"
)
type Agent struct {
bun.BaseModel `bun:"table:agents"`
AgentID string `bun:"agent_id,pk,type:text"`
StartedAt time.Time `bun:"started_at,type:datetime,notnull"`
TerminatedAt time.Time `bun:"terminated_at,type:datetime"`
CurrentStatus string `bun:"current_status,type:text,notnull"`
EffectiveConfig string `bun:"effective_config,type:text,notnull"`
type AgentStatus int
const (
AgentStatusUnknown AgentStatus = iota
AgentStatusConnected
AgentStatusDisconnected
)
type StorableAgent struct {
bun.BaseModel `bun:"table:agents"`
OrgID string `json:"orgId" yaml:"orgId" bun:"org_id,type:text"`
AgentID string `json:"agentId" yaml:"agentId" bun:"agent_id,pk,type:text"`
StartedAt time.Time `json:"startedAt" yaml:"startedAt" bun:"started_at,type:datetime,notnull"`
TerminatedAt time.Time `json:"terminatedAt" yaml:"terminatedAt" bun:"terminated_at,type:datetime"`
CurrentStatus AgentStatus `json:"currentStatus" yaml:"currentStatus" bun:"current_status,type:text,notnull"`
EffectiveConfig string `bun:"effective_config,type:text,notnull"`
}
type ElementTypeDef string
const (
ElementTypeSamplingRules ElementTypeDef = "sampling_rules"
ElementTypeDropRules ElementTypeDef = "drop_rules"
ElementTypeLogPipelines ElementTypeDef = "log_pipelines"
ElementTypeLbExporter ElementTypeDef = "lb_exporter"
)
type DeployStatus string
const (
PendingDeploy DeployStatus = "DIRTY"
Deploying DeployStatus = "DEPLOYING"
Deployed DeployStatus = "DEPLOYED"
DeployInitiated DeployStatus = "IN_PROGRESS"
DeployFailed DeployStatus = "FAILED"
DeployStatusUnknown DeployStatus = "UNKNOWN"
)
type AgentConfigVersion struct {
bun.BaseModel `bun:"table:agent_config_versions"`
ID string `bun:"id,pk,type:text"`
CreatedBy string `bun:"created_by,type:text"`
CreatedAt time.Time `bun:"created_at,default:CURRENT_TIMESTAMP"`
UpdatedBy string `bun:"updated_by,type:text"`
UpdatedAt time.Time `bun:"updated_at,default:CURRENT_TIMESTAMP"`
Version int `bun:"version,default:1,unique:element_version_idx"`
Active int `bun:"active"`
IsValid int `bun:"is_valid"`
Disabled int `bun:"disabled"`
ElementType string `bun:"element_type,notnull,type:varchar(120),unique:element_version_idx"`
DeployStatus string `bun:"deploy_status,notnull,type:varchar(80),default:'DIRTY'"`
DeploySequence int `bun:"deploy_sequence"`
DeployResult string `bun:"deploy_result,type:text"`
LastHash string `bun:"last_hash,type:text"`
LastConfig string `bun:"last_config,type:text"`
TimeAuditable
UserAuditable
CreatedByName string `json:"createdByName" db:"created_by_name"`
OrgID string `json:"orgId" bun:"org_id,type:text"`
ID string `json:"id" bun:"id,pk,type:text"`
Version int `json:"version" bun:"version,default:1,unique:element_version_idx"`
Active bool `json:"active" bun:"active"`
IsValid bool `json:"isValid" bun:"is_valid"`
Disabled bool `json:"disabled" bun:"disabled"`
ElementType ElementTypeDef `json:"elementType" bun:"element_type,notnull,type:varchar(120),unique:element_version_idx"`
DeployStatus DeployStatus `json:"deployStatus" bun:"deploy_status,notnull,type:varchar(80),default:'DIRTY'"`
DeploySequence int `json:"deploySequence" bun:"deploy_sequence"`
DeployResult string `json:"deployResult" bun:"deploy_result,type:text"`
LastHash string `json:"lastHash" bun:"last_hash,type:text"`
LastConfig string `json:"lastConfig" bun:"last_config,type:text"`
}
func NewAgentConfigVersion(typeDef ElementTypeDef) *AgentConfigVersion {
return &AgentConfigVersion{
ID: uuid.NewString(),
ElementType: typeDef,
Active: false,
IsValid: false,
Disabled: false,
DeployStatus: PendingDeploy,
LastHash: "",
LastConfig: "{}",
}
}
func UpdateVersion(v int) int {
return v + 1
}
type AgentConfigElement struct {
bun.BaseModel `bun:"table:agent_config_elements"`
OrgID string `bun:"org_id,type:text"`
ID string `bun:"id,pk,type:text"`
CreatedBy string `bun:"created_by,type:text"`
CreatedAt time.Time `bun:"created_at,default:CURRENT_TIMESTAMP"`