Compare commits

..

1 Commits

Author SHA1 Message Date
Nikhil Soni
6fed7df014 feat: return all spans for flamegraph under a limit 2026-02-25 00:14:37 +05:30
12 changed files with 40 additions and 277 deletions

View File

@@ -82,12 +82,6 @@ exporters:
timeout: 45s
sending_queue:
enabled: false
metadataexporter:
cache:
provider: in_memory
dsn: tcp://clickhouse:9000/signoz_metadata
enabled: true
timeout: 45s
service:
telemetry:
logs:
@@ -99,19 +93,19 @@ service:
traces:
receivers: [otlp]
processors: [signozspanmetrics/delta, batch]
exporters: [clickhousetraces, metadataexporter, signozmeter]
exporters: [clickhousetraces, signozmeter]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
exporters: [signozclickhousemetrics, signozmeter]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
exporters: [signozclickhousemetrics, signozmeter]
logs:
receivers: [otlp]
processors: [batch]
exporters: [clickhouselogsexporter, metadataexporter, signozmeter]
exporters: [clickhouselogsexporter, signozmeter]
metrics/meter:
receivers: [signozmeter]
processors: [batch/meter]

View File

@@ -82,12 +82,6 @@ exporters:
timeout: 45s
sending_queue:
enabled: false
metadataexporter:
cache:
provider: in_memory
dsn: tcp://clickhouse:9000/signoz_metadata
enabled: true
timeout: 45s
service:
telemetry:
logs:
@@ -99,19 +93,19 @@ service:
traces:
receivers: [otlp]
processors: [signozspanmetrics/delta, batch]
exporters: [clickhousetraces, metadataexporter, signozmeter]
exporters: [clickhousetraces, signozmeter]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
exporters: [signozclickhousemetrics, signozmeter]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
exporters: [signozclickhousemetrics, signozmeter]
logs:
receivers: [otlp]
processors: [batch]
exporters: [clickhouselogsexporter, metadataexporter, signozmeter]
exporters: [clickhouselogsexporter, signozmeter]
metrics/meter:
receivers: [signozmeter]
processors: [batch/meter]

View File

@@ -308,15 +308,3 @@ export const PublicDashboardPage = Loadable(
/* webpackChunkName: "Public Dashboard Page" */ 'pages/PublicDashboard'
),
);
export const AlertTypeSelectionPage = Loadable(
() =>
import(
/* webpackChunkName: "Alert Type Selection Page" */ 'pages/AlertTypeSelection'
),
);
export const MeterExplorerPage = Loadable(
() =>
import(/* webpackChunkName: "Meter Explorer Page" */ 'pages/MeterExplorer'),
);

View File

@@ -1,10 +1,12 @@
import { RouteProps } from 'react-router-dom';
import ROUTES from 'constants/routes';
import AlertTypeSelectionPage from 'pages/AlertTypeSelection';
import MessagingQueues from 'pages/MessagingQueues';
import MeterExplorer from 'pages/MeterExplorer';
import {
AlertHistory,
AlertOverview,
AlertTypeSelectionPage,
AllAlertChannels,
AllErrors,
ApiMonitoring,
@@ -27,8 +29,6 @@ import {
LogsExplorer,
LogsIndexToFields,
LogsSaveViews,
MessagingQueuesMainPage,
MeterExplorerPage,
MetricsExplorer,
OldLogsExplorer,
Onboarding,
@@ -399,28 +399,28 @@ const routes: AppRoutes[] = [
{
path: ROUTES.MESSAGING_QUEUES_KAFKA,
exact: true,
component: MessagingQueuesMainPage,
component: MessagingQueues,
key: 'MESSAGING_QUEUES_KAFKA',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_CELERY_TASK,
exact: true,
component: MessagingQueuesMainPage,
component: MessagingQueues,
key: 'MESSAGING_QUEUES_CELERY_TASK',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_OVERVIEW,
exact: true,
component: MessagingQueuesMainPage,
component: MessagingQueues,
key: 'MESSAGING_QUEUES_OVERVIEW',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_KAFKA_DETAIL,
exact: true,
component: MessagingQueuesMainPage,
component: MessagingQueues,
key: 'MESSAGING_QUEUES_KAFKA_DETAIL',
isPrivate: true,
},
@@ -463,21 +463,21 @@ const routes: AppRoutes[] = [
{
path: ROUTES.METER,
exact: true,
component: MeterExplorerPage,
component: MeterExplorer,
key: 'METER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER,
exact: true,
component: MeterExplorerPage,
component: MeterExplorer,
key: 'METER_EXPLORER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER_VIEWS,
exact: true,
component: MeterExplorerPage,
component: MeterExplorer,
key: 'METER_EXPLORER_VIEWS',
isPrivate: true,
},

View File

@@ -1216,12 +1216,19 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
}
processingPostCache := time.Now()
selectedSpansForRequest := tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans, startTime, endTime)
zap.L().Info("getFlamegraphSpansForTrace: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID))
selectedSpansForRequest := selectedSpans
limit := min(req.Limit, tracedetail.MaxLimitWithoutSampling)
totalSpanCount := tracedetail.GetTotalSpanCount(selectedSpans)
if totalSpanCount > uint64(limit) {
selectedSpansForRequest = tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans, startTime, endTime)
}
zap.L().Info("getFlamegraphSpansForTrace: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID),
zap.Uint64("totalSpanCount", totalSpanCount))
trace.Spans = selectedSpansForRequest
trace.StartTimestampMillis = startTime / 1000000
trace.EndTimestampMillis = endTime / 1000000
trace.HasMore = totalSpanCount > uint64(limit)
return trace, nil
}

View File

@@ -10,6 +10,8 @@ var (
SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH float64 = 50
SPAN_LIMIT_PER_LEVEL int = 100
TIMESTAMP_SAMPLING_BUCKET_COUNT int = 50
MaxLimitWithoutSampling uint = 120_000
)
func ContainsFlamegraphSpan(slice []*model.FlamegraphSpan, item *model.FlamegraphSpan) bool {
@@ -184,3 +186,12 @@ func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpan
return selectedSpansForRequest
}
func GetTotalSpanCount(spans [][]*model.FlamegraphSpan) uint64 {
levelCount := len(spans)
spanCount := uint64(0)
for i := range levelCount {
spanCount += uint64(len(spans[i]))
}
return spanCount
}

View File

@@ -337,6 +337,7 @@ type GetWaterfallSpansForTraceWithMetadataParams struct {
type GetFlamegraphSpansForTraceParams struct {
SelectedSpanID string `json:"selectedSpanId"`
Limit uint `json:"limit"`
}
type SpanFilterParams struct {

View File

@@ -336,6 +336,7 @@ type GetFlamegraphSpansForTraceResponse struct {
EndTimestampMillis uint64 `json:"endTimestampMillis"`
DurationNano uint64 `json:"durationNano"`
Spans [][]*FlamegraphSpan `json:"spans"`
HasMore bool `json:"hasMore"`
}
type OtelSpanRef struct {

View File

@@ -169,7 +169,6 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewAddAnonymousPublicDashboardTransactionFactory(sqlstore),
sqlmigration.NewAddRootUserFactory(sqlstore, sqlschema),
sqlmigration.NewAddUserEmailOrgIDIndexFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateRulesV4ToV5Factory(sqlstore, telemetryStore),
)
}

View File

@@ -1,209 +0,0 @@
package sqlmigration
import (
"context"
"database/sql"
"encoding/json"
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type migrateRulesV4ToV5 struct {
store sqlstore.SQLStore
telemetryStore telemetrystore.TelemetryStore
logger *slog.Logger
}
func NewMigrateRulesV4ToV5Factory(
store sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(
factory.MustNewName("migrate_rules_post_deprecation"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &migrateRulesV4ToV5{
store: store,
telemetryStore: telemetryStore,
logger: ps.Logger,
}, nil
})
}
func (migration *migrateRulesV4ToV5) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *migrateRulesV4ToV5) getLogDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT name
FROM (
SELECT DISTINCT name FROM signoz_logs.distributed_logs_attribute_keys
INTERSECT
SELECT DISTINCT name FROM signoz_logs.distributed_logs_resource_keys
)
ORDER BY name
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
migration.logger.WarnContext(ctx, "failed to query log duplicate keys", "error", err)
return nil, nil
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
migration.logger.WarnContext(ctx, "failed to scan log duplicate key", "error", err)
continue
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *migrateRulesV4ToV5) getTraceDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT tagKey
FROM signoz_traces.distributed_span_attributes_keys
WHERE tagType IN ('tag', 'resource')
GROUP BY tagKey
HAVING COUNT(DISTINCT tagType) > 1
ORDER BY tagKey
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
migration.logger.WarnContext(ctx, "failed to query trace duplicate keys", "error", err)
return nil, nil
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
migration.logger.WarnContext(ctx, "failed to scan trace duplicate key", "error", err)
continue
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *migrateRulesV4ToV5) Up(ctx context.Context, db *bun.DB) error {
logsKeys, err := migration.getLogDuplicateKeys(ctx)
if err != nil {
return err
}
tracesKeys, err := migration.getTraceDuplicateKeys(ctx)
if err != nil {
return err
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
var rules []struct {
ID string `bun:"id"`
Data map[string]any `bun:"data"`
}
err = tx.NewSelect().
Table("rule").
Column("id", "data").
Scan(ctx, &rules)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
alertsMigrator := transition.NewAlertMigrateV5(migration.logger, logsKeys, tracesKeys)
count := 0
for _, rule := range rules {
version, _ := rule.Data["version"].(string)
if version == "v5" {
continue
}
if version == "" {
migration.logger.WarnContext(ctx, "unexpected empty version for rule", "rule_id", rule.ID)
}
migration.logger.InfoContext(ctx, "migrating rule v4 to v5", "rule_id", rule.ID, "current_version", version)
// Check if the queries envelope already exists and is non-empty
hasQueriesEnvelope := false
if condition, ok := rule.Data["condition"].(map[string]any); ok {
if compositeQuery, ok := condition["compositeQuery"].(map[string]any); ok {
if queries, ok := compositeQuery["queries"].([]any); ok && len(queries) > 0 {
hasQueriesEnvelope = true
}
}
}
if hasQueriesEnvelope {
// already has queries envelope, just bump version
// this is because user made a mistake of choosing version
migration.logger.InfoContext(ctx, "rule already has queries envelope, bumping version", "rule_id", rule.ID)
rule.Data["version"] = "v5"
} else {
// old format, run full migration
migration.logger.InfoContext(ctx, "rule has old format, running full migration", "rule_id", rule.ID)
updated := alertsMigrator.Migrate(ctx, rule.Data)
if !updated {
migration.logger.WarnContext(ctx, "expected updated to be true but got false", "rule_id", rule.ID)
continue
}
rule.Data["version"] = "v5"
}
dataJSON, err := json.Marshal(rule.Data)
if err != nil {
return err
}
_, err = tx.NewUpdate().
Table("rule").
Set("data = ?", string(dataJSON)).
Where("id = ?", rule.ID).
Exec(ctx)
if err != nil {
return err
}
count++
}
if count != 0 {
migration.logger.InfoContext(ctx, "migrate v4 alerts", "count", count)
}
return tx.Commit()
}
func (migration *migrateRulesV4ToV5) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@@ -355,10 +355,6 @@ func (r *PostableRule) validate() error {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query is required"))
}
if r.Version != "v5" {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "only version v5 is supported, got %q", r.Version))
}
if isAllQueriesDisabled(r.RuleCondition.CompositeQuery) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "all queries are disabled in rule condition"))
}

View File

@@ -108,7 +108,6 @@ func TestParseIntoRule(t *testing.T) {
"ruleType": "threshold_rule",
"evalWindow": "5m",
"frequency": "1m",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -151,7 +150,6 @@ func TestParseIntoRule(t *testing.T) {
content: []byte(`{
"alert": "DefaultsRule",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -189,7 +187,6 @@ func TestParseIntoRule(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "PromQLRule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "promql",
@@ -259,7 +256,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "SeverityLabelTest",
"schemaVersion": "v1",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -348,7 +344,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "NoLabelsTest",
"schemaVersion": "v1",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -389,7 +384,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "OverwriteTest",
"schemaVersion": "v1",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -480,7 +474,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "V2Test",
"schemaVersion": "v2",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -524,7 +517,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "DefaultSchemaTest",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -577,7 +569,6 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
func TestParseIntoRuleThresholdGeneration(t *testing.T) {
content := []byte(`{
"alert": "TestThresholds",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -648,7 +639,6 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
"schemaVersion": "v2",
"alert": "MultiThresholdAlert",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -742,7 +732,6 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -777,7 +766,6 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -811,7 +799,6 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyAboveTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -846,7 +833,6 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyAboveTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -880,7 +866,6 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowAllTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -916,7 +901,6 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowAllTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -951,7 +935,6 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyOutOfBoundsTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -986,7 +969,6 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "ThresholdTest",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -1021,7 +1003,6 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "ThresholdTest",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",