mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-24 17:23:19 +00:00
Compare commits
2 Commits
feat/skill
...
remove-v4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5ad672aa63 | ||
|
|
5be4c5d536 |
@@ -169,6 +169,7 @@ func NewSQLMigrationProviderFactories(
|
||||
sqlmigration.NewAddAnonymousPublicDashboardTransactionFactory(sqlstore),
|
||||
sqlmigration.NewAddRootUserFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewAddUserEmailOrgIDIndexFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewMigrateRulesV4ToV5Factory(sqlstore, telemetryStore),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
209
pkg/sqlmigration/066_migrate_rules_v4_to_v5_post_deprecation.go
Normal file
209
pkg/sqlmigration/066_migrate_rules_v4_to_v5_post_deprecation.go
Normal file
@@ -0,0 +1,209 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/transition"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
type migrateRulesV4ToV5 struct {
|
||||
store sqlstore.SQLStore
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func NewMigrateRulesV4ToV5Factory(
|
||||
store sqlstore.SQLStore,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(
|
||||
factory.MustNewName("migrate_rules_post_deprecation"),
|
||||
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
|
||||
return &migrateRulesV4ToV5{
|
||||
store: store,
|
||||
telemetryStore: telemetryStore,
|
||||
logger: ps.Logger,
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) getLogDuplicateKeys(ctx context.Context) ([]string, error) {
|
||||
query := `
|
||||
SELECT name
|
||||
FROM (
|
||||
SELECT DISTINCT name FROM signoz_logs.distributed_logs_attribute_keys
|
||||
INTERSECT
|
||||
SELECT DISTINCT name FROM signoz_logs.distributed_logs_resource_keys
|
||||
)
|
||||
ORDER BY name
|
||||
`
|
||||
|
||||
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
|
||||
if err != nil {
|
||||
migration.logger.WarnContext(ctx, "failed to query log duplicate keys", "error", err)
|
||||
return nil, nil
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var keys []string
|
||||
for rows.Next() {
|
||||
var key string
|
||||
if err := rows.Scan(&key); err != nil {
|
||||
migration.logger.WarnContext(ctx, "failed to scan log duplicate key", "error", err)
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) getTraceDuplicateKeys(ctx context.Context) ([]string, error) {
|
||||
query := `
|
||||
SELECT tagKey
|
||||
FROM signoz_traces.distributed_span_attributes_keys
|
||||
WHERE tagType IN ('tag', 'resource')
|
||||
GROUP BY tagKey
|
||||
HAVING COUNT(DISTINCT tagType) > 1
|
||||
ORDER BY tagKey
|
||||
`
|
||||
|
||||
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
|
||||
if err != nil {
|
||||
migration.logger.WarnContext(ctx, "failed to query trace duplicate keys", "error", err)
|
||||
return nil, nil
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var keys []string
|
||||
for rows.Next() {
|
||||
var key string
|
||||
if err := rows.Scan(&key); err != nil {
|
||||
migration.logger.WarnContext(ctx, "failed to scan trace duplicate key", "error", err)
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) Up(ctx context.Context, db *bun.DB) error {
|
||||
logsKeys, err := migration.getLogDuplicateKeys(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tracesKeys, err := migration.getTraceDuplicateKeys(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = tx.Rollback()
|
||||
}()
|
||||
|
||||
var rules []struct {
|
||||
ID string `bun:"id"`
|
||||
Data map[string]any `bun:"data"`
|
||||
}
|
||||
|
||||
err = tx.NewSelect().
|
||||
Table("rule").
|
||||
Column("id", "data").
|
||||
Scan(ctx, &rules)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
alertsMigrator := transition.NewAlertMigrateV5(migration.logger, logsKeys, tracesKeys)
|
||||
|
||||
count := 0
|
||||
|
||||
for _, rule := range rules {
|
||||
version, _ := rule.Data["version"].(string)
|
||||
|
||||
if version == "v5" {
|
||||
continue
|
||||
}
|
||||
|
||||
if version == "" {
|
||||
migration.logger.WarnContext(ctx, "unexpected empty version for rule", "rule_id", rule.ID)
|
||||
}
|
||||
|
||||
migration.logger.InfoContext(ctx, "migrating rule v4 to v5", "rule_id", rule.ID, "current_version", version)
|
||||
|
||||
// Check if the queries envelope already exists and is non-empty
|
||||
hasQueriesEnvelope := false
|
||||
if condition, ok := rule.Data["condition"].(map[string]any); ok {
|
||||
if compositeQuery, ok := condition["compositeQuery"].(map[string]any); ok {
|
||||
if queries, ok := compositeQuery["queries"].([]any); ok && len(queries) > 0 {
|
||||
hasQueriesEnvelope = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if hasQueriesEnvelope {
|
||||
// already has queries envelope, just bump version
|
||||
// this is because user made a mistake of choosing version
|
||||
migration.logger.InfoContext(ctx, "rule already has queries envelope, bumping version", "rule_id", rule.ID)
|
||||
rule.Data["version"] = "v5"
|
||||
} else {
|
||||
// old format, run full migration
|
||||
migration.logger.InfoContext(ctx, "rule has old format, running full migration", "rule_id", rule.ID)
|
||||
updated := alertsMigrator.Migrate(ctx, rule.Data)
|
||||
if !updated {
|
||||
migration.logger.WarnContext(ctx, "expected updated to be true but got false", "rule_id", rule.ID)
|
||||
continue
|
||||
}
|
||||
rule.Data["version"] = "v5"
|
||||
}
|
||||
|
||||
dataJSON, err := json.Marshal(rule.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.NewUpdate().
|
||||
Table("rule").
|
||||
Set("data = ?", string(dataJSON)).
|
||||
Where("id = ?", rule.ID).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
}
|
||||
if count != 0 {
|
||||
migration.logger.InfoContext(ctx, "migrate v4 alerts", "count", count)
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) Down(ctx context.Context, db *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
@@ -355,6 +355,10 @@ func (r *PostableRule) validate() error {
|
||||
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query is required"))
|
||||
}
|
||||
|
||||
if r.Version != "v5" {
|
||||
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "only version v5 is supported, got %q", r.Version))
|
||||
}
|
||||
|
||||
if isAllQueriesDisabled(r.RuleCondition.CompositeQuery) {
|
||||
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "all queries are disabled in rule condition"))
|
||||
}
|
||||
|
||||
@@ -108,6 +108,7 @@ func TestParseIntoRule(t *testing.T) {
|
||||
"ruleType": "threshold_rule",
|
||||
"evalWindow": "5m",
|
||||
"frequency": "1m",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -150,6 +151,7 @@ func TestParseIntoRule(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "DefaultsRule",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -187,6 +189,7 @@ func TestParseIntoRule(t *testing.T) {
|
||||
initRule: PostableRule{},
|
||||
content: []byte(`{
|
||||
"alert": "PromQLRule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "promql",
|
||||
@@ -256,6 +259,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "SeverityLabelTest",
|
||||
"schemaVersion": "v1",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -344,6 +348,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "NoLabelsTest",
|
||||
"schemaVersion": "v1",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -384,6 +389,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "OverwriteTest",
|
||||
"schemaVersion": "v1",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -474,6 +480,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "V2Test",
|
||||
"schemaVersion": "v2",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -517,6 +524,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
initRule: PostableRule{},
|
||||
content: []byte(`{
|
||||
"alert": "DefaultSchemaTest",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -569,6 +577,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
func TestParseIntoRuleThresholdGeneration(t *testing.T) {
|
||||
content := []byte(`{
|
||||
"alert": "TestThresholds",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -639,6 +648,7 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
|
||||
"schemaVersion": "v2",
|
||||
"alert": "MultiThresholdAlert",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -732,6 +742,7 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyBelowTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -766,6 +777,7 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyBelowTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -799,6 +811,7 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyAboveTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -833,6 +846,7 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyAboveTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -866,6 +880,7 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyBelowAllTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -901,6 +916,7 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyBelowAllTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -935,6 +951,7 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyOutOfBoundsTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -969,6 +986,7 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "ThresholdTest",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -1003,6 +1021,7 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "ThresholdTest",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
|
||||
Reference in New Issue
Block a user