mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-25 01:33:20 +00:00
Compare commits
1 Commits
move-pkg
...
ns/waterfa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6fed7df014 |
@@ -26,7 +26,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/formatter"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/formatter"
|
||||
|
||||
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
|
||||
|
||||
|
||||
@@ -308,15 +308,3 @@ export const PublicDashboardPage = Loadable(
|
||||
/* webpackChunkName: "Public Dashboard Page" */ 'pages/PublicDashboard'
|
||||
),
|
||||
);
|
||||
|
||||
export const AlertTypeSelectionPage = Loadable(
|
||||
() =>
|
||||
import(
|
||||
/* webpackChunkName: "Alert Type Selection Page" */ 'pages/AlertTypeSelection'
|
||||
),
|
||||
);
|
||||
|
||||
export const MeterExplorerPage = Loadable(
|
||||
() =>
|
||||
import(/* webpackChunkName: "Meter Explorer Page" */ 'pages/MeterExplorer'),
|
||||
);
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import { RouteProps } from 'react-router-dom';
|
||||
import ROUTES from 'constants/routes';
|
||||
import AlertTypeSelectionPage from 'pages/AlertTypeSelection';
|
||||
import MessagingQueues from 'pages/MessagingQueues';
|
||||
import MeterExplorer from 'pages/MeterExplorer';
|
||||
|
||||
import {
|
||||
AlertHistory,
|
||||
AlertOverview,
|
||||
AlertTypeSelectionPage,
|
||||
AllAlertChannels,
|
||||
AllErrors,
|
||||
ApiMonitoring,
|
||||
@@ -27,8 +29,6 @@ import {
|
||||
LogsExplorer,
|
||||
LogsIndexToFields,
|
||||
LogsSaveViews,
|
||||
MessagingQueuesMainPage,
|
||||
MeterExplorerPage,
|
||||
MetricsExplorer,
|
||||
OldLogsExplorer,
|
||||
Onboarding,
|
||||
@@ -399,28 +399,28 @@ const routes: AppRoutes[] = [
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_KAFKA,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_KAFKA',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_CELERY_TASK,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_CELERY_TASK',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_OVERVIEW,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_OVERVIEW',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.MESSAGING_QUEUES_KAFKA_DETAIL,
|
||||
exact: true,
|
||||
component: MessagingQueuesMainPage,
|
||||
component: MessagingQueues,
|
||||
key: 'MESSAGING_QUEUES_KAFKA_DETAIL',
|
||||
isPrivate: true,
|
||||
},
|
||||
@@ -463,21 +463,21 @@ const routes: AppRoutes[] = [
|
||||
{
|
||||
path: ROUTES.METER,
|
||||
exact: true,
|
||||
component: MeterExplorerPage,
|
||||
component: MeterExplorer,
|
||||
key: 'METER',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.METER_EXPLORER,
|
||||
exact: true,
|
||||
component: MeterExplorerPage,
|
||||
component: MeterExplorer,
|
||||
key: 'METER_EXPLORER',
|
||||
isPrivate: true,
|
||||
},
|
||||
{
|
||||
path: ROUTES.METER_EXPLORER_VIEWS,
|
||||
exact: true,
|
||||
component: MeterExplorerPage,
|
||||
component: MeterExplorer,
|
||||
key: 'METER_EXPLORER_VIEWS',
|
||||
isPrivate: true,
|
||||
},
|
||||
|
||||
@@ -1216,12 +1216,19 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
}
|
||||
|
||||
processingPostCache := time.Now()
|
||||
selectedSpansForRequest := tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans, startTime, endTime)
|
||||
zap.L().Info("getFlamegraphSpansForTrace: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID))
|
||||
selectedSpansForRequest := selectedSpans
|
||||
limit := min(req.Limit, tracedetail.MaxLimitWithoutSampling)
|
||||
totalSpanCount := tracedetail.GetTotalSpanCount(selectedSpans)
|
||||
if totalSpanCount > uint64(limit) {
|
||||
selectedSpansForRequest = tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans, startTime, endTime)
|
||||
}
|
||||
zap.L().Info("getFlamegraphSpansForTrace: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID),
|
||||
zap.Uint64("totalSpanCount", totalSpanCount))
|
||||
|
||||
trace.Spans = selectedSpansForRequest
|
||||
trace.StartTimestampMillis = startTime / 1000000
|
||||
trace.EndTimestampMillis = endTime / 1000000
|
||||
trace.HasMore = totalSpanCount > uint64(limit)
|
||||
return trace, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@ var (
|
||||
SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH float64 = 50
|
||||
SPAN_LIMIT_PER_LEVEL int = 100
|
||||
TIMESTAMP_SAMPLING_BUCKET_COUNT int = 50
|
||||
|
||||
MaxLimitWithoutSampling uint = 120_000
|
||||
)
|
||||
|
||||
func ContainsFlamegraphSpan(slice []*model.FlamegraphSpan, item *model.FlamegraphSpan) bool {
|
||||
@@ -184,3 +186,12 @@ func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpan
|
||||
|
||||
return selectedSpansForRequest
|
||||
}
|
||||
|
||||
func GetTotalSpanCount(spans [][]*model.FlamegraphSpan) uint64 {
|
||||
levelCount := len(spans)
|
||||
spanCount := uint64(0)
|
||||
for i := range levelCount {
|
||||
spanCount += uint64(len(spans[i]))
|
||||
}
|
||||
return spanCount
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package formatter
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/converter"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/converter"
|
||||
"github.com/dustin/go-humanize"
|
||||
)
|
||||
|
||||
@@ -3,7 +3,7 @@ package formatter
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/converter"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/converter"
|
||||
"github.com/dustin/go-humanize"
|
||||
)
|
||||
|
||||
@@ -78,8 +78,9 @@ func toFixed(value float64, decimals DecimalCount) string {
|
||||
}
|
||||
|
||||
decimalPos := strings.Index(formatted, ".")
|
||||
precision := 0
|
||||
if decimalPos != -1 {
|
||||
precision := len(formatted) - decimalPos - 1
|
||||
precision = len(formatted) - decimalPos - 1
|
||||
if precision < *decimals {
|
||||
return formatted + strings.Repeat("0", *decimals-precision)
|
||||
}
|
||||
@@ -88,8 +89,8 @@ func toFixed(value float64, decimals DecimalCount) string {
|
||||
return formatted
|
||||
}
|
||||
|
||||
func toFixedScaled(value float64, scaleFormat string) string {
|
||||
return toFixed(value, nil) + scaleFormat
|
||||
func toFixedScaled(value float64, decimals DecimalCount, scaleFormat string) string {
|
||||
return toFixed(value, decimals) + scaleFormat
|
||||
}
|
||||
|
||||
func getDecimalsForValue(value float64) int {
|
||||
@@ -13,35 +13,35 @@ func (*throughputFormatter) Name() string {
|
||||
return "throughput"
|
||||
}
|
||||
|
||||
func simpleCountUnit(value float64, symbol string) string {
|
||||
func simpleCountUnit(value float64, decimals *int, symbol string) string {
|
||||
units := []string{"", "K", "M", "B", "T"}
|
||||
scaler := scaledUnits(1000, units, 0)
|
||||
|
||||
return scaler(value, nil) + " " + symbol
|
||||
return scaler(value, decimals) + " " + symbol
|
||||
}
|
||||
|
||||
func (f *throughputFormatter) Format(value float64, unit string) string {
|
||||
switch unit {
|
||||
case "cps", "{count}/s":
|
||||
return simpleCountUnit(value, "c/s")
|
||||
return simpleCountUnit(value, nil, "c/s")
|
||||
case "ops", "{ops}/s":
|
||||
return simpleCountUnit(value, "op/s")
|
||||
return simpleCountUnit(value, nil, "op/s")
|
||||
case "reqps", "{req}/s":
|
||||
return simpleCountUnit(value, "req/s")
|
||||
return simpleCountUnit(value, nil, "req/s")
|
||||
case "rps", "{read}/s":
|
||||
return simpleCountUnit(value, "r/s")
|
||||
return simpleCountUnit(value, nil, "r/s")
|
||||
case "wps", "{write}/s":
|
||||
return simpleCountUnit(value, "w/s")
|
||||
return simpleCountUnit(value, nil, "w/s")
|
||||
case "iops", "{iops}/s":
|
||||
return simpleCountUnit(value, "iops")
|
||||
return simpleCountUnit(value, nil, "iops")
|
||||
case "cpm", "{count}/min":
|
||||
return simpleCountUnit(value, "c/m")
|
||||
return simpleCountUnit(value, nil, "c/m")
|
||||
case "opm", "{ops}/min":
|
||||
return simpleCountUnit(value, "op/m")
|
||||
return simpleCountUnit(value, nil, "op/m")
|
||||
case "rpm", "{read}/min":
|
||||
return simpleCountUnit(value, "r/m")
|
||||
return simpleCountUnit(value, nil, "r/m")
|
||||
case "wpm", "{write}/min":
|
||||
return simpleCountUnit(value, "w/m")
|
||||
return simpleCountUnit(value, nil, "w/m")
|
||||
}
|
||||
// When unit is not matched, return the value as it is.
|
||||
return fmt.Sprintf("%v", value)
|
||||
@@ -46,17 +46,17 @@ func toNanoSeconds(value float64) string {
|
||||
if absValue < 1000 {
|
||||
return toFixed(value, nil) + " ns"
|
||||
} else if absValue < 1000000 { // 2000 ns is better represented as 2 µs
|
||||
return toFixedScaled(value/1000, " µs")
|
||||
return toFixedScaled(value/1000, nil, " µs")
|
||||
} else if absValue < 1000000000 { // 2000000 ns is better represented as 2 ms
|
||||
return toFixedScaled(value/1000000, " ms")
|
||||
return toFixedScaled(value/1000000, nil, " ms")
|
||||
} else if absValue < 60000000000 {
|
||||
return toFixedScaled(value/1000000000, " s")
|
||||
return toFixedScaled(value/1000000000, nil, " s")
|
||||
} else if absValue < 3600000000000 {
|
||||
return toFixedScaled(value/60000000000, " min")
|
||||
return toFixedScaled(value/60000000000, nil, " min")
|
||||
} else if absValue < 86400000000000 {
|
||||
return toFixedScaled(value/3600000000000, " hour")
|
||||
return toFixedScaled(value/3600000000000, nil, " hour")
|
||||
} else {
|
||||
return toFixedScaled(value/86400000000000, " day")
|
||||
return toFixedScaled(value/86400000000000, nil, " day")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,9 +66,9 @@ func toMicroSeconds(value float64) string {
|
||||
if absValue < 1000 {
|
||||
return toFixed(value, nil) + " µs"
|
||||
} else if absValue < 1000000 { // 2000 µs is better represented as 2 ms
|
||||
return toFixedScaled(value/1000, " ms")
|
||||
return toFixedScaled(value/1000, nil, " ms")
|
||||
} else {
|
||||
return toFixedScaled(value/1000000, " s")
|
||||
return toFixedScaled(value/1000000, nil, " s")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,16 +80,16 @@ func toMilliSeconds(value float64) string {
|
||||
if absValue < 1000 {
|
||||
return toFixed(value, nil) + " ms"
|
||||
} else if absValue < 60000 {
|
||||
return toFixedScaled(value/1000, " s")
|
||||
return toFixedScaled(value/1000, nil, " s")
|
||||
} else if absValue < 3600000 {
|
||||
return toFixedScaled(value/60000, " min")
|
||||
return toFixedScaled(value/60000, nil, " min")
|
||||
} else if absValue < 86400000 { // 172800000 ms is better represented as 2 day
|
||||
return toFixedScaled(value/3600000, " hour")
|
||||
return toFixedScaled(value/3600000, nil, " hour")
|
||||
} else if absValue < 31536000000 {
|
||||
return toFixedScaled(value/86400000, " day")
|
||||
return toFixedScaled(value/86400000, nil, " day")
|
||||
}
|
||||
|
||||
return toFixedScaled(value/31536000000, " year")
|
||||
return toFixedScaled(value/31536000000, nil, " year")
|
||||
}
|
||||
|
||||
// toSeconds returns a easy to read string representation of the given value in seconds
|
||||
@@ -97,24 +97,24 @@ func toSeconds(value float64) string {
|
||||
absValue := math.Abs(value)
|
||||
|
||||
if absValue < 0.000001 {
|
||||
return toFixedScaled(value*1e9, " ns")
|
||||
return toFixedScaled(value*1e9, nil, " ns")
|
||||
} else if absValue < 0.001 {
|
||||
return toFixedScaled(value*1e6, " µs")
|
||||
return toFixedScaled(value*1e6, nil, " µs")
|
||||
} else if absValue < 1 {
|
||||
return toFixedScaled(value*1e3, " ms")
|
||||
return toFixedScaled(value*1e3, nil, " ms")
|
||||
} else if absValue < 60 {
|
||||
return toFixed(value, nil) + " s"
|
||||
} else if absValue < 3600 {
|
||||
return toFixedScaled(value/60, " min")
|
||||
return toFixedScaled(value/60, nil, " min")
|
||||
} else if absValue < 86400 { // 56000 s is better represented as 15.56 hour
|
||||
return toFixedScaled(value/3600, " hour")
|
||||
return toFixedScaled(value/3600, nil, " hour")
|
||||
} else if absValue < 604800 {
|
||||
return toFixedScaled(value/86400, " day")
|
||||
return toFixedScaled(value/86400, nil, " day")
|
||||
} else if absValue < 31536000 {
|
||||
return toFixedScaled(value/604800, " week")
|
||||
return toFixedScaled(value/604800, nil, " week")
|
||||
}
|
||||
|
||||
return toFixedScaled(value/3.15569e7, " year")
|
||||
return toFixedScaled(value/3.15569e7, nil, " year")
|
||||
}
|
||||
|
||||
// toMinutes returns a easy to read string representation of the given value in minutes
|
||||
@@ -124,13 +124,13 @@ func toMinutes(value float64) string {
|
||||
if absValue < 60 {
|
||||
return toFixed(value, nil) + " min"
|
||||
} else if absValue < 1440 {
|
||||
return toFixedScaled(value/60, " hour")
|
||||
return toFixedScaled(value/60, nil, " hour")
|
||||
} else if absValue < 10080 {
|
||||
return toFixedScaled(value/1440, " day")
|
||||
return toFixedScaled(value/1440, nil, " day")
|
||||
} else if absValue < 604800 {
|
||||
return toFixedScaled(value/10080, " week")
|
||||
return toFixedScaled(value/10080, nil, " week")
|
||||
} else {
|
||||
return toFixedScaled(value/5.25948e5, " year")
|
||||
return toFixedScaled(value/5.25948e5, nil, " year")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,11 +142,11 @@ func toHours(value float64) string {
|
||||
if absValue < 24 {
|
||||
return toFixed(value, nil) + " hour"
|
||||
} else if absValue < 168 {
|
||||
return toFixedScaled(value/24, " day")
|
||||
return toFixedScaled(value/24, nil, " day")
|
||||
} else if absValue < 8760 {
|
||||
return toFixedScaled(value/168, " week")
|
||||
return toFixedScaled(value/168, nil, " week")
|
||||
} else {
|
||||
return toFixedScaled(value/8760, " year")
|
||||
return toFixedScaled(value/8760, nil, " year")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,9 +157,9 @@ func toDays(value float64) string {
|
||||
if absValue < 7 {
|
||||
return toFixed(value, nil) + " day"
|
||||
} else if absValue < 365 {
|
||||
return toFixedScaled(value/7, " week")
|
||||
return toFixedScaled(value/7, nil, " week")
|
||||
} else {
|
||||
return toFixedScaled(value/365, " year")
|
||||
return toFixedScaled(value/365, nil, " year")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,6 +170,6 @@ func toWeeks(value float64) string {
|
||||
if absValue < 52 {
|
||||
return toFixed(value, nil) + " week"
|
||||
} else {
|
||||
return toFixedScaled(value/52, " year")
|
||||
return toFixedScaled(value/52, nil, " year")
|
||||
}
|
||||
}
|
||||
@@ -337,6 +337,7 @@ type GetWaterfallSpansForTraceWithMetadataParams struct {
|
||||
|
||||
type GetFlamegraphSpansForTraceParams struct {
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
Limit uint `json:"limit"`
|
||||
}
|
||||
|
||||
type SpanFilterParams struct {
|
||||
|
||||
@@ -336,6 +336,7 @@ type GetFlamegraphSpansForTraceResponse struct {
|
||||
EndTimestampMillis uint64 `json:"endTimestampMillis"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
Spans [][]*FlamegraphSpan `json:"spans"`
|
||||
HasMore bool `json:"hasMore"`
|
||||
}
|
||||
|
||||
type OtelSpanRef struct {
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/formatter"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/formatter"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
|
||||
@@ -33,7 +33,7 @@ import (
|
||||
|
||||
logsv3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3"
|
||||
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
|
||||
"github.com/SigNoz/signoz/pkg/formatter"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/formatter"
|
||||
|
||||
querierV5 "github.com/SigNoz/signoz/pkg/querier"
|
||||
|
||||
|
||||
@@ -169,7 +169,6 @@ func NewSQLMigrationProviderFactories(
|
||||
sqlmigration.NewAddAnonymousPublicDashboardTransactionFactory(sqlstore),
|
||||
sqlmigration.NewAddRootUserFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewAddUserEmailOrgIDIndexFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewMigrateRulesV4ToV5Factory(sqlstore, telemetryStore),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,209 +0,0 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/transition"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
type migrateRulesV4ToV5 struct {
|
||||
store sqlstore.SQLStore
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func NewMigrateRulesV4ToV5Factory(
|
||||
store sqlstore.SQLStore,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(
|
||||
factory.MustNewName("migrate_rules_post_deprecation"),
|
||||
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
|
||||
return &migrateRulesV4ToV5{
|
||||
store: store,
|
||||
telemetryStore: telemetryStore,
|
||||
logger: ps.Logger,
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) getLogDuplicateKeys(ctx context.Context) ([]string, error) {
|
||||
query := `
|
||||
SELECT name
|
||||
FROM (
|
||||
SELECT DISTINCT name FROM signoz_logs.distributed_logs_attribute_keys
|
||||
INTERSECT
|
||||
SELECT DISTINCT name FROM signoz_logs.distributed_logs_resource_keys
|
||||
)
|
||||
ORDER BY name
|
||||
`
|
||||
|
||||
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
|
||||
if err != nil {
|
||||
migration.logger.WarnContext(ctx, "failed to query log duplicate keys", "error", err)
|
||||
return nil, nil
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var keys []string
|
||||
for rows.Next() {
|
||||
var key string
|
||||
if err := rows.Scan(&key); err != nil {
|
||||
migration.logger.WarnContext(ctx, "failed to scan log duplicate key", "error", err)
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) getTraceDuplicateKeys(ctx context.Context) ([]string, error) {
|
||||
query := `
|
||||
SELECT tagKey
|
||||
FROM signoz_traces.distributed_span_attributes_keys
|
||||
WHERE tagType IN ('tag', 'resource')
|
||||
GROUP BY tagKey
|
||||
HAVING COUNT(DISTINCT tagType) > 1
|
||||
ORDER BY tagKey
|
||||
`
|
||||
|
||||
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
|
||||
if err != nil {
|
||||
migration.logger.WarnContext(ctx, "failed to query trace duplicate keys", "error", err)
|
||||
return nil, nil
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var keys []string
|
||||
for rows.Next() {
|
||||
var key string
|
||||
if err := rows.Scan(&key); err != nil {
|
||||
migration.logger.WarnContext(ctx, "failed to scan trace duplicate key", "error", err)
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) Up(ctx context.Context, db *bun.DB) error {
|
||||
logsKeys, err := migration.getLogDuplicateKeys(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tracesKeys, err := migration.getTraceDuplicateKeys(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = tx.Rollback()
|
||||
}()
|
||||
|
||||
var rules []struct {
|
||||
ID string `bun:"id"`
|
||||
Data map[string]any `bun:"data"`
|
||||
}
|
||||
|
||||
err = tx.NewSelect().
|
||||
Table("rule").
|
||||
Column("id", "data").
|
||||
Scan(ctx, &rules)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
alertsMigrator := transition.NewAlertMigrateV5(migration.logger, logsKeys, tracesKeys)
|
||||
|
||||
count := 0
|
||||
|
||||
for _, rule := range rules {
|
||||
version, _ := rule.Data["version"].(string)
|
||||
|
||||
if version == "v5" {
|
||||
continue
|
||||
}
|
||||
|
||||
if version == "" {
|
||||
migration.logger.WarnContext(ctx, "unexpected empty version for rule", "rule_id", rule.ID)
|
||||
}
|
||||
|
||||
migration.logger.InfoContext(ctx, "migrating rule v4 to v5", "rule_id", rule.ID, "current_version", version)
|
||||
|
||||
// Check if the queries envelope already exists and is non-empty
|
||||
hasQueriesEnvelope := false
|
||||
if condition, ok := rule.Data["condition"].(map[string]any); ok {
|
||||
if compositeQuery, ok := condition["compositeQuery"].(map[string]any); ok {
|
||||
if queries, ok := compositeQuery["queries"].([]any); ok && len(queries) > 0 {
|
||||
hasQueriesEnvelope = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if hasQueriesEnvelope {
|
||||
// already has queries envelope, just bump version
|
||||
// this is because user made a mistake of choosing version
|
||||
migration.logger.InfoContext(ctx, "rule already has queries envelope, bumping version", "rule_id", rule.ID)
|
||||
rule.Data["version"] = "v5"
|
||||
} else {
|
||||
// old format, run full migration
|
||||
migration.logger.InfoContext(ctx, "rule has old format, running full migration", "rule_id", rule.ID)
|
||||
updated := alertsMigrator.Migrate(ctx, rule.Data)
|
||||
if !updated {
|
||||
migration.logger.WarnContext(ctx, "expected updated to be true but got false", "rule_id", rule.ID)
|
||||
continue
|
||||
}
|
||||
rule.Data["version"] = "v5"
|
||||
}
|
||||
|
||||
dataJSON, err := json.Marshal(rule.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.NewUpdate().
|
||||
Table("rule").
|
||||
Set("data = ?", string(dataJSON)).
|
||||
Where("id = ?", rule.ID).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
}
|
||||
if count != 0 {
|
||||
migration.logger.InfoContext(ctx, "migrate v4 alerts", "count", count)
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (migration *migrateRulesV4ToV5) Down(ctx context.Context, db *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
@@ -355,10 +355,6 @@ func (r *PostableRule) validate() error {
|
||||
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query is required"))
|
||||
}
|
||||
|
||||
if r.Version != "v5" {
|
||||
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "only version v5 is supported, got %q", r.Version))
|
||||
}
|
||||
|
||||
if isAllQueriesDisabled(r.RuleCondition.CompositeQuery) {
|
||||
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "all queries are disabled in rule condition"))
|
||||
}
|
||||
|
||||
@@ -108,7 +108,6 @@ func TestParseIntoRule(t *testing.T) {
|
||||
"ruleType": "threshold_rule",
|
||||
"evalWindow": "5m",
|
||||
"frequency": "1m",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -151,7 +150,6 @@ func TestParseIntoRule(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "DefaultsRule",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -189,7 +187,6 @@ func TestParseIntoRule(t *testing.T) {
|
||||
initRule: PostableRule{},
|
||||
content: []byte(`{
|
||||
"alert": "PromQLRule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "promql",
|
||||
@@ -259,7 +256,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "SeverityLabelTest",
|
||||
"schemaVersion": "v1",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -348,7 +344,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "NoLabelsTest",
|
||||
"schemaVersion": "v1",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -389,7 +384,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "OverwriteTest",
|
||||
"schemaVersion": "v1",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -480,7 +474,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
content: []byte(`{
|
||||
"alert": "V2Test",
|
||||
"schemaVersion": "v2",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -524,7 +517,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
initRule: PostableRule{},
|
||||
content: []byte(`{
|
||||
"alert": "DefaultSchemaTest",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -577,7 +569,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
|
||||
func TestParseIntoRuleThresholdGeneration(t *testing.T) {
|
||||
content := []byte(`{
|
||||
"alert": "TestThresholds",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -648,7 +639,6 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
|
||||
"schemaVersion": "v2",
|
||||
"alert": "MultiThresholdAlert",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -742,7 +732,6 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyBelowTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -777,7 +766,6 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyBelowTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -811,7 +799,6 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyAboveTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -846,7 +833,6 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyAboveTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -880,7 +866,6 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyBelowAllTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -916,7 +901,6 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyBelowAllTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -951,7 +935,6 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "AnomalyOutOfBoundsTest",
|
||||
"ruleType": "anomaly_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -986,7 +969,6 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "ThresholdTest",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
@@ -1021,7 +1003,6 @@ func TestAnomalyNegationEval(t *testing.T) {
|
||||
ruleJSON: []byte(`{
|
||||
"alert": "ThresholdTest",
|
||||
"ruleType": "threshold_rule",
|
||||
"version": "v5",
|
||||
"condition": {
|
||||
"compositeQuery": {
|
||||
"queryType": "builder",
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/converter"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/converter"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
|
||||
Reference in New Issue
Block a user