mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-06 09:42:18 +00:00
fix: initial commit for agents
This commit is contained in:
@@ -253,5 +253,6 @@
|
||||
"http-proxy-middleware": "3.0.3",
|
||||
"cross-spawn": "7.0.5",
|
||||
"cookie": "^0.7.1"
|
||||
}
|
||||
},
|
||||
"packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package agentConf
|
||||
|
||||
import "go.signoz.io/signoz/pkg/query-service/model"
|
||||
import (
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/types"
|
||||
)
|
||||
|
||||
// Interface for features implemented via agent config.
|
||||
// Eg: ingestion side signal pre-processing features like log processing pipelines etc
|
||||
@@ -12,7 +15,7 @@ type AgentFeature interface {
|
||||
// `configVersion` for the feature's settings
|
||||
RecommendAgentConfig(
|
||||
currentConfYaml []byte,
|
||||
configVersion *ConfigVersion,
|
||||
configVersion *types.AgentConfigVersion,
|
||||
) (
|
||||
recommendedConfYaml []byte,
|
||||
|
||||
|
||||
@@ -6,51 +6,43 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/uptrace/bun"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/types"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
// Repo handles DDL and DML ops on ingestion rules
|
||||
type Repo struct {
|
||||
db *sqlx.DB
|
||||
db *bun.DB
|
||||
}
|
||||
|
||||
func (r *Repo) GetConfigHistory(
|
||||
ctx context.Context, typ ElementTypeDef, limit int,
|
||||
) ([]ConfigVersion, *model.ApiError) {
|
||||
var c []ConfigVersion
|
||||
err := r.db.SelectContext(ctx, &c, fmt.Sprintf(`SELECT
|
||||
version,
|
||||
id,
|
||||
element_type,
|
||||
COALESCE(created_by, -1) as created_by,
|
||||
created_at,
|
||||
COALESCE((SELECT NAME FROM users
|
||||
WHERE id = v.created_by), "unknown") created_by_name,
|
||||
active,
|
||||
is_valid,
|
||||
disabled,
|
||||
deploy_status,
|
||||
deploy_result,
|
||||
coalesce(last_hash, '') as last_hash,
|
||||
coalesce(last_config, '{}') as last_config
|
||||
FROM agent_config_versions AS v
|
||||
WHERE element_type = $1
|
||||
ORDER BY created_at desc, version desc
|
||||
limit %v`, limit),
|
||||
typ)
|
||||
ctx context.Context, typ types.ElementTypeDef, limit int,
|
||||
) ([]types.AgentConfigVersion, *model.ApiError) {
|
||||
var c []types.AgentConfigVersion
|
||||
err := r.db.NewSelect().
|
||||
Model(&c).
|
||||
ColumnExpr("version, id, element_type, COALESCE(created_by, -1) as created_by, created_at").
|
||||
ColumnExpr(`COALESCE((SELECT NAME FROM users WHERE id = v.created_by), 'unknown') as created_by_name`).
|
||||
ColumnExpr("active, is_valid, disabled, deploy_status, deploy_result").
|
||||
ColumnExpr("coalesce(last_hash, '') as last_hash, coalesce(last_config, '{}') as last_config").
|
||||
TableExpr("agent_config_versions AS v").
|
||||
Where("element_type = ?", typ).
|
||||
OrderExpr("created_at DESC, version DESC").
|
||||
Limit(limit).
|
||||
Scan(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, model.InternalError(err)
|
||||
}
|
||||
|
||||
incompleteStatuses := []DeployStatus{DeployInitiated, Deploying}
|
||||
incompleteStatuses := []types.DeployStatus{types.DeployInitiated, types.Deploying}
|
||||
for idx := 1; idx < len(c); idx++ {
|
||||
if slices.Contains(incompleteStatuses, c[idx].DeployStatus) {
|
||||
c[idx].DeployStatus = DeployStatusUnknown
|
||||
c[idx].DeployStatus = types.DeployStatusUnknown
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,32 +50,27 @@ func (r *Repo) GetConfigHistory(
|
||||
}
|
||||
|
||||
func (r *Repo) GetConfigVersion(
|
||||
ctx context.Context, typ ElementTypeDef, v int,
|
||||
) (*ConfigVersion, *model.ApiError) {
|
||||
var c ConfigVersion
|
||||
err := r.db.GetContext(ctx, &c, `SELECT
|
||||
id,
|
||||
version,
|
||||
element_type,
|
||||
COALESCE(created_by, -1) as created_by,
|
||||
created_at,
|
||||
COALESCE((SELECT NAME FROM users
|
||||
WHERE id = v.created_by), "unknown") created_by_name,
|
||||
active,
|
||||
is_valid,
|
||||
disabled,
|
||||
deploy_status,
|
||||
deploy_result,
|
||||
coalesce(last_hash, '') as last_hash,
|
||||
coalesce(last_config, '{}') as last_config
|
||||
FROM agent_config_versions v
|
||||
WHERE element_type = $1
|
||||
AND version = $2`, typ, v)
|
||||
ctx context.Context, typ types.ElementTypeDef, v int,
|
||||
) (*types.AgentConfigVersion, *model.ApiError) {
|
||||
var c types.AgentConfigVersion
|
||||
err := r.db.NewSelect().
|
||||
Model(&c).
|
||||
ColumnExpr("id, version, element_type").
|
||||
ColumnExpr("COALESCE(created_by, -1) as created_by").
|
||||
ColumnExpr("created_at").
|
||||
ColumnExpr(`COALESCE((SELECT NAME FROM users WHERE id = v.created_by), 'unknown') as created_by_name`).
|
||||
ColumnExpr("active, is_valid, disabled, deploy_status, deploy_result").
|
||||
ColumnExpr("coalesce(last_hash, '') as last_hash").
|
||||
ColumnExpr("coalesce(last_config, '{}') as last_config").
|
||||
TableExpr("agent_config_versions AS v").
|
||||
Where("element_type = ?", typ).
|
||||
Where("version = ?", v).
|
||||
Scan(ctx)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, model.NotFoundError(err)
|
||||
}
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, model.NotFoundError(err)
|
||||
}
|
||||
return nil, model.InternalError(err)
|
||||
}
|
||||
|
||||
@@ -91,33 +78,25 @@ func (r *Repo) GetConfigVersion(
|
||||
}
|
||||
|
||||
func (r *Repo) GetLatestVersion(
|
||||
ctx context.Context, typ ElementTypeDef,
|
||||
) (*ConfigVersion, *model.ApiError) {
|
||||
var c ConfigVersion
|
||||
err := r.db.GetContext(ctx, &c, `SELECT
|
||||
id,
|
||||
version,
|
||||
element_type,
|
||||
COALESCE(created_by, -1) as created_by,
|
||||
created_at,
|
||||
COALESCE((SELECT NAME FROM users
|
||||
WHERE id = v.created_by), "unknown") created_by_name,
|
||||
active,
|
||||
is_valid,
|
||||
disabled,
|
||||
deploy_status,
|
||||
deploy_result
|
||||
FROM agent_config_versions AS v
|
||||
WHERE element_type = $1
|
||||
AND version = (
|
||||
SELECT MAX(version)
|
||||
FROM agent_config_versions
|
||||
WHERE element_type=$2)`, typ, typ)
|
||||
ctx context.Context, typ types.ElementTypeDef,
|
||||
) (*types.AgentConfigVersion, *model.ApiError) {
|
||||
var c types.AgentConfigVersion
|
||||
err := r.db.NewSelect().
|
||||
Model(&c).
|
||||
ColumnExpr("id, version, element_type").
|
||||
ColumnExpr("COALESCE(created_by, -1) as created_by").
|
||||
ColumnExpr("created_at").
|
||||
ColumnExpr(`COALESCE((SELECT NAME FROM users WHERE id = v.created_by), 'unknown') as created_by_name`).
|
||||
ColumnExpr("active, is_valid, disabled, deploy_status, deploy_result").
|
||||
TableExpr("agent_config_versions AS v").
|
||||
Where("element_type = ?", typ).
|
||||
Where("version = (SELECT MAX(version) FROM agent_config_versions WHERE element_type = ?)", typ).
|
||||
Scan(ctx)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, model.NotFoundError(err)
|
||||
}
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, model.NotFoundError(err)
|
||||
}
|
||||
return nil, model.InternalError(err)
|
||||
}
|
||||
|
||||
@@ -125,7 +104,7 @@ func (r *Repo) GetLatestVersion(
|
||||
}
|
||||
|
||||
func (r *Repo) insertConfig(
|
||||
ctx context.Context, userId string, c *ConfigVersion, elements []string,
|
||||
ctx context.Context, userId string, c *types.AgentConfigVersion, elements []string,
|
||||
) (fnerr *model.ApiError) {
|
||||
|
||||
if string(c.ElementType) == "" {
|
||||
@@ -135,7 +114,7 @@ func (r *Repo) insertConfig(
|
||||
}
|
||||
|
||||
// allowing empty elements for logs - use case is deleting all pipelines
|
||||
if len(elements) == 0 && c.ElementType != ElementTypeLogPipelines {
|
||||
if len(elements) == 0 && c.ElementType != types.ElementTypeLogPipelines {
|
||||
zap.L().Error("insert config called with no elements ", zap.String("ElementType", string(c.ElementType)))
|
||||
return model.BadRequest(fmt.Errorf("config must have atleast one element"))
|
||||
}
|
||||
@@ -157,7 +136,7 @@ func (r *Repo) insertConfig(
|
||||
}
|
||||
|
||||
if configVersion != nil {
|
||||
c.Version = updateVersion(configVersion.Version)
|
||||
c.Version = types.UpdateVersion(configVersion.Version)
|
||||
} else {
|
||||
// first version
|
||||
c.Version = 1
|
||||
@@ -166,57 +145,41 @@ func (r *Repo) insertConfig(
|
||||
defer func() {
|
||||
if fnerr != nil {
|
||||
// remove all the damage (invalid rows from db)
|
||||
r.db.Exec("DELETE FROM agent_config_versions WHERE id = $1", c.ID)
|
||||
r.db.Exec("DELETE FROM agent_config_elements WHERE version_id=$1", c.ID)
|
||||
r.db.NewDelete().Model((*types.AgentConfigVersion)(nil)).Where("id = ?", c.ID).Exec(ctx)
|
||||
r.db.NewDelete().Model((*types.AgentConfigElement)(nil)).Where("version_id = ?", c.ID).Exec(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
// insert config
|
||||
configQuery := `INSERT INTO agent_config_versions(
|
||||
id,
|
||||
version,
|
||||
created_by,
|
||||
element_type,
|
||||
active,
|
||||
is_valid,
|
||||
disabled,
|
||||
deploy_status,
|
||||
deploy_result)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`
|
||||
|
||||
_, dbErr := r.db.ExecContext(ctx,
|
||||
configQuery,
|
||||
c.ID,
|
||||
c.Version,
|
||||
userId,
|
||||
c.ElementType,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
c.DeployStatus,
|
||||
c.DeployResult)
|
||||
_, dbErr := r.db.NewInsert().
|
||||
Model(&types.AgentConfigVersion{
|
||||
ID: c.ID,
|
||||
Version: c.Version,
|
||||
UserAuditable: types.UserAuditable{
|
||||
CreatedBy: userId,
|
||||
},
|
||||
ElementType: c.ElementType,
|
||||
Active: false, // default value
|
||||
IsValid: false, // default value
|
||||
Disabled: false, // default value
|
||||
DeployStatus: c.DeployStatus,
|
||||
DeployResult: c.DeployResult,
|
||||
}).
|
||||
Exec(ctx)
|
||||
|
||||
if dbErr != nil {
|
||||
zap.L().Error("error in inserting config version: ", zap.Error(dbErr))
|
||||
return model.InternalError(errors.Wrap(dbErr, "failed to insert ingestion rule"))
|
||||
}
|
||||
|
||||
elementsQuery := `INSERT INTO agent_config_elements(
|
||||
id,
|
||||
version_id,
|
||||
element_type,
|
||||
element_id)
|
||||
VALUES ($1, $2, $3, $4)`
|
||||
|
||||
for _, e := range elements {
|
||||
_, dbErr = r.db.ExecContext(
|
||||
ctx,
|
||||
elementsQuery,
|
||||
uuid.NewString(),
|
||||
c.ID,
|
||||
c.ElementType,
|
||||
e,
|
||||
)
|
||||
agentConfigElement := &types.AgentConfigElement{
|
||||
ID: uuid.NewString(),
|
||||
VersionID: c.ID,
|
||||
ElementType: string(c.ElementType),
|
||||
ElementID: e,
|
||||
}
|
||||
_, dbErr = r.db.NewInsert().Model(agentConfigElement).Exec(ctx)
|
||||
if dbErr != nil {
|
||||
return model.InternalError(dbErr)
|
||||
}
|
||||
@@ -226,25 +189,25 @@ func (r *Repo) insertConfig(
|
||||
}
|
||||
|
||||
func (r *Repo) updateDeployStatus(ctx context.Context,
|
||||
elementType ElementTypeDef,
|
||||
elementType types.ElementTypeDef,
|
||||
version int,
|
||||
status string,
|
||||
result string,
|
||||
lastHash string,
|
||||
lastconf string) *model.ApiError {
|
||||
|
||||
updateQuery := `UPDATE agent_config_versions
|
||||
set deploy_status = $1,
|
||||
deploy_result = $2,
|
||||
last_hash = COALESCE($3, last_hash),
|
||||
last_config = $4
|
||||
WHERE version=$5
|
||||
AND element_type = $6`
|
||||
|
||||
_, err := r.db.ExecContext(ctx, updateQuery, status, result, lastHash, lastconf, version, string(elementType))
|
||||
_, err := r.db.NewUpdate().
|
||||
Model((*types.AgentConfigVersion)(nil)).
|
||||
Set("deploy_status = ?", status).
|
||||
Set("deploy_result = ?", result).
|
||||
Set("last_hash = COALESCE(?, last_hash)", lastHash).
|
||||
Set("last_config = ?", lastconf).
|
||||
Where("version = ?", version).
|
||||
Where("element_type = ?", elementType).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to update deploy status", zap.Error(err))
|
||||
return model.BadRequest(fmt.Errorf("failed to update deploy status"))
|
||||
return model.BadRequest(fmt.Errorf("failed to update deploy status"))
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -254,12 +217,12 @@ func (r *Repo) updateDeployStatusByHash(
|
||||
ctx context.Context, confighash string, status string, result string,
|
||||
) *model.ApiError {
|
||||
|
||||
updateQuery := `UPDATE agent_config_versions
|
||||
set deploy_status = $1,
|
||||
deploy_result = $2
|
||||
WHERE last_hash=$4`
|
||||
|
||||
_, err := r.db.ExecContext(ctx, updateQuery, status, result, confighash)
|
||||
_, err := r.db.NewUpdate().
|
||||
Model((*types.AgentConfigVersion)(nil)).
|
||||
Set("deploy_status = ?", status).
|
||||
Set("deploy_result = ?", result).
|
||||
Where("last_hash = ?", confighash).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to update deploy status", zap.Error(err))
|
||||
return model.InternalError(errors.Wrap(err, "failed to update deploy status"))
|
||||
|
||||
@@ -9,12 +9,13 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/uptrace/bun"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/opamp"
|
||||
filterprocessor "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor"
|
||||
tsp "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/types"
|
||||
"go.uber.org/zap"
|
||||
yaml "gopkg.in/yaml.v3"
|
||||
)
|
||||
@@ -39,7 +40,7 @@ type Manager struct {
|
||||
}
|
||||
|
||||
type ManagerOptions struct {
|
||||
DB *sqlx.DB
|
||||
DB *bun.DB
|
||||
|
||||
// When acting as opamp.AgentConfigProvider, agent conf recommendations are
|
||||
// applied to the base conf in the order the features have been specified here.
|
||||
@@ -100,7 +101,7 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
|
||||
settingVersionsUsed := []string{}
|
||||
|
||||
for _, feature := range m.agentFeatures {
|
||||
featureType := ElementTypeDef(feature.AgentFeatureType())
|
||||
featureType := types.ElementTypeDef(feature.AgentFeatureType())
|
||||
latestConfig, apiErr := GetLatestVersion(context.Background(), featureType)
|
||||
if apiErr != nil && apiErr.Type() != model.ErrorNotFound {
|
||||
return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version")
|
||||
@@ -133,7 +134,7 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
|
||||
context.Background(),
|
||||
featureType,
|
||||
configVersion,
|
||||
string(DeployInitiated),
|
||||
string(types.DeployInitiated),
|
||||
"Deployment has started",
|
||||
configId,
|
||||
serializedSettingsUsed,
|
||||
@@ -162,10 +163,10 @@ func (m *Manager) ReportConfigDeploymentStatus(
|
||||
) {
|
||||
featureConfigIds := strings.Split(configId, ",")
|
||||
for _, featureConfId := range featureConfigIds {
|
||||
newStatus := string(Deployed)
|
||||
newStatus := string(types.Deployed)
|
||||
message := "Deployment was successful"
|
||||
if err != nil {
|
||||
newStatus = string(DeployFailed)
|
||||
newStatus = string(types.DeployFailed)
|
||||
message = fmt.Sprintf("%s: %s", agentId, err.Error())
|
||||
}
|
||||
m.updateDeployStatusByHash(
|
||||
@@ -175,30 +176,30 @@ func (m *Manager) ReportConfigDeploymentStatus(
|
||||
}
|
||||
|
||||
func GetLatestVersion(
|
||||
ctx context.Context, elementType ElementTypeDef,
|
||||
) (*ConfigVersion, *model.ApiError) {
|
||||
ctx context.Context, elementType types.ElementTypeDef,
|
||||
) (*types.AgentConfigVersion, *model.ApiError) {
|
||||
return m.GetLatestVersion(ctx, elementType)
|
||||
}
|
||||
|
||||
func GetConfigVersion(
|
||||
ctx context.Context, elementType ElementTypeDef, version int,
|
||||
) (*ConfigVersion, *model.ApiError) {
|
||||
ctx context.Context, elementType types.ElementTypeDef, version int,
|
||||
) (*types.AgentConfigVersion, *model.ApiError) {
|
||||
return m.GetConfigVersion(ctx, elementType, version)
|
||||
}
|
||||
|
||||
func GetConfigHistory(
|
||||
ctx context.Context, typ ElementTypeDef, limit int,
|
||||
) ([]ConfigVersion, *model.ApiError) {
|
||||
ctx context.Context, typ types.ElementTypeDef, limit int,
|
||||
) ([]types.AgentConfigVersion, *model.ApiError) {
|
||||
return m.GetConfigHistory(ctx, typ, limit)
|
||||
}
|
||||
|
||||
// StartNewVersion launches a new config version for given set of elements
|
||||
func StartNewVersion(
|
||||
ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string,
|
||||
) (*ConfigVersion, *model.ApiError) {
|
||||
ctx context.Context, userId string, eleType types.ElementTypeDef, elementIds []string,
|
||||
) (*types.AgentConfigVersion, *model.ApiError) {
|
||||
|
||||
// create a new version
|
||||
cfg := NewConfigVersion(eleType)
|
||||
cfg := types.NewAgentConfigVersion(eleType)
|
||||
|
||||
// insert new config and elements into database
|
||||
err := m.insertConfig(ctx, userId, cfg, elementIds)
|
||||
@@ -215,7 +216,7 @@ func NotifyConfigUpdate(ctx context.Context) {
|
||||
m.notifyConfigUpdateSubscribers()
|
||||
}
|
||||
|
||||
func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiError {
|
||||
func Redeploy(ctx context.Context, typ types.ElementTypeDef, version int) *model.ApiError {
|
||||
|
||||
configVersion, err := GetConfigVersion(ctx, typ, version)
|
||||
if err != nil {
|
||||
@@ -223,14 +224,14 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
|
||||
return model.WrapApiError(err, "failed to fetch details of the config version")
|
||||
}
|
||||
|
||||
if configVersion == nil || (configVersion != nil && configVersion.LastConf == "") {
|
||||
if configVersion == nil || (configVersion != nil && configVersion.LastConfig == "") {
|
||||
zap.L().Debug("config version has no conf yaml", zap.Any("configVersion", configVersion))
|
||||
return model.BadRequest(fmt.Errorf("the config version can not be redeployed"))
|
||||
}
|
||||
switch typ {
|
||||
case ElementTypeSamplingRules:
|
||||
case types.ElementTypeSamplingRules:
|
||||
var config *tsp.Config
|
||||
if err := yaml.Unmarshal([]byte(configVersion.LastConf), &config); err != nil {
|
||||
if err := yaml.Unmarshal([]byte(configVersion.LastConfig), &config); err != nil {
|
||||
zap.L().Debug("failed to read last conf correctly", zap.Error(err))
|
||||
return model.BadRequest(fmt.Errorf("failed to read the stored config correctly"))
|
||||
}
|
||||
@@ -247,10 +248,10 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
|
||||
return model.InternalError(fmt.Errorf("failed to deploy the config"))
|
||||
}
|
||||
|
||||
m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
|
||||
case ElementTypeDropRules:
|
||||
m.updateDeployStatus(ctx, types.ElementTypeSamplingRules, version, string(types.DeployInitiated), "Deployment started", configHash, configVersion.LastConfig)
|
||||
case types.ElementTypeDropRules:
|
||||
var filterConfig *filterprocessor.Config
|
||||
if err := yaml.Unmarshal([]byte(configVersion.LastConf), &filterConfig); err != nil {
|
||||
if err := yaml.Unmarshal([]byte(configVersion.LastConfig), &filterConfig); err != nil {
|
||||
zap.L().Error("failed to read last conf correctly", zap.Error(err))
|
||||
return model.InternalError(fmt.Errorf("failed to read the stored config correctly"))
|
||||
}
|
||||
@@ -265,7 +266,7 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
|
||||
return err
|
||||
}
|
||||
|
||||
m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
|
||||
m.updateDeployStatus(ctx, types.ElementTypeSamplingRules, version, string(types.DeployInitiated), "Deployment started", configHash, configVersion.LastConfig)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -296,7 +297,7 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce
|
||||
zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
|
||||
}
|
||||
|
||||
m.updateDeployStatus(ctx, ElementTypeDropRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
|
||||
m.updateDeployStatus(ctx, types.ElementTypeDropRules, version, string(types.DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -307,7 +308,7 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce
|
||||
// but can be improved in future to accept continuous request status updates from opamp
|
||||
func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) {
|
||||
|
||||
status := string(Deployed)
|
||||
status := string(types.Deployed)
|
||||
|
||||
message := "Deployment was successful"
|
||||
|
||||
@@ -316,7 +317,7 @@ func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) {
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
status = string(DeployFailed)
|
||||
status = string(types.DeployFailed)
|
||||
message = fmt.Sprintf("%s: %s", agentId, err.Error())
|
||||
}
|
||||
|
||||
@@ -347,6 +348,6 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi
|
||||
zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
|
||||
}
|
||||
|
||||
m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
|
||||
m.updateDeployStatus(ctx, types.ElementTypeSamplingRules, version, string(types.DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,72 +1,10 @@
|
||||
package agentConf
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type ElementTypeDef string
|
||||
|
||||
const (
|
||||
ElementTypeSamplingRules ElementTypeDef = "sampling_rules"
|
||||
ElementTypeDropRules ElementTypeDef = "drop_rules"
|
||||
ElementTypeLogPipelines ElementTypeDef = "log_pipelines"
|
||||
ElementTypeLbExporter ElementTypeDef = "lb_exporter"
|
||||
)
|
||||
|
||||
type DeployStatus string
|
||||
|
||||
const (
|
||||
PendingDeploy DeployStatus = "DIRTY"
|
||||
Deploying DeployStatus = "DEPLOYING"
|
||||
Deployed DeployStatus = "DEPLOYED"
|
||||
DeployInitiated DeployStatus = "IN_PROGRESS"
|
||||
DeployFailed DeployStatus = "FAILED"
|
||||
DeployStatusUnknown DeployStatus = "UNKNOWN"
|
||||
)
|
||||
|
||||
type ConfigVersion struct {
|
||||
ID string `json:"id" db:"id"`
|
||||
Version int `json:"version" db:"version"`
|
||||
ElementType ElementTypeDef `json:"elementType" db:"element_type"`
|
||||
Active bool `json:"active" db:"active"`
|
||||
IsValid bool `json:"is_valid" db:"is_valid"`
|
||||
Disabled bool `json:"disabled" db:"disabled"`
|
||||
|
||||
DeployStatus DeployStatus `json:"deployStatus" db:"deploy_status"`
|
||||
DeployResult string `json:"deployResult" db:"deploy_result"`
|
||||
|
||||
LastHash string `json:"lastHash" db:"last_hash"`
|
||||
LastConf string `json:"lastConf" db:"last_config"`
|
||||
|
||||
CreatedBy string `json:"createdBy" db:"created_by"`
|
||||
CreatedByName string `json:"createdByName" db:"created_by_name"`
|
||||
CreatedAt time.Time `json:"createdAt" db:"created_at"`
|
||||
}
|
||||
|
||||
func NewConfigVersion(typeDef ElementTypeDef) *ConfigVersion {
|
||||
return &ConfigVersion{
|
||||
ID: uuid.NewString(),
|
||||
ElementType: typeDef,
|
||||
Active: false,
|
||||
IsValid: false,
|
||||
Disabled: false,
|
||||
DeployStatus: PendingDeploy,
|
||||
LastHash: "",
|
||||
LastConf: "{}",
|
||||
// todo: get user id from context?
|
||||
// CreatedBy
|
||||
}
|
||||
}
|
||||
|
||||
func updateVersion(v int) int {
|
||||
return v + 1
|
||||
}
|
||||
import "go.signoz.io/signoz/pkg/types"
|
||||
|
||||
type ConfigElements struct {
|
||||
VersionID string
|
||||
Version int
|
||||
ElementType ElementTypeDef
|
||||
ElementType types.ElementTypeDef
|
||||
ElementId string
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||
"go.signoz.io/signoz/pkg/types"
|
||||
"go.signoz.io/signoz/pkg/types/authtypes"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@@ -38,10 +39,10 @@ func NewLogParsingPipelinesController(
|
||||
|
||||
// PipelinesResponse is used to prepare http response for pipelines config related requests
|
||||
type PipelinesResponse struct {
|
||||
*agentConf.ConfigVersion
|
||||
*types.AgentConfigVersion
|
||||
|
||||
Pipelines []Pipeline `json:"pipelines"`
|
||||
History []agentConf.ConfigVersion `json:"history"`
|
||||
Pipelines []Pipeline `json:"pipelines"`
|
||||
History []types.AgentConfigVersion `json:"history"`
|
||||
}
|
||||
|
||||
// ApplyPipelines stores new or changed pipelines and initiates a new config update
|
||||
@@ -84,7 +85,7 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
|
||||
}
|
||||
|
||||
// prepare config by calling gen func
|
||||
cfg, err := agentConf.StartNewVersion(ctx, claims.UserID, agentConf.ElementTypeLogPipelines, elements)
|
||||
cfg, err := agentConf.StartNewVersion(ctx, claims.UserID, types.ElementTypeLogPipelines, elements)
|
||||
if err != nil || cfg == nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -199,9 +200,9 @@ func (ic *LogParsingPipelineController) GetPipelinesByVersion(
|
||||
return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version"))
|
||||
}
|
||||
|
||||
var configVersion *agentConf.ConfigVersion
|
||||
var configVersion *types.AgentConfigVersion
|
||||
if version >= 0 {
|
||||
cv, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version)
|
||||
cv, err := agentConf.GetConfigVersion(ctx, types.ElementTypeLogPipelines, version)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to get config for version", zap.Int("version", version), zap.Error(err))
|
||||
return nil, model.WrapApiError(err, "failed to get config for given version")
|
||||
@@ -210,8 +211,8 @@ func (ic *LogParsingPipelineController) GetPipelinesByVersion(
|
||||
}
|
||||
|
||||
return &PipelinesResponse{
|
||||
ConfigVersion: configVersion,
|
||||
Pipelines: pipelines,
|
||||
AgentConfigVersion: configVersion,
|
||||
Pipelines: pipelines,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -251,7 +252,7 @@ func (pc *LogParsingPipelineController) AgentFeatureType() agentConf.AgentFeatur
|
||||
// Implements agentConf.AgentFeature interface.
|
||||
func (pc *LogParsingPipelineController) RecommendAgentConfig(
|
||||
currentConfYaml []byte,
|
||||
configVersion *agentConf.ConfigVersion,
|
||||
configVersion *types.AgentConfigVersion,
|
||||
) (
|
||||
recommendedConfYaml []byte,
|
||||
serializedSettingsUsed string,
|
||||
|
||||
@@ -166,7 +166,7 @@ type testbed struct {
|
||||
|
||||
func newTestbed(t *testing.T) *testbed {
|
||||
testDB := utils.NewQueryServiceDBForTests(t)
|
||||
_, err := model.InitDB(testDB.SQLxDB())
|
||||
_, err := model.InitDB(testDB.BunDB())
|
||||
if err != nil {
|
||||
t.Fatalf("could not init opamp model: %v", err)
|
||||
}
|
||||
|
||||
@@ -60,13 +60,13 @@ func UpsertControlProcessors(
|
||||
|
||||
agenthash, err := addIngestionControlToAgent(agent, signal, processors, false)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.ID), zap.Error(err))
|
||||
zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.AgentID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
if agenthash != "" {
|
||||
// subscribe callback
|
||||
model.ListenToConfigUpdate(agent.ID, agenthash, callback)
|
||||
model.ListenToConfigUpdate(agent.AgentID, agenthash, callback)
|
||||
}
|
||||
|
||||
hash = agenthash
|
||||
@@ -89,7 +89,7 @@ func addIngestionControlToAgent(agent *model.Agent, signal string, processors ma
|
||||
// add ingestion control spec
|
||||
err = makeIngestionControlSpec(agentConf, Signal(signal), processors)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.ID), zap.Error(err))
|
||||
zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.AgentID), zap.Error(err))
|
||||
return confHash, err
|
||||
}
|
||||
|
||||
|
||||
@@ -7,33 +7,18 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/types"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/open-telemetry/opamp-go/protobufs"
|
||||
"github.com/open-telemetry/opamp-go/server/types"
|
||||
opampTypes "github.com/open-telemetry/opamp-go/server/types"
|
||||
)
|
||||
|
||||
type AgentStatus int
|
||||
|
||||
const (
|
||||
AgentStatusUnknown AgentStatus = iota
|
||||
AgentStatusConnected
|
||||
AgentStatusDisconnected
|
||||
)
|
||||
|
||||
// set in agent description when agent is capable of supporting
|
||||
// lb exporter configuration. values: 1 (true) or 0 (false)
|
||||
const lbExporterFlag = "capabilities.lbexporter"
|
||||
|
||||
type Agent struct {
|
||||
ID string `json:"agentId" yaml:"agentId" db:"agent_id"`
|
||||
StartedAt time.Time `json:"startedAt" yaml:"startedAt" db:"started_at"`
|
||||
TerminatedAt time.Time `json:"terminatedAt" yaml:"terminatedAt" db:"terminated_at"`
|
||||
EffectiveConfig string `json:"effectiveConfig" yaml:"effectiveConfig" db:"effective_config"`
|
||||
CurrentStatus AgentStatus `json:"currentStatus" yaml:"currentStatus" db:"current_status"`
|
||||
remoteConfig *protobufs.AgentRemoteConfig
|
||||
Status *protobufs.AgentToServer
|
||||
types.StorableAgent
|
||||
remoteConfig *protobufs.AgentRemoteConfig
|
||||
Status *protobufs.AgentToServer
|
||||
|
||||
// can this agent be load balancer
|
||||
CanLB bool
|
||||
@@ -41,13 +26,17 @@ type Agent struct {
|
||||
// is this agent setup as load balancer
|
||||
IsLb bool
|
||||
|
||||
conn types.Connection
|
||||
conn opampTypes.Connection
|
||||
connMutex sync.Mutex
|
||||
mux sync.RWMutex
|
||||
}
|
||||
|
||||
func New(ID string, conn types.Connection) *Agent {
|
||||
return &Agent{ID: ID, StartedAt: time.Now(), CurrentStatus: AgentStatusConnected, conn: conn}
|
||||
// set in agent description when agent is capable of supporting
|
||||
// lb exporter configuration. values: 1 (true) or 0 (false)
|
||||
const lbExporterFlag = "capabilities.lbexporter"
|
||||
|
||||
func New(orgID string, ID string, conn opampTypes.Connection) *Agent {
|
||||
return &Agent{StorableAgent: types.StorableAgent{OrgID: orgID, AgentID: ID, StartedAt: time.Now(), CurrentStatus: types.AgentStatusConnected}, conn: conn}
|
||||
}
|
||||
|
||||
// Upsert inserts or updates the agent in the database.
|
||||
@@ -55,17 +44,13 @@ func (agent *Agent) Upsert() error {
|
||||
agent.mux.Lock()
|
||||
defer agent.mux.Unlock()
|
||||
|
||||
_, err := db.NamedExec(`INSERT OR REPLACE INTO agents (
|
||||
agent_id,
|
||||
started_at,
|
||||
effective_config,
|
||||
current_status
|
||||
) VALUES (
|
||||
:agent_id,
|
||||
:started_at,
|
||||
:effective_config,
|
||||
:current_status
|
||||
)`, agent)
|
||||
_, err := db.NewInsert().
|
||||
Model(&agent.StorableAgent).
|
||||
On("CONFLICT (org_id, agent_id) DO UPDATE").
|
||||
Set("started_at = EXCLUDED.started_at").
|
||||
Set("effective_config = EXCLUDED.effective_config").
|
||||
Set("current_status = EXCLUDED.current_status").
|
||||
Exec(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -135,11 +120,11 @@ func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
|
||||
onConfigSuccess(agent.ID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
|
||||
onConfigSuccess(agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
|
||||
}
|
||||
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
|
||||
onConfigFailure(agent.ID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
|
||||
onConfigFailure(agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -266,7 +251,7 @@ func (agent *Agent) processStatusUpdate(
|
||||
agent.SendToAgent(response)
|
||||
|
||||
ListenToConfigUpdate(
|
||||
agent.ID,
|
||||
agent.AgentID,
|
||||
string(response.RemoteConfig.ConfigHash),
|
||||
configProvider.ReportConfigDeploymentStatus,
|
||||
)
|
||||
@@ -276,7 +261,7 @@ func (agent *Agent) processStatusUpdate(
|
||||
func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool {
|
||||
recommendedConfig, confId, err := configProvider.RecommendAgentConfig([]byte(agent.EffectiveConfig))
|
||||
if err != nil {
|
||||
zap.L().Error("could not generate config recommendation for agent", zap.String("agentID", agent.ID), zap.Error(err))
|
||||
zap.L().Error("could not generate config recommendation for agent", zap.String("agentID", agent.AgentID), zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
@@ -1,18 +1,20 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/open-telemetry/opamp-go/protobufs"
|
||||
"github.com/open-telemetry/opamp-go/server/types"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/uptrace/bun"
|
||||
signozTypes "go.signoz.io/signoz/pkg/types"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var db *sqlx.DB
|
||||
var db *bun.DB
|
||||
|
||||
var AllAgents = Agents{
|
||||
agentsById: map[string]*Agent{},
|
||||
@@ -30,7 +32,7 @@ func (a *Agents) Count() int {
|
||||
}
|
||||
|
||||
// Initialize the database and create schema if needed
|
||||
func InitDB(qsDB *sqlx.DB) (*sqlx.DB, error) {
|
||||
func InitDB(qsDB *bun.DB) (*bun.DB, error) {
|
||||
db = qsDB
|
||||
|
||||
AllAgents = Agents{
|
||||
@@ -49,7 +51,7 @@ func (agents *Agents) RemoveConnection(conn types.Connection) {
|
||||
|
||||
for instanceId := range agents.connections[conn] {
|
||||
agent := agents.agentsById[instanceId]
|
||||
agent.CurrentStatus = AgentStatusDisconnected
|
||||
agent.CurrentStatus = signozTypes.AgentStatusDisconnected
|
||||
agent.TerminatedAt = time.Now()
|
||||
agent.Upsert()
|
||||
delete(agents.agentsById, instanceId)
|
||||
@@ -64,6 +66,24 @@ func (agents *Agents) FindAgent(agentID string) *Agent {
|
||||
return agents.agentsById[agentID]
|
||||
}
|
||||
|
||||
func (agents *Agents) getDefaultOrgID() (string, error) {
|
||||
var orgID []string
|
||||
err := db.NewSelect().Model((*signozTypes.Organization)(nil)).Column("id").Scan(context.Background(), &orgID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(orgID) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
if len(orgID) != 1 {
|
||||
return "", fmt.Errorf("expected exactly one organization, but found %d", len(orgID))
|
||||
}
|
||||
|
||||
return orgID[0], nil
|
||||
}
|
||||
|
||||
// FindOrCreateAgent returns the Agent instance associated with the given agentID.
|
||||
// If the Agent instance does not exist, it is created and added to the list of
|
||||
// Agent instances.
|
||||
@@ -72,9 +92,12 @@ func (agents *Agents) FindOrCreateAgent(agentID string, conn types.Connection) (
|
||||
defer agents.mux.Unlock()
|
||||
var created bool
|
||||
agent, ok := agents.agentsById[agentID]
|
||||
var err error
|
||||
if !ok || agent == nil {
|
||||
agent = New(agentID, conn)
|
||||
orgID, err := agents.getDefaultOrgID()
|
||||
if err != nil {
|
||||
return nil, created, err
|
||||
}
|
||||
agent = New(orgID, agentID, conn)
|
||||
err = agent.Upsert()
|
||||
if err != nil {
|
||||
return nil, created, err
|
||||
@@ -112,14 +135,14 @@ func (agents *Agents) RecommendLatestConfigToAll(
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, fmt.Sprintf(
|
||||
"could not generate conf recommendation for %v", agent.ID,
|
||||
"could not generate conf recommendation for %v", agent.AgentID,
|
||||
))
|
||||
}
|
||||
|
||||
// Recommendation is same as current config
|
||||
if string(newConfig) == agent.EffectiveConfig {
|
||||
zap.L().Info(
|
||||
"Recommended config same as current effective config for agent", zap.String("agentID", agent.ID),
|
||||
"Recommended config same as current effective config for agent", zap.String("agentID", agent.AgentID),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
@@ -144,7 +167,7 @@ func (agents *Agents) RecommendLatestConfigToAll(
|
||||
RemoteConfig: newRemoteConfig,
|
||||
})
|
||||
|
||||
ListenToConfigUpdate(agent.ID, confId, provider.ReportConfigDeploymentStatus)
|
||||
ListenToConfigUpdate(agent.AgentID, confId, provider.ReportConfigDeploymentStatus)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer
|
||||
agent.CanLB = model.ExtractLbFlag(msg.AgentDescription)
|
||||
zap.L().Debug(
|
||||
"New agent added", zap.Bool("canLb", agent.CanLB),
|
||||
zap.String("ID", agent.ID),
|
||||
zap.String("ID", agent.AgentID),
|
||||
zap.Any("status", agent.CurrentStatus),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -235,13 +235,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
|
||||
s.privateHTTP = privateServer
|
||||
|
||||
_, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB())
|
||||
_, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.BunDB())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
|
||||
DB: serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||
DB: serverOptions.SigNoz.SQLStore.BunDB(),
|
||||
AgentFeatures: []agentConf.AgentFeature{
|
||||
logParsingPipelineController,
|
||||
},
|
||||
|
||||
@@ -20,7 +20,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/opamp"
|
||||
opampModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
@@ -120,7 +119,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
"pipelines config history should not be empty after 1st configuration",
|
||||
)
|
||||
require.Equal(
|
||||
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
|
||||
types.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
|
||||
"pipelines deployment should be in progress after 1st configuration",
|
||||
)
|
||||
|
||||
@@ -132,7 +131,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
t, postablePipelines, getPipelinesResp,
|
||||
)
|
||||
require.Equal(
|
||||
agentConf.Deployed,
|
||||
types.Deployed,
|
||||
getPipelinesResp.History[0].DeployStatus,
|
||||
"pipeline deployment should be complete after acknowledgment from opamp client",
|
||||
)
|
||||
@@ -152,7 +151,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
"there should be 2 history entries after posting pipelines config for the 2nd time",
|
||||
)
|
||||
require.Equal(
|
||||
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
|
||||
types.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
|
||||
"deployment should be in progress for latest pipeline config",
|
||||
)
|
||||
|
||||
@@ -164,7 +163,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
t, postablePipelines, getPipelinesResp,
|
||||
)
|
||||
require.Equal(
|
||||
agentConf.Deployed,
|
||||
types.Deployed,
|
||||
getPipelinesResp.History[0].DeployStatus,
|
||||
"deployment for latest pipeline config should be complete after acknowledgment from opamp client",
|
||||
)
|
||||
@@ -218,7 +217,7 @@ func TestLogPipelinesHistory(t *testing.T) {
|
||||
testbed.PostPipelinesToQS(postablePipelines)
|
||||
getPipelinesResp = testbed.GetPipelinesFromQS()
|
||||
require.Equal(1, len(getPipelinesResp.History))
|
||||
require.Equal(agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
|
||||
require.Equal(types.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
|
||||
|
||||
postablePipelines.Pipelines[0].Config = append(
|
||||
postablePipelines.Pipelines[0].Config,
|
||||
@@ -237,8 +236,8 @@ func TestLogPipelinesHistory(t *testing.T) {
|
||||
getPipelinesResp = testbed.GetPipelinesFromQS()
|
||||
|
||||
require.Equal(2, len(getPipelinesResp.History))
|
||||
require.Equal(agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
|
||||
require.Equal(agentConf.DeployStatusUnknown, getPipelinesResp.History[1].DeployStatus)
|
||||
require.Equal(types.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
|
||||
require.Equal(types.DeployStatusUnknown, getPipelinesResp.History[1].DeployStatus)
|
||||
}
|
||||
|
||||
func TestLogPipelinesValidation(t *testing.T) {
|
||||
@@ -481,11 +480,11 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
|
||||
}
|
||||
|
||||
// Mock an available opamp agent
|
||||
testDB, err := opampModel.InitDB(sqlStore.SQLxDB())
|
||||
// testDB, err := opampModel.InitDB(sqlStore.SQLxDB())
|
||||
require.Nil(t, err, "failed to init opamp model")
|
||||
|
||||
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
|
||||
DB: testDB,
|
||||
DB: sqlStore.BunDB(), // todo(nitya): fix this
|
||||
AgentFeatures: []agentConf.AgentFeature{
|
||||
apiHandler.LogsParsingPipelineController,
|
||||
}})
|
||||
|
||||
@@ -48,6 +48,8 @@ func NewTestSqliteDB(t *testing.T) (sqlStore sqlstore.SQLStore, testDBFilePath s
|
||||
sqlmigration.NewModifyDatetimeFactory(),
|
||||
sqlmigration.NewModifyOrgDomainFactory(),
|
||||
sqlmigration.NewUpdateOrganizationFactory(sqlStore),
|
||||
sqlmigration.NewUpdateDashboardAndSavedViewsFactory(sqlStore),
|
||||
sqlmigration.NewUpdateAgentsFactory(sqlStore),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
|
||||
@@ -59,6 +59,8 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
|
||||
sqlmigration.NewModifyOrgDomainFactory(),
|
||||
sqlmigration.NewUpdateOrganizationFactory(sqlstore),
|
||||
sqlmigration.NewAddAlertmanagerFactory(),
|
||||
sqlmigration.NewUpdateDashboardAndSavedViewsFactory(sqlstore),
|
||||
sqlmigration.NewUpdateAgentsFactory(sqlstore),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
80
pkg/sqlmigration/015_update_dashboards_savedviews.go
Normal file
80
pkg/sqlmigration/015_update_dashboards_savedviews.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types"
|
||||
)
|
||||
|
||||
type updateDashboardAndSavedViews struct {
|
||||
store sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewUpdateDashboardAndSavedViewsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("update_dashboards_savedviews"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
|
||||
return newUpdateDashboardAndSavedViews(ctx, ps, c, sqlstore)
|
||||
})
|
||||
}
|
||||
|
||||
func newUpdateDashboardAndSavedViews(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
|
||||
return &updateDashboardAndSavedViews{
|
||||
store: store,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (migration *updateDashboardAndSavedViews) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *updateDashboardAndSavedViews) Up(ctx context.Context, db *bun.DB) error {
|
||||
|
||||
// begin transaction
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// get all org ids
|
||||
var orgIDs []string
|
||||
if err := migration.store.BunDB().NewSelect().Model((*types.Organization)(nil)).Column("id").Scan(ctx, &orgIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// add org id to dashboards table
|
||||
for _, table := range []string{"dashboards", "saved_views"} {
|
||||
if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, table, "org_id"); err != nil {
|
||||
return err
|
||||
} else if !exists {
|
||||
if _, err := tx.NewAddColumn().Table(table).ColumnExpr("org_id TEXT REFERENCES organizations(id) ON DELETE CASCADE").Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check if there is one org ID if yes then set it to all dashboards.
|
||||
if len(orgIDs) == 1 {
|
||||
orgID := orgIDs[0]
|
||||
if _, err := tx.NewUpdate().Table(table).Set("org_id = ?", orgID).Where("org_id IS NULL").Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *updateDashboardAndSavedViews) Down(ctx context.Context, db *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
80
pkg/sqlmigration/016_update_agents.go
Normal file
80
pkg/sqlmigration/016_update_agents.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types"
|
||||
)
|
||||
|
||||
type updateAgents struct {
|
||||
store sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewUpdateAgentsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("update_agents"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
|
||||
return newUpdateAgents(ctx, ps, c, sqlstore)
|
||||
})
|
||||
}
|
||||
|
||||
func newUpdateAgents(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
|
||||
return &updateAgents{
|
||||
store: store,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (migration *updateAgents) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *updateAgents) Up(ctx context.Context, db *bun.DB) error {
|
||||
|
||||
// begin transaction
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// get all org ids
|
||||
var orgIDs []string
|
||||
if err := migration.store.BunDB().NewSelect().Model((*types.Organization)(nil)).Column("id").Scan(ctx, &orgIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// add org id to dashboards table
|
||||
for _, table := range []string{"agents", "agent_config_versions", "agent_config_elements"} {
|
||||
if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, table, "org_id"); err != nil {
|
||||
return err
|
||||
} else if !exists {
|
||||
if _, err := tx.NewAddColumn().Table(table).ColumnExpr("org_id TEXT REFERENCES organizations(id) ON DELETE CASCADE").Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check if there is one org ID if yes then set it to all dashboards.
|
||||
if len(orgIDs) == 1 {
|
||||
orgID := orgIDs[0]
|
||||
if _, err := tx.NewUpdate().Table(table).Set("org_id = ?", orgID).Where("org_id IS NULL").Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *updateAgents) Down(ctx context.Context, db *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
@@ -3,41 +3,92 @@ package types
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
type Agent struct {
|
||||
bun.BaseModel `bun:"table:agents"`
|
||||
AgentID string `bun:"agent_id,pk,type:text"`
|
||||
StartedAt time.Time `bun:"started_at,type:datetime,notnull"`
|
||||
TerminatedAt time.Time `bun:"terminated_at,type:datetime"`
|
||||
CurrentStatus string `bun:"current_status,type:text,notnull"`
|
||||
EffectiveConfig string `bun:"effective_config,type:text,notnull"`
|
||||
type AgentStatus int
|
||||
|
||||
const (
|
||||
AgentStatusUnknown AgentStatus = iota
|
||||
AgentStatusConnected
|
||||
AgentStatusDisconnected
|
||||
)
|
||||
|
||||
type StorableAgent struct {
|
||||
bun.BaseModel `bun:"table:agents"`
|
||||
|
||||
OrgID string `json:"orgId" yaml:"orgId" bun:"org_id,type:text"`
|
||||
AgentID string `json:"agentId" yaml:"agentId" bun:"agent_id,pk,type:text"`
|
||||
StartedAt time.Time `json:"startedAt" yaml:"startedAt" bun:"started_at,type:datetime,notnull"`
|
||||
TerminatedAt time.Time `json:"terminatedAt" yaml:"terminatedAt" bun:"terminated_at,type:datetime"`
|
||||
CurrentStatus AgentStatus `json:"currentStatus" yaml:"currentStatus" bun:"current_status,type:text,notnull"`
|
||||
EffectiveConfig string `bun:"effective_config,type:text,notnull"`
|
||||
}
|
||||
|
||||
type ElementTypeDef string
|
||||
|
||||
const (
|
||||
ElementTypeSamplingRules ElementTypeDef = "sampling_rules"
|
||||
ElementTypeDropRules ElementTypeDef = "drop_rules"
|
||||
ElementTypeLogPipelines ElementTypeDef = "log_pipelines"
|
||||
ElementTypeLbExporter ElementTypeDef = "lb_exporter"
|
||||
)
|
||||
|
||||
type DeployStatus string
|
||||
|
||||
const (
|
||||
PendingDeploy DeployStatus = "DIRTY"
|
||||
Deploying DeployStatus = "DEPLOYING"
|
||||
Deployed DeployStatus = "DEPLOYED"
|
||||
DeployInitiated DeployStatus = "IN_PROGRESS"
|
||||
DeployFailed DeployStatus = "FAILED"
|
||||
DeployStatusUnknown DeployStatus = "UNKNOWN"
|
||||
)
|
||||
|
||||
type AgentConfigVersion struct {
|
||||
bun.BaseModel `bun:"table:agent_config_versions"`
|
||||
|
||||
ID string `bun:"id,pk,type:text"`
|
||||
CreatedBy string `bun:"created_by,type:text"`
|
||||
CreatedAt time.Time `bun:"created_at,default:CURRENT_TIMESTAMP"`
|
||||
UpdatedBy string `bun:"updated_by,type:text"`
|
||||
UpdatedAt time.Time `bun:"updated_at,default:CURRENT_TIMESTAMP"`
|
||||
Version int `bun:"version,default:1,unique:element_version_idx"`
|
||||
Active int `bun:"active"`
|
||||
IsValid int `bun:"is_valid"`
|
||||
Disabled int `bun:"disabled"`
|
||||
ElementType string `bun:"element_type,notnull,type:varchar(120),unique:element_version_idx"`
|
||||
DeployStatus string `bun:"deploy_status,notnull,type:varchar(80),default:'DIRTY'"`
|
||||
DeploySequence int `bun:"deploy_sequence"`
|
||||
DeployResult string `bun:"deploy_result,type:text"`
|
||||
LastHash string `bun:"last_hash,type:text"`
|
||||
LastConfig string `bun:"last_config,type:text"`
|
||||
TimeAuditable
|
||||
UserAuditable
|
||||
|
||||
CreatedByName string `json:"createdByName" db:"created_by_name"`
|
||||
|
||||
OrgID string `json:"orgId" bun:"org_id,type:text"`
|
||||
ID string `json:"id" bun:"id,pk,type:text"`
|
||||
Version int `json:"version" bun:"version,default:1,unique:element_version_idx"`
|
||||
Active bool `json:"active" bun:"active"`
|
||||
IsValid bool `json:"isValid" bun:"is_valid"`
|
||||
Disabled bool `json:"disabled" bun:"disabled"`
|
||||
ElementType ElementTypeDef `json:"elementType" bun:"element_type,notnull,type:varchar(120),unique:element_version_idx"`
|
||||
DeployStatus DeployStatus `json:"deployStatus" bun:"deploy_status,notnull,type:varchar(80),default:'DIRTY'"`
|
||||
DeploySequence int `json:"deploySequence" bun:"deploy_sequence"`
|
||||
DeployResult string `json:"deployResult" bun:"deploy_result,type:text"`
|
||||
LastHash string `json:"lastHash" bun:"last_hash,type:text"`
|
||||
LastConfig string `json:"lastConfig" bun:"last_config,type:text"`
|
||||
}
|
||||
|
||||
func NewAgentConfigVersion(typeDef ElementTypeDef) *AgentConfigVersion {
|
||||
return &AgentConfigVersion{
|
||||
ID: uuid.NewString(),
|
||||
ElementType: typeDef,
|
||||
Active: false,
|
||||
IsValid: false,
|
||||
Disabled: false,
|
||||
DeployStatus: PendingDeploy,
|
||||
LastHash: "",
|
||||
LastConfig: "{}",
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateVersion(v int) int {
|
||||
return v + 1
|
||||
}
|
||||
|
||||
type AgentConfigElement struct {
|
||||
bun.BaseModel `bun:"table:agent_config_elements"`
|
||||
|
||||
OrgID string `bun:"org_id,type:text"`
|
||||
ID string `bun:"id,pk,type:text"`
|
||||
CreatedBy string `bun:"created_by,type:text"`
|
||||
CreatedAt time.Time `bun:"created_at,default:CURRENT_TIMESTAMP"`
|
||||
|
||||
Reference in New Issue
Block a user