Compare commits

..

7 Commits

Author SHA1 Message Date
nityanandagohain
04824cf2f2 fix: lint 2026-05-11 22:06:29 +05:30
Nityananda Gohain
384c649ef8 Merge branch 'main' into issue_4522 2026-05-11 22:03:43 +05:30
nityanandagohain
68693f8ffd fix: more updated 2026-05-11 22:03:05 +05:30
nityanandagohain
ca1f92f474 fix: get keys after modifying the selectkeys 2026-05-11 21:19:33 +05:30
nityanandagohain
1ed3d8fc8c fix: minor changes 2026-05-11 20:25:05 +05:30
nityanandagohain
196aa301c4 Merge remote-tracking branch 'origin/main' into issue_4522 2026-05-11 15:06:05 +05:30
nityanandagohain
51fcc22d8a feat: [traces] time aware dynamic field mapper 2026-05-08 18:11:15 +05:30
71 changed files with 1592 additions and 2616 deletions

View File

@@ -18,19 +18,16 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/gateway"
"github.com/SigNoz/signoz/pkg/gateway/noopgateway"
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/retention"
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
"github.com/SigNoz/signoz/pkg/prometheus"
@@ -112,9 +109,6 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
func(_ licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
return signoz.NewAuditorProviderFactories()
},
func(_ context.Context, _ factory.ProviderSettings, _ flagger.Flagger, _ licensing.Licensing, _ telemetrystore.TelemetryStore, _ retention.Getter, _ organization.Getter, _ zeus.Zeus) (factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]], string) {
return signoz.NewMeterReporterProviderFactories(), "noop"
},
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
return querier.NewHandler(ps, q, a)
},

View File

@@ -1,55 +0,0 @@
package main
import (
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/types/zeustypes"
)
var meterConfigs = []metercollector.Config{
{
Provider: metercollector.ProviderStatic,
Static: metercollector.StaticConfig{
Name: zeustypes.MeterPlatformActive,
Unit: zeustypes.MeterUnitCount,
Aggregation: zeustypes.MeterAggregationMax,
Value: 1,
},
},
{
Provider: metercollector.ProviderTelemetry,
Telemetry: metercollector.TelemetryConfig{
Name: zeustypes.MeterLogSize,
Unit: zeustypes.MeterUnitBytes,
Aggregation: zeustypes.MeterAggregationSum,
DBName: telemetrylogs.DBName,
TableName: telemetrylogs.LogsV2LocalTableName,
DefaultRetentionDays: retentiontypes.DefaultLogsRetentionDays,
},
},
{
Provider: metercollector.ProviderTelemetry,
Telemetry: metercollector.TelemetryConfig{
Name: zeustypes.MeterSpanSize,
Unit: zeustypes.MeterUnitBytes,
Aggregation: zeustypes.MeterAggregationSum,
DBName: telemetrytraces.DBName,
TableName: telemetrytraces.SpanIndexV3LocalTableName,
DefaultRetentionDays: retentiontypes.DefaultTracesRetentionDays,
},
},
{
Provider: metercollector.ProviderTelemetry,
Telemetry: metercollector.TelemetryConfig{
Name: zeustypes.MeterDatapointCount,
Unit: zeustypes.MeterUnitCount,
Aggregation: zeustypes.MeterAggregationSum,
DBName: telemetrymetrics.DBName,
TableName: telemetrymetrics.SamplesV4LocalTableName,
DefaultRetentionDays: retentiontypes.DefaultMetricsRetentionDays,
},
},
}

View File

@@ -18,9 +18,6 @@ import (
"github.com/SigNoz/signoz/ee/gateway/httpgateway"
enterpriselicensing "github.com/SigNoz/signoz/ee/licensing"
"github.com/SigNoz/signoz/ee/licensing/httplicensing"
"github.com/SigNoz/signoz/ee/metercollector/staticmetercollector"
"github.com/SigNoz/signoz/ee/metercollector/telemetrymetercollector"
"github.com/SigNoz/signoz/ee/meterreporter/httpmeterreporter"
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration/implcloudprovider"
"github.com/SigNoz/signoz/ee/modules/dashboard/impldashboard"
@@ -39,17 +36,14 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
pkgflagger "github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/gateway"
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
pkgcloudintegration "github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
pkgimpldashboard "github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/retention"
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
"github.com/SigNoz/signoz/pkg/prometheus"
@@ -167,20 +161,6 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
}
return factories
},
func(ctx context.Context, providerSettings factory.ProviderSettings, flagger pkgflagger.Flagger, licensing licensing.Licensing, telemetryStore telemetrystore.TelemetryStore, retentionGetter retention.Getter, orgGetter organization.Getter, zeus zeus.Zeus) (factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]], string) {
factories := signoz.NewMeterReporterProviderFactories()
collectorFactories := factory.MustNewNamedMap(
staticmetercollector.NewFactory(),
telemetrymetercollector.NewFactory(telemetryStore, retentionGetter),
)
if err := factories.Add(httpmeterreporter.NewFactory(collectorFactories, meterConfigs, flagger, licensing, orgGetter, zeus)); err != nil {
panic(err)
}
return factories, "http"
},
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
communityHandler := querier.NewHandler(ps, q, a)
return eequerier.NewHandler(ps, q, communityHandler)

View File

@@ -429,10 +429,3 @@ authz:
openfga:
# maximum tuples allowed per openfga write operation.
max_tuples_per_write: 300
##################### Meter Reporter #####################
meterreporter:
# The interval between collection ticks. Minimum 5m.
interval: 6h
# Whether to backfill sealed days from the license creation day.
backfill: true

View File

@@ -1,61 +0,0 @@
// Package staticmetercollector emits a fixed-value meter reading per org per window.
package staticmetercollector
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/types/zeustypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var _ metercollector.MeterCollector = (*Provider)(nil)
type Provider struct {
settings factory.ScopedProviderSettings
config metercollector.StaticConfig
}
func NewFactory() factory.ProviderFactory[metercollector.MeterCollector, metercollector.Config] {
return factory.NewProviderFactory(factory.MustNewName(metercollector.ProviderStatic), func(ctx context.Context, providerSettings factory.ProviderSettings, config metercollector.Config) (metercollector.MeterCollector, error) {
return newProvider(providerSettings, config.Static), nil
},
)
}
func newProvider(providerSettings factory.ProviderSettings, config metercollector.StaticConfig) *Provider {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/metercollector/staticmetercollector")
return &Provider{
settings: settings,
config: config,
}
}
func (provider *Provider) Name() zeustypes.MeterName { return provider.config.Name }
func (provider *Provider) Unit() zeustypes.MeterUnit { return provider.config.Unit }
func (provider *Provider) Aggregation() zeustypes.MeterAggregation {
return provider.config.Aggregation
}
func (provider *Provider) Origin(_ context.Context, _ valuer.UUID, license *licensetypes.License, _ time.Time) (time.Time, error) {
if license == nil || license.CreatedAt.IsZero() {
return time.Time{}, nil
}
createdAt := license.CreatedAt.UTC()
return time.Date(createdAt.Year(), createdAt.Month(), createdAt.Day(), 0, 0, 0, 0, time.UTC), nil
}
func (provider *Provider) Collect(_ context.Context, orgID valuer.UUID, license *licensetypes.License, window zeustypes.MeterWindow) ([]zeustypes.Meter, error) {
if license == nil || license.Key == "" {
return nil, nil
}
return []zeustypes.Meter{
zeustypes.NewMeter(provider.config.Name, provider.config.Value, provider.config.Unit, provider.config.Aggregation, window, zeustypes.NewDimensions(zeustypes.OrganizationID.String(orgID.StringValue()))),
}, nil
}

View File

@@ -1,247 +0,0 @@
// Package telemetrymetercollector collects telemetry meters (logs, traces, metrics)
// by retention. One Provider materializes per TelemetryConfig.
package telemetrymetercollector
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"time"
"github.com/huandu/go-sqlbuilder"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/modules/retention"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/types/zeustypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
labelKeyPattern = regexp.MustCompile(`^[A-Za-z0-9_.\-]+$`)
labelValuePattern = regexp.MustCompile(`^[A-Za-z0-9_.\-:]+$`)
)
var _ metercollector.MeterCollector = (*Provider)(nil)
type Provider struct {
settings factory.ScopedProviderSettings
config metercollector.TelemetryConfig
telemetryStore telemetrystore.TelemetryStore
retentionGetter retention.Getter
}
func NewFactory(telemetryStore telemetrystore.TelemetryStore, retentionGetter retention.Getter) factory.ProviderFactory[metercollector.MeterCollector, metercollector.Config] {
return factory.NewProviderFactory(factory.MustNewName(metercollector.ProviderTelemetry), func(ctx context.Context, providerSettings factory.ProviderSettings, config metercollector.Config) (metercollector.MeterCollector, error) {
return newProvider(providerSettings, config.Telemetry, telemetryStore, retentionGetter), nil
},
)
}
func newProvider(
providerSettings factory.ProviderSettings,
config metercollector.TelemetryConfig,
telemetryStore telemetrystore.TelemetryStore,
retentionGetter retention.Getter,
) *Provider {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/metercollector/telemetrymetercollector")
return &Provider{
settings: settings,
config: config,
telemetryStore: telemetryStore,
retentionGetter: retentionGetter,
}
}
func (provider *Provider) Name() zeustypes.MeterName { return provider.config.Name }
func (provider *Provider) Unit() zeustypes.MeterUnit { return provider.config.Unit }
func (provider *Provider) Aggregation() zeustypes.MeterAggregation {
return provider.config.Aggregation
}
func (provider *Provider) Origin(ctx context.Context, _ valuer.UUID, _ *licensetypes.License, todayStart time.Time) (time.Time, error) {
query, args := buildOriginQuery(provider.config.Name.String())
var minMs int64
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&minMs); err != nil {
return time.Time{}, err
}
if minMs == 0 {
return todayStart, nil
}
minDay := time.UnixMilli(minMs).UTC()
return time.Date(minDay.Year(), minDay.Month(), minDay.Day(), 0, 0, 0, 0, time.UTC), nil
}
func (provider *Provider) Collect(
ctx context.Context,
orgID valuer.UUID,
_ *licensetypes.License,
window zeustypes.MeterWindow,
) ([]zeustypes.Meter, error) {
meterName := provider.config.Name.String()
segments, err := provider.retentionGetter.GetRetentionPolicySegments(
ctx,
orgID,
provider.config.DBName,
provider.config.TableName,
provider.config.DefaultRetentionDays,
window.StartUnixMilli,
window.EndUnixMilli,
)
if err != nil {
return nil, err
}
valuesByRetentionDays := make(map[int]int64)
for _, segment := range segments {
query, args, err := buildQuery(meterName, segment)
if err != nil {
return nil, err
}
rows, err := provider.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, err
}
if err := func() error {
defer rows.Close()
for rows.Next() {
var retentionDays int32
var value int64
if err := rows.Scan(&retentionDays, &value); err != nil {
return err
}
valuesByRetentionDays[int(retentionDays)] += value
}
if err := rows.Err(); err != nil {
return err
}
return nil
}(); err != nil {
return nil, err
}
}
meters := make([]zeustypes.Meter, 0, len(valuesByRetentionDays))
for retentionDays, value := range valuesByRetentionDays {
meters = append(meters, zeustypes.NewMeter(provider.config.Name, value, provider.config.Unit, provider.config.Aggregation, window, buildDimensions(orgID, retentionDays)))
}
// Empty windows still emit a sentinel so checkpoints can advance.
if len(meters) == 0 && len(segments) > 0 {
meters = append(meters, zeustypes.NewMeter(provider.config.Name, 0, provider.config.Unit, provider.config.Aggregation, window, buildDimensions(orgID, segments[len(segments)-1].DefaultDays)))
}
return meters, nil
}
func buildOriginQuery(meterName string) (string, []any) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("toInt64(ifNull(min(unix_milli), 0))")
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(sb.Equal("metric_name", meterName))
return sb.BuildWithFlavor(sqlbuilder.ClickHouse)
}
func buildQuery(meterName string, segment *retentiontypes.RetentionPolicySegment) (string, []any, error) {
retentionExpr, err := buildRetentionMultiIfSQL(segment.Rules, segment.DefaultDays)
if err != nil {
return "", nil, err
}
selects := []string{
retentionExpr + " AS retention_days",
"toInt64(ifNull(sum(value), 0)) AS value",
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select(selects...)
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", meterName),
sb.GTE("unix_milli", segment.StartMs),
sb.LT("unix_milli", segment.EndMs),
)
sb.GroupBy("retention_days")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return query, args, nil
}
func buildRetentionMultiIfSQL(rules []retentiontypes.CustomRetentionRule, defaultDays int) (string, error) {
if defaultDays <= 0 {
return "", errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeMeterCollectorInvalidCustomRetentionRule, "non-positive default retention %d", defaultDays)
}
if len(rules) == 0 {
return "toInt32(" + strconv.Itoa(defaultDays) + ")", nil
}
arms := make([]string, 0, 2*len(rules)+1)
for ruleIndex, rule := range rules {
if rule.TTLDays <= 0 {
return "", errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeMeterCollectorInvalidCustomRetentionRule, "rule %d has non-positive ttl_days %d", ruleIndex, rule.TTLDays)
}
conditionExpr, err := buildRuleConditionSQL(ruleIndex, rule)
if err != nil {
return "", err
}
arms = append(arms, conditionExpr)
arms = append(arms, strconv.Itoa(rule.TTLDays))
}
arms = append(arms, strconv.Itoa(defaultDays))
return "toInt32(multiIf(" + strings.Join(arms, ", ") + "))", nil
}
func buildRuleConditionSQL(ruleIndex int, rule retentiontypes.CustomRetentionRule) (string, error) {
if len(rule.Filters) == 0 {
return "", errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeMeterCollectorInvalidCustomRetentionRule, "rule %d has no filters", ruleIndex)
}
filterExprs := make([]string, 0, len(rule.Filters))
for filterIndex, filter := range rule.Filters {
if !labelKeyPattern.MatchString(filter.Key) {
return "", errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeMeterCollectorInvalidCustomRetentionRule, "rule %d filter %d has invalid key %q", ruleIndex, filterIndex, filter.Key)
}
if len(filter.Values) == 0 {
return "", errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeMeterCollectorInvalidCustomRetentionRule, "rule %d filter %d has no values", ruleIndex, filterIndex)
}
quoted := make([]string, len(filter.Values))
for valueIndex, value := range filter.Values {
if !labelValuePattern.MatchString(value) {
return "", errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeMeterCollectorInvalidCustomRetentionRule, "rule %d filter %d value %d is invalid %q", ruleIndex, filterIndex, valueIndex, value)
}
quoted[valueIndex] = "'" + value + "'"
}
filterExprs = append(filterExprs, fmt.Sprintf("JSONExtractString(labels, '%s') IN (%s)", filter.Key, strings.Join(quoted, ", ")))
}
return strings.Join(filterExprs, " AND "), nil
}
func buildDimensions(orgID valuer.UUID, retentionDays int) map[string]string {
retentionDurationSeconds := int64(retentionDays) * 24 * 60 * 60 // seconds
return zeustypes.NewDimensions(
zeustypes.OrganizationID.String(orgID.StringValue()),
zeustypes.RetentionDuration.String(strconv.FormatInt(retentionDurationSeconds, 10)),
)
}

View File

@@ -1,318 +0,0 @@
package httpmeterreporter
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/types/zeustypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/zeus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)
var _ factory.ServiceWithHealthy = (*Provider)(nil)
type Provider struct {
settings factory.ScopedProviderSettings
config meterreporter.Config
collectorsByName map[zeustypes.MeterName]metercollector.MeterCollector
flagger flagger.Flagger
licensing licensing.Licensing
orgGetter organization.Getter
zeus zeus.Zeus
healthyC chan struct{}
stopC chan struct{}
metrics *reporterMetrics
}
func NewFactory(collectorFactories factory.NamedMap[factory.ProviderFactory[metercollector.MeterCollector, metercollector.Config]], collectorConfigs []metercollector.Config, flagger flagger.Flagger, licensing licensing.Licensing, orgGetter organization.Getter, zeus zeus.Zeus) factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
return factory.NewProviderFactory(factory.MustNewName("http"), func(ctx context.Context, providerSettings factory.ProviderSettings, config meterreporter.Config) (meterreporter.Reporter, error) {
return newProvider(ctx, providerSettings, config, collectorFactories, collectorConfigs, flagger, licensing, orgGetter, zeus)
},
)
}
func newProvider(
ctx context.Context,
providerSettings factory.ProviderSettings,
config meterreporter.Config,
collectorFactories factory.NamedMap[factory.ProviderFactory[metercollector.MeterCollector, metercollector.Config]],
collectorConfigs []metercollector.Config,
flagger flagger.Flagger,
licensing licensing.Licensing,
orgGetter organization.Getter,
zeus zeus.Zeus,
) (*Provider, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/meterreporter/httpmeterreporter")
collectorsByName := map[zeustypes.MeterName]metercollector.MeterCollector{}
for _, collectorConfig := range collectorConfigs {
collector, err := factory.NewProviderFromNamedMap(ctx, providerSettings, collectorConfig, collectorFactories, collectorConfig.Provider)
if err != nil {
return nil, err
}
if _, exists := collectorsByName[collector.Name()]; exists {
return nil, errors.Newf(errors.TypeAlreadyExists, errors.CodeAlreadyExists, "duplicate meter collector %q", collector.Name())
}
collectorsByName[collector.Name()] = collector
}
metrics, err := newReporterMetrics(settings.Meter())
if err != nil {
return nil, err
}
return &Provider{
settings: settings,
config: config,
collectorsByName: collectorsByName,
flagger: flagger,
licensing: licensing,
orgGetter: orgGetter,
zeus: zeus,
healthyC: make(chan struct{}),
stopC: make(chan struct{}),
metrics: metrics,
}, nil
}
func (provider *Provider) Start(ctx context.Context) error {
close(provider.healthyC)
provider.collect(ctx)
ticker := time.NewTicker(provider.config.Interval)
defer ticker.Stop()
for {
select {
case <-provider.stopC:
return nil
case <-ticker.C:
provider.collect(ctx)
}
}
}
func (provider *Provider) collect(ctx context.Context) {
ctx, span := provider.settings.Tracer().Start(ctx, "meterreporter.Collect", trace.WithAttributes(attribute.String("meterreporter.provider", "http")))
defer span.End()
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
if err != nil {
span.RecordError(err)
provider.settings.Logger().ErrorContext(ctx, "failed to get orgs data", errors.Attr(err))
return
}
for _, org := range orgs {
evalCtx := featuretypes.NewFlaggerEvaluationContext(org.ID)
if !provider.flagger.BooleanOrEmpty(ctx, flagger.FeatureUseMeterReporter, evalCtx) {
provider.settings.Logger().DebugContext(ctx, "meter reporter disabled for org, skipping reporting", slog.String("org_id", org.ID.StringValue()))
continue
}
license, err := provider.licensing.GetActive(ctx, org.ID)
if err != nil {
if errors.Ast(err, errors.TypeNotFound) {
provider.settings.Logger().DebugContext(ctx, "no active license found for org, skipping reporting", slog.String("org_id", org.ID.StringValue()))
continue
}
span.RecordError(err)
provider.settings.Logger().ErrorContext(ctx, "failed to fetch active license for org", errors.Attr(err), slog.String("org_id", org.ID.StringValue()))
return
}
if err := provider.collectOrg(ctx, org, license); err != nil {
span.RecordError(err)
provider.settings.Logger().ErrorContext(ctx, "failed to collect meters", errors.Attr(err), slog.String("org_id", org.ID.StringValue()))
}
}
}
func (provider *Provider) Stop(ctx context.Context) error {
close(provider.stopC)
return nil
}
func (provider *Provider) Healthy() <-chan struct{} {
return provider.healthyC
}
func (provider *Provider) collectOrg(ctx context.Context, org *types.Organization, license *licensetypes.License) error {
now := time.Now().UTC()
// Use one timestamp so a tick cannot straddle midnight.
todayStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
if provider.config.Backfill {
checkpointsByMeter, err := provider.checkpoints(ctx, license.Key)
if err != nil {
return err
}
nextByCollector := provider.nextDays(license, todayStart, checkpointsByMeter)
start, end, ok := backfillRange(nextByCollector, todayStart)
if ok {
for day := start; !day.After(end); day = day.AddDate(0, 0, 1) {
eligible := eligibleCollectors(provider.collectorsByName, nextByCollector, day)
if len(eligible) == 0 {
continue
}
window, err := zeustypes.NewMeterWindow(day.UnixMilli(), day.AddDate(0, 0, 1).UnixMilli(), true)
if err != nil {
return err
}
if err := provider.report(ctx, org.ID, license, window, eligible); err != nil {
provider.settings.Logger().WarnContext(ctx, "failed to backfill for day", errors.Attr(err), slog.String("date", day.Format("2006-01-02")))
return err
}
}
}
}
// Today's partial window: every collector is always eligible (next <= today).
if now.UnixMilli() > todayStart.UnixMilli() {
todayWindow, err := zeustypes.NewMeterWindow(todayStart.UnixMilli(), now.UnixMilli(), false)
if err != nil {
return err
}
return provider.report(ctx, org.ID, license, todayWindow, provider.collectorsByName)
}
return nil
}
func (provider *Provider) checkpoints(ctx context.Context, licenseKey string) (map[string]time.Time, error) {
list, err := provider.zeus.ListMeterCheckpoints(ctx, licenseKey)
if err != nil {
provider.metrics.checkpoints.Add(ctx, 1, metric.WithAttributes(errors.TypeAttr(err)))
return nil, err
}
provider.metrics.checkpoints.Add(ctx, 1)
checkpointsByMeter := make(map[string]time.Time, len(list))
for _, checkpoint := range list {
checkpointsByMeter[checkpoint.Name] = checkpoint.StartDate.UTC()
}
return checkpointsByMeter, nil
}
func (provider *Provider) nextDays(license *licensetypes.License, todayStart time.Time, checkpointsByMeter map[string]time.Time) map[zeustypes.MeterName]time.Time {
nextByCollector := make(map[zeustypes.MeterName]time.Time, len(provider.collectorsByName))
licenseCreatedAt := license.CreatedAt.UTC()
licenseCreatedAtDay := time.Date(licenseCreatedAt.Year(), licenseCreatedAt.Month(), licenseCreatedAt.Day(), 0, 0, 0, 0, time.UTC)
for _, collector := range provider.collectorsByName {
checkpoint, hasCheckpoint := checkpointsByMeter[collector.Name().String()]
nextByCollector[collector.Name()] = nextReportableDay(licenseCreatedAtDay, todayStart, checkpoint, hasCheckpoint)
}
return nextByCollector
}
func nextReportableDay(licenseCreatedAtDay time.Time, todayStart time.Time, checkpoint time.Time, hasCheckpoint bool) time.Time {
next := licenseCreatedAtDay
if next.IsZero() {
next = todayStart
}
if hasCheckpoint {
checkpointNext := checkpoint.AddDate(0, 0, 1)
if checkpointNext.After(next) {
next = checkpointNext
}
}
return next
}
func (provider *Provider) report(ctx context.Context, orgID valuer.UUID, license *licensetypes.License, window zeustypes.MeterWindow, collectors map[zeustypes.MeterName]metercollector.MeterCollector) error {
date := time.UnixMilli(window.StartUnixMilli).UTC().Format("2006-01-02")
meters := make([]zeustypes.Meter, 0, len(collectors))
for _, collector := range collectors {
meterAttr := attribute.String("signoz.meter.name", collector.Name().String())
collectedReadings, err := collector.Collect(ctx, orgID, license, window)
if err != nil {
provider.metrics.collections.Add(ctx, 1, metric.WithAttributes(meterAttr, errors.TypeAttr(err)))
continue
}
provider.metrics.collections.Add(ctx, 1, metric.WithAttributes(meterAttr))
meters = append(meters, collectedReadings...)
}
if len(meters) == 0 {
return nil
}
idempotencyKey := fmt.Sprintf("meterreporter:%s", date)
body, err := json.Marshal(meters)
if err != nil {
provider.metrics.reports.Add(ctx, 1, metric.WithAttributes(errors.TypeAttr(err)))
return err
}
if err := provider.zeus.PutMetersV3(ctx, license.Key, idempotencyKey, body); err != nil {
provider.metrics.reports.Add(ctx, 1, metric.WithAttributes(errors.TypeAttr(err)))
return err
}
provider.metrics.reports.Add(ctx, 1)
provider.metrics.meters.Add(ctx, int64(len(meters)))
return nil
}
// backfillRange returns the inclusive sealed-day range ending at yesterday.
func backfillRange(nextByCollector map[zeustypes.MeterName]time.Time, todayStart time.Time) (start, end time.Time, ok bool) {
yesterday := todayStart.AddDate(0, 0, -1)
for _, next := range nextByCollector {
if !next.Before(todayStart) {
continue
}
if start.IsZero() || next.Before(start) {
start = next
}
}
if start.IsZero() || start.After(yesterday) {
return time.Time{}, time.Time{}, false
}
return start, yesterday, true
}
func eligibleCollectors(collectors map[zeustypes.MeterName]metercollector.MeterCollector, nextByCollector map[zeustypes.MeterName]time.Time, day time.Time) map[zeustypes.MeterName]metercollector.MeterCollector {
eligible := make(map[zeustypes.MeterName]metercollector.MeterCollector, len(collectors))
for name, collector := range collectors {
if !nextByCollector[name].After(day) {
eligible[name] = collector
}
}
return eligible
}

View File

@@ -1,48 +0,0 @@
package httpmeterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"go.opentelemetry.io/otel/metric"
)
type reporterMetrics struct {
checkpoints metric.Int64Counter
reports metric.Int64Counter
collections metric.Int64Counter
meters metric.Int64Counter
}
func newReporterMetrics(meter metric.Meter) (*reporterMetrics, error) {
var errs error
checkpoints, err := meter.Int64Counter("signoz.meterreporter.checkpoints", metric.WithDescription("Zeus meter checkpoint fetches."), metric.WithUnit("{checkpoint}"))
if err != nil {
errs = errors.Join(errs, err)
}
reports, err := meter.Int64Counter("signoz.meterreporter.reports", metric.WithDescription("Meter reports shipped to Zeus."), metric.WithUnit("{report}"))
if err != nil {
errs = errors.Join(errs, err)
}
collections, err := meter.Int64Counter("signoz.meterreporter.collections", metric.WithDescription("Per-meter collect calls."), metric.WithUnit("{collection}"))
if err != nil {
errs = errors.Join(errs, err)
}
meters, err := meter.Int64Counter("signoz.meterreporter.meters", metric.WithDescription("Meter readings shipped to Zeus."), metric.WithUnit("{meter}"))
if err != nil {
errs = errors.Join(errs, err)
}
if errs != nil {
return nil, errs
}
return &reporterMetrics{
checkpoints: checkpoints,
reports: reports,
collections: collections,
meters: meters,
}, nil
}

View File

@@ -150,72 +150,6 @@ func (provider *Provider) PutMetersV2(ctx context.Context, key string, data []by
return err
}
func (provider *Provider) PutMetersV3(ctx context.Context, key string, idempotencyKey string, data []byte) error {
headers := http.Header{}
if idempotencyKey != "" {
headers.Set("X-Idempotency-Key", idempotencyKey)
}
_, err := provider.doWithHeaders(
ctx,
provider.config.URL.JoinPath("/v2/meters"),
http.MethodPost,
key,
data,
headers,
)
return err
}
func (provider *Provider) ListMeterCheckpoints(ctx context.Context, key string) ([]zeustypes.MeterCheckpoint, error) {
response, err := provider.do(
ctx,
provider.config.URL.JoinPath("/v2/meters/checkpoints"),
http.MethodGet,
key,
nil,
)
if err != nil {
return nil, err
}
checkpointValues := gjson.GetBytes(response, "data")
if !checkpointValues.Exists() || checkpointValues.Type == gjson.Null {
return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoints are required")
}
if !checkpointValues.IsArray() {
return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoints must be an array")
}
checkpointResults := checkpointValues.Array()
checkpoints := make([]zeustypes.MeterCheckpoint, 0, len(checkpointResults))
for _, checkpointValue := range checkpointResults {
name := checkpointValue.Get("name").String()
if name == "" {
return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoint name is required")
}
startDateString := checkpointValue.Get("start_date").String()
if startDateString == "" {
return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoint start_date is required for %q", name)
}
startDate, err := time.Parse("2006-01-02", startDateString)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, zeus.ErrCodeResponseMalformed, "parse meter checkpoint start_date %q for %q", startDateString, name)
}
checkpoints = append(checkpoints, zeustypes.MeterCheckpoint{
Name: name,
StartDate: startDate,
})
}
return checkpoints, nil
}
func (provider *Provider) PutProfile(ctx context.Context, key string, profile *zeustypes.PostableProfile) error {
body, err := json.Marshal(profile)
if err != nil {
@@ -251,21 +185,12 @@ func (provider *Provider) PutHost(ctx context.Context, key string, host *zeustyp
}
func (provider *Provider) do(ctx context.Context, url *url.URL, method string, key string, requestBody []byte) ([]byte, error) {
return provider.doWithHeaders(ctx, url, method, key, requestBody, nil)
}
func (provider *Provider) doWithHeaders(ctx context.Context, url *url.URL, method string, key string, requestBody []byte, extraHeaders http.Header) ([]byte, error) {
request, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
}
request.Header.Set("X-Signoz-Cloud-Api-Key", key)
request.Header.Set("Content-Type", "application/json")
for k, vs := range extraHeaders {
for _, v := range vs {
request.Header.Add(k, v)
}
}
response, err := provider.httpClient.Do(request)
if err != nil {

View File

@@ -1,69 +1,16 @@
import { Callout } from '@signozhq/ui/callout';
import ClickHouseQueryBuilder from 'container/NewWidget/LeftContainer/QuerySection/QueryBuilder/ClickHouse/query';
import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder';
import { AlertTypes } from 'types/api/alerts/alertTypes';
import DOCLINKS from 'utils/docLinks';
import 'container/NewWidget/LeftContainer/QuerySection/QueryBuilder/ClickHouse/ClickHouse.styles.scss';
const ALERT_TYPE_DOC_LINK: Partial<Record<AlertTypes, string>> = {
[AlertTypes.LOGS_BASED_ALERT]: DOCLINKS.QUERY_CLICKHOUSE_LOGS,
[AlertTypes.TRACES_BASED_ALERT]: DOCLINKS.QUERY_CLICKHOUSE_TRACES,
[AlertTypes.EXCEPTIONS_BASED_ALERT]: DOCLINKS.QUERY_CLICKHOUSE_TRACES,
[AlertTypes.METRICS_BASED_ALERT]: DOCLINKS.QUERY_CLICKHOUSE_METRICS,
};
const ALERT_TYPES_WITH_AGENT_SKILL: AlertTypes[] = [
AlertTypes.LOGS_BASED_ALERT,
AlertTypes.TRACES_BASED_ALERT,
AlertTypes.EXCEPTIONS_BASED_ALERT,
];
interface ChQuerySectionProps {
alertType: AlertTypes;
}
function ChQuerySection({ alertType }: ChQuerySectionProps): JSX.Element {
function ChQuerySection(): JSX.Element {
const { currentQuery } = useQueryBuilder();
const docLink = ALERT_TYPE_DOC_LINK[alertType];
const showAgentSkill = ALERT_TYPES_WITH_AGENT_SKILL.includes(alertType);
return (
<>
{docLink && (
<div className="info-banner-wrapper">
<Callout
type="info"
showIcon
title={
<span>
<a href={docLink} target="_blank" rel="noreferrer">
Learn to write faster, optimized queries
</a>
{showAgentSkill && (
<>
{' · Using AI? '}
<a
href={DOCLINKS.AGENT_SKILL_INSTALL}
target="_blank"
rel="noreferrer"
>
Install the SigNoz ClickHouse query agent skill
</a>
</>
)}
</span>
}
/>
</div>
)}
<ClickHouseQueryBuilder
key="A"
queryIndex={0}
queryData={currentQuery.clickhouse_sql[0]}
deletable={false}
/>
</>
<ClickHouseQueryBuilder
key="A"
queryIndex={0}
queryData={currentQuery.clickhouse_sql[0]}
deletable={false}
/>
);
}

View File

@@ -56,9 +56,7 @@ function QuerySection({
const renderPromqlUI = (): JSX.Element => <PromqlSection />;
const renderChQueryUI = (): JSX.Element => (
<ChQuerySection alertType={alertType} />
);
const renderChQueryUI = (): JSX.Element => <ChQuerySection />;
const isDarkMode = useIsDarkMode();

View File

@@ -10,10 +10,6 @@ const DOCLINKS = {
'https://signoz.io/docs/external-api-monitoring/overview/',
QUERY_CLICKHOUSE_TRACES:
'https://signoz.io/docs/userguide/writing-clickhouse-traces-query/#timestamp-bucketing-for-distributed_signoz_index_v3',
QUERY_CLICKHOUSE_LOGS:
'https://signoz.io/docs/userguide/logs_clickhouse_queries/',
QUERY_CLICKHOUSE_METRICS:
'https://signoz.io/docs/userguide/write-a-metrics-clickhouse-query/',
AGENT_SKILL_INSTALL: 'https://signoz.io/docs/ai/agent-skills/#installation',
};

View File

@@ -4,8 +4,6 @@ import (
"errors" //nolint:depguard
"fmt"
"log/slog"
"go.opentelemetry.io/otel/attribute"
)
// base is the fundamental struct that implements the error interface.
@@ -255,10 +253,3 @@ func NewTimeoutf(code Code, format string, args ...any) *base {
func Attr(err error) slog.Attr {
return slog.Any("exception", err)
}
// TypeAttr returns an OTel attribute.KeyValue with the "error.type" semconv key
// set to the error's type string.
func TypeAttr(err error) attribute.KeyValue {
t, _, _, _, _, _ := Unwrapb(err)
return attribute.String("error.type", t.String())
}

View File

@@ -8,7 +8,6 @@ var (
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
FeatureGetMetersFromZeus = featuretypes.MustNewName("get_meters_from_zeus")
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
FeatureUseMeterReporter = featuretypes.MustNewName("use_meter_reporter")
FeatureUseJSONBody = featuretypes.MustNewName("use_json_body")
)
@@ -54,14 +53,6 @@ func MustNewRegistry() featuretypes.Registry {
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureUseMeterReporter,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageExperimental,
Description: "Controls whether the enterprise meter reporter runs instead of the noop reporter",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureUseJSONBody,
Kind: featuretypes.KindBoolean,

View File

@@ -1,68 +0,0 @@
package metercollector
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/zeustypes"
)
const (
ProviderStatic = "static"
ProviderTelemetry = "telemetry"
)
type Config struct {
Provider string `mapstructure:"provider"`
Telemetry TelemetryConfig `mapstructure:"telemetry"`
Static StaticConfig `mapstructure:"static"`
}
func (c Config) Validate() error {
switch c.Provider {
case ProviderStatic:
return c.Static.Validate()
case ProviderTelemetry:
return c.Telemetry.Validate()
default:
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidConfig, "meter collector: unknown provider %q", c.Provider)
}
}
type TelemetryConfig struct {
Name zeustypes.MeterName
Unit zeustypes.MeterUnit
Aggregation zeustypes.MeterAggregation
DBName string
TableName string
DefaultRetentionDays int
}
type StaticConfig struct {
Name zeustypes.MeterName
Unit zeustypes.MeterUnit
Aggregation zeustypes.MeterAggregation
Value int64
}
func (c StaticConfig) Validate() error {
if c.Name.IsZero() {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidConfig, "static meter collector: name must be set")
}
return nil
}
func (c TelemetryConfig) Validate() error {
if c.Name.IsZero() {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidConfig, "telemetry meter collector: name must be set")
}
if c.DBName == "" || c.TableName == "" {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidConfig, "telemetry meter collector: db_name and table_name are required")
}
if c.DefaultRetentionDays <= 0 {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidConfig, "telemetry meter collector: default_retention_days must be positive")
}
return nil
}

View File

@@ -1,25 +0,0 @@
package metercollector
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/types/zeustypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
ErrCodeMeterCollectorCollectFailed = errors.MustNewCode("meter_collector_collect_failed")
ErrCodeMeterCollectorInvalidCustomRetentionRule = errors.MustNewCode("meter_collector_invalid_custom_retention_rule")
ErrCodeInvalidConfig = errors.MustNewCode("meter_collector_invalid_config")
)
type MeterCollector interface {
Name() zeustypes.MeterName
Unit() zeustypes.MeterUnit
Aggregation() zeustypes.MeterAggregation
Origin(ctx context.Context, orgID valuer.UUID, license *licensetypes.License, todayStart time.Time) (time.Time, error)
Collect(ctx context.Context, orgID valuer.UUID, license *licensetypes.License, window zeustypes.MeterWindow) ([]zeustypes.Meter, error)
}

View File

@@ -1,37 +0,0 @@
package meterreporter
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
var _ factory.Config = (*Config)(nil)
type Config struct {
// Interval is how often the reporter collects and ships meters.
Interval time.Duration `mapstructure:"interval"`
// Backfill enables sealed-day catch-up from the license creation day.
Backfill bool `mapstructure:"backfill"`
}
func newConfig() factory.Config {
return Config{
Interval: 6 * time.Hour,
Backfill: true,
}
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("meterreporter"), newConfig)
}
func (c Config) Validate() error {
if c.Interval < 5*time.Minute || c.Interval > 24*time.Hour {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::interval must be between 5m and 24h")
}
return nil
}

View File

@@ -1,14 +0,0 @@
package meterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
var (
ErrCodeInvalidInput = errors.MustNewCode("meterreporter_invalid_input")
)
type Reporter interface {
factory.ServiceWithHealthy
}

View File

@@ -1,39 +0,0 @@
package noopmeterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/meterreporter"
)
type provider struct {
healthyC chan struct{}
stopC chan struct{}
}
func NewFactory() factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
return factory.NewProviderFactory(factory.MustNewName("noop"), New)
}
func New(_ context.Context, _ factory.ProviderSettings, _ meterreporter.Config) (meterreporter.Reporter, error) {
return &provider{
healthyC: make(chan struct{}),
stopC: make(chan struct{}),
}, nil
}
func (p *provider) Start(_ context.Context) error {
close(p.healthyC)
<-p.stopC
return nil
}
func (p *provider) Stop(_ context.Context) error {
close(p.stopC)
return nil
}
func (p *provider) Healthy() <-chan struct{} {
return p.healthyC
}

View File

@@ -1,52 +0,0 @@
package implretention
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/retention"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type getter struct {
store retentiontypes.Store
}
// NewGetter creates a retention getter backed by the retention store.
func NewGetter(store retentiontypes.Store) retention.Getter {
return &getter{
store: store,
}
}
// GetRetentionPolicySegments loads successful TTL changes and converts them into retention policy segments.
func (getter *getter) GetRetentionPolicySegments(
ctx context.Context,
orgID valuer.UUID,
dbName string,
tableName string,
fallbackDefaultDays int,
startMs int64,
endMs int64,
) ([]*retentiontypes.RetentionPolicySegment, error) {
if startMs >= endMs {
return nil, nil
}
if dbName == "" {
return nil, errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "dbName is empty")
}
if tableName == "" {
return nil, errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "tableName is empty")
}
if fallbackDefaultDays <= 0 {
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "non-positive fallbackDefaultDays %d", fallbackDefaultDays)
}
rows, err := getter.store.ListTTLSettingsByTableNameAndBeforeCreatedAt(ctx, orgID, dbName+"."+tableName, endMs)
if err != nil {
return nil, err
}
return retentiontypes.BuildRetentionPolicySegmentsFromRows(rows, fallbackDefaultDays, startMs, endMs)
}

View File

@@ -1,41 +0,0 @@
package implretention
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type store struct {
sqlstore sqlstore.SQLStore
}
// NewStore creates a SQL-backed retention store.
func NewStore(sqlstore sqlstore.SQLStore) retentiontypes.Store {
return &store{sqlstore: sqlstore}
}
// ListTTLSettingsByTableNameAndBeforeCreatedAt returns successful TTL settings before the given timestamp.
func (store *store) ListTTLSettingsByTableNameAndBeforeCreatedAt(ctx context.Context, orgID valuer.UUID, tableName string, beforeMs int64) ([]*retentiontypes.TTLSetting, error) {
rows := []*retentiontypes.TTLSetting{}
err := store.
sqlstore.
BunDB().
NewSelect().
Model(&rows).
Where("table_name = ?", tableName).
Where("org_id = ?", orgID.StringValue()).
Where("status = ?", retentiontypes.TTLSettingStatusSuccess).
Where("created_at < ?", time.UnixMilli(beforeMs).UTC()).
OrderExpr("created_at ASC").
Scan(ctx)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "load ttl_setting rows for org %q table %q", orgID.StringValue(), tableName)
}
return rows, nil
}

View File

@@ -1,14 +0,0 @@
package retention
import (
"context"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Getter resolves retention data and expressions for read paths.
type Getter interface {
// GetRetentionPolicySegments returns retention policy segments active over a half-open meter window.
GetRetentionPolicySegments(ctx context.Context, orgID valuer.UUID, dbName string, tableName string, fallbackDefaultDays int, startMs int64, endMs int64) ([]*retentiontypes.RetentionPolicySegment, error)
}

View File

@@ -1343,7 +1343,7 @@ func getLocalTableName(tableName string) string {
}
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -1378,7 +1378,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
if apiErr != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
@@ -1437,7 +1437,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
Status: retentiontypes.TTLSettingStatusPending,
Status: constants.StatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
@@ -1463,7 +1463,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
NewUpdate().
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1483,7 +1483,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
NewUpdate().
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1498,7 +1498,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
NewUpdate().
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", retentiontypes.TTLSettingStatusSuccess).
Set("status = ?", constants.StatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1508,10 +1508,10 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
}
}(ttlPayload)
return &retentiontypes.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalTraces.StringValue(),
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -1541,7 +1541,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
if apiErr != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
@@ -1575,7 +1575,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
Status: retentiontypes.TTLSettingStatusPending,
Status: constants.StatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
@@ -1613,7 +1613,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
NewUpdate().
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1634,7 +1634,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
NewUpdate().
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1649,7 +1649,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
NewUpdate().
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", retentiontypes.TTLSettingStatusSuccess).
Set("status = ?", constants.StatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1658,7 +1658,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
}
}(distributedTableName)
}
return &retentiontypes.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool, error) {
@@ -1687,7 +1687,7 @@ func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool,
return true, nil
}
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *retentiontypes.CustomRetentionTTLParams) (*retentiontypes.CustomRetentionTTLResponse, error) {
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
@@ -1702,7 +1702,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *r
if !hasCustomRetention {
r.logger.Info("Custom retention not supported, falling back to standard TTL method", "orgID", orgID)
ttlParams := &retentiontypes.TTLParams{
ttlParams := &model.TTLParams{
Type: params.Type,
DelDuration: int64(params.DefaultTTLDays * 24 * 3600),
}
@@ -1723,7 +1723,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *r
return nil, errorsV2.Wrapf(apiErr.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to set standard TTL")
}
return &retentiontypes.CustomRetentionTTLResponse{
return &model.CustomRetentionTTLResponse{
Message: fmt.Sprintf("Custom retention not supported, applied standard TTL of %d days. %s", params.DefaultTTLDays, ttlResult.Message),
}, nil
}
@@ -1734,7 +1734,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *r
uuidWithHyphen := valuer.GenerateUUID()
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
if params.Type != retentiontypes.LogsTTL {
if params.Type != constants.LogsTTL {
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL only supported for logs")
}
@@ -1765,7 +1765,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *r
if apiErr != nil {
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing custom_retention_ttl_status check sql query")
}
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
if statusItem.Status == constants.StatusPending {
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL is already running")
}
}
@@ -1851,7 +1851,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *r
TableName: tableName,
TTL: params.DefaultTTLDays,
Condition: string(ttlConditionsJSON),
Status: retentiontypes.TTLSettingStatusPending,
Status: constants.StatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
@@ -1867,7 +1867,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *r
err := r.setColdStorage(ctx, tableName, params.ColdStorageVolume)
if err != nil {
r.logger.Error("error in setting cold storage", errorsV2.Attr(err))
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, retentiontypes.TTLSettingStatusFailed)
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
return nil, errorsV2.Wrapf(err.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting cold storage for table %s", tableName)
}
}
@@ -1876,21 +1876,21 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *r
r.logger.Debug("Executing custom retention TTL request: ", "request", query, "step", i+1)
if err := r.db.Exec(ctx, query); err != nil {
r.logger.Error("error while setting custom retention ttl", errorsV2.Attr(err))
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, retentiontypes.TTLSettingStatusFailed)
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting custom retention TTL for table %s, query: %s", tableName, query)
}
}
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, retentiontypes.TTLSettingStatusSuccess)
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
}
return &retentiontypes.CustomRetentionTTLResponse{
return &model.CustomRetentionTTLResponse{
Message: "custom retention TTL has been successfully set up",
}, nil
}
// New method to build multiIf expressions with support for multiple AND conditions
func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []retentiontypes.CustomRetentionRule, defaultTTLDays int, isResourceTable bool) string {
func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []model.CustomRetentionRule, defaultTTLDays int, isResourceTable bool) string {
var conditions []string
for i, rule := range ttlConditions {
@@ -1962,7 +1962,7 @@ func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []retentiontypes
return result
}
func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID string) (*retentiontypes.GetCustomRetentionTTLResponse, error) {
func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error) {
// Check if V2 (custom retention) is supported
hasCustomRetention, err := r.hasCustomRetentionColumn(ctx)
if err != nil {
@@ -1971,7 +1971,7 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
hasCustomRetention = false
}
response := &retentiontypes.GetCustomRetentionTTLResponse{}
response := &model.GetCustomRetentionTTLResponse{}
if hasCustomRetention {
// V2 - Custom retention is supported
@@ -1994,19 +1994,19 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
if err == sql.ErrNoRows {
// No V2 configuration found, return defaults
response.DefaultTTLDays = retentiontypes.DefaultLogsRetentionDays
response.TTLConditions = []retentiontypes.CustomRetentionRule{}
response.Status = retentiontypes.TTLSettingStatusSuccess
response.DefaultTTLDays = 15
response.TTLConditions = []model.CustomRetentionRule{}
response.Status = constants.StatusSuccess
response.ColdStorageTTLDays = -1
return response, nil
}
// Parse TTL conditions from Condition
var ttlConditions []retentiontypes.CustomRetentionRule
var ttlConditions []model.CustomRetentionRule
if customTTL.Condition != "" {
if err := json.Unmarshal([]byte(customTTL.Condition), &ttlConditions); err != nil {
r.logger.Error("Error parsing TTL conditions", errorsV2.Attr(err))
ttlConditions = []retentiontypes.CustomRetentionRule{}
ttlConditions = []model.CustomRetentionRule{}
}
}
@@ -2020,8 +2020,8 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
response.Version = "v1"
// Get V1 TTL configuration
ttlParams := &retentiontypes.GetTTLParams{
Type: retentiontypes.LogsTTL,
ttlParams := &model.GetTTLParams{
Type: constants.LogsTTL,
}
ttlResult, apiErr := r.GetTTL(ctx, orgID, ttlParams)
@@ -2041,7 +2041,7 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
}
// For V1, we don't have TTL conditions
response.TTLConditions = []retentiontypes.CustomRetentionRule{}
response.TTLConditions = []model.CustomRetentionRule{}
}
return response, nil
@@ -2081,7 +2081,7 @@ func (r *ClickHouseReader) updateCustomRetentionTTLStatus(ctx context.Context, o
}
// Enhanced validation function with duplicate detection and efficient key validation
func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditions []retentiontypes.CustomRetentionRule) error {
func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditions []model.CustomRetentionRule) error {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "clickhouse-reader",
instrumentationtypes.CodeFunctionName: "validateTTLConditions",
@@ -2185,16 +2185,16 @@ func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditi
// SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db.
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// Keep only latest 100 transactions/requests
r.deleteTtlTransactions(ctx, orgID, 100)
switch params.Type {
case retentiontypes.TraceTTL:
case constants.TraceTTL:
return r.setTTLTraces(ctx, orgID, params)
case retentiontypes.MetricsTTL:
case constants.MetricsTTL:
return r.setTTLMetrics(ctx, orgID, params)
case retentiontypes.LogsTTL:
case constants.LogsTTL:
return r.setTTLLogs(ctx, orgID, params)
default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", params.Type)}
@@ -2202,7 +2202,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *ret
}
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalMetrics.StringValue(),
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -2231,7 +2231,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
if apiErr != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
@@ -2247,7 +2247,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
Status: retentiontypes.TTLSettingStatusPending,
Status: constants.StatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
@@ -2285,7 +2285,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
NewUpdate().
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -2306,7 +2306,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
NewUpdate().
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -2321,7 +2321,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
NewUpdate().
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", retentiontypes.TTLSettingStatusSuccess).
Set("status = ?", constants.StatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -2332,7 +2332,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
for _, tableName := range tableNames {
go metricTTL(tableName)
}
return &retentiontypes.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID string, numberOfTransactionsStore int) {
@@ -2389,7 +2389,7 @@ func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string,
// getTTLQueryStatus fetches ttl_status table status from DB
func (r *ClickHouseReader) getTTLQueryStatus(ctx context.Context, orgID string, tableNameArray []string) (string, *model.ApiError) {
failFlag := false
status := retentiontypes.TTLSettingStatusSuccess
status := constants.StatusSuccess
for _, tableName := range tableNameArray {
statusItem, apiErr := r.checkTTLStatusItem(ctx, orgID, tableName)
emptyStatusStruct := new(retentiontypes.TTLSetting)
@@ -2399,16 +2399,16 @@ func (r *ClickHouseReader) getTTLQueryStatus(ctx context.Context, orgID string,
if apiErr != nil {
return "", &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == retentiontypes.TTLSettingStatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
status = retentiontypes.TTLSettingStatusPending
if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
status = constants.StatusPending
return status, nil
}
if statusItem.Status == retentiontypes.TTLSettingStatusFailed {
if statusItem.Status == constants.StatusFailed {
failFlag = true
}
}
if failFlag {
status = retentiontypes.TTLSettingStatusFailed
status = constants.StatusFailed
}
return status, nil
@@ -2461,7 +2461,7 @@ func getLocalTableNameArray(tableNames []string) []string {
}
// GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces.
func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *retentiontypes.GetTTLParams) (*retentiontypes.GetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -2496,8 +2496,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
return delTTL, moveTTL
}
getMetricsTTL := func() (*retentiontypes.DBResponseTTL, *model.ApiError) {
var dbResp []retentiontypes.DBResponseTTL
getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleLocalTableName)
@@ -2514,8 +2514,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
}
getTracesTTL := func() (*retentiontypes.DBResponseTTL, *model.ApiError) {
var dbResp []retentiontypes.DBResponseTTL
getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.traceLocalTableName, signozTraceDBName)
@@ -2532,8 +2532,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
}
getLogsTTL := func() (*retentiontypes.DBResponseTTL, *model.ApiError) {
var dbResp []retentiontypes.DBResponseTTL
getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTableName, r.logsDB)
@@ -2551,7 +2551,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
switch ttlParams.Type {
case retentiontypes.TraceTTL:
case constants.TraceTTL:
tableNameArray := []string{
r.TraceDB + "." + r.traceTableName,
r.TraceDB + "." + r.traceResourceTableV3,
@@ -2579,9 +2579,9 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &retentiontypes.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
case retentiontypes.MetricsTTL:
case constants.MetricsTTL:
tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName}
tableNameArray = getLocalTableNameArray(tableNameArray)
status, apiErr := r.getTTLQueryStatus(ctx, orgID, tableNameArray)
@@ -2602,9 +2602,9 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &retentiontypes.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
case retentiontypes.LogsTTL:
case constants.LogsTTL:
tableNameArray := []string{r.logsDB + "." + r.logsTableName}
tableNameArray = getLocalTableNameArray(tableNameArray)
status, apiErr := r.getTTLQueryStatus(ctx, orgID, tableNameArray)
@@ -2625,7 +2625,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &retentiontypes.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v",

View File

@@ -34,7 +34,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
@@ -1678,7 +1677,7 @@ func (aH *APIHandler) setCustomRetentionTTL(w http.ResponseWriter, r *http.Reque
return
}
var params retentiontypes.CustomRetentionTTLParams
var params model.CustomRetentionTTLParams
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "Invalid data"))
return

View File

@@ -40,7 +40,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/query-service/utils"
querytemplate "github.com/SigNoz/signoz/pkg/query-service/utils/queryTemplate"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
chVariables "github.com/SigNoz/signoz/pkg/variables/clickhouse"
)
@@ -420,7 +419,7 @@ func parseTime(param string, r *http.Request) (*time.Time, error) {
}
func parseTTLParams(r *http.Request) (*retentiontypes.TTLParams, error) {
func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
// make sure either of the query params are present
typeTTL := r.URL.Query().Get("type")
@@ -433,7 +432,7 @@ func parseTTLParams(r *http.Request) (*retentiontypes.TTLParams, error) {
}
// Validate the type parameter
if typeTTL != retentiontypes.TraceTTL && typeTTL != retentiontypes.MetricsTTL && typeTTL != retentiontypes.LogsTTL {
if typeTTL != baseconstants.TraceTTL && typeTTL != baseconstants.MetricsTTL && typeTTL != baseconstants.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
}
@@ -456,7 +455,7 @@ func parseTTLParams(r *http.Request) (*retentiontypes.TTLParams, error) {
}
}
return &retentiontypes.TTLParams{
return &model.TTLParams{
Type: typeTTL,
DelDuration: int64(durationParsed.Seconds()),
ColdStorageVolume: coldStorage,
@@ -464,7 +463,7 @@ func parseTTLParams(r *http.Request) (*retentiontypes.TTLParams, error) {
}, nil
}
func parseGetTTL(r *http.Request) (*retentiontypes.GetTTLParams, error) {
func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
typeTTL := r.URL.Query().Get("type")
@@ -472,12 +471,12 @@ func parseGetTTL(r *http.Request) (*retentiontypes.GetTTLParams, error) {
return nil, fmt.Errorf("type param cannot be empty from the query")
} else {
// Validate the type parameter
if typeTTL != retentiontypes.TraceTTL && typeTTL != retentiontypes.MetricsTTL && typeTTL != retentiontypes.LogsTTL {
if typeTTL != baseconstants.TraceTTL && typeTTL != baseconstants.MetricsTTL && typeTTL != baseconstants.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
}
}
return &retentiontypes.GetTTLParams{Type: typeTTL}, nil
return &model.GetTTLParams{Type: typeTTL}, nil
}
func parseAggregateAttributeRequest(r *http.Request) (*v3.AggregateAttributeRequest, error) {

View File

@@ -19,6 +19,10 @@ const (
const MaxAllowedPointsInTimeSeries = 300
const TraceTTL = "traces"
const MetricsTTL = "metrics"
const LogsTTL = "logs"
const SpanSearchScopeRoot = "isroot"
const SpanSearchScopeEntryPoint = "isentrypoint"
const OrderBySpanCount = "span_count"

View File

@@ -7,7 +7,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/stats"
@@ -24,8 +23,8 @@ type Reader interface {
GetServicesList(ctx context.Context) (*[]string, error)
GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
GetTTL(ctx context.Context, orgID string, ttlParams *retentiontypes.GetTTLParams) (*retentiontypes.GetTTLResponseItem, *model.ApiError)
GetCustomRetentionTTL(ctx context.Context, orgID string) (*retentiontypes.GetCustomRetentionTTLResponse, error)
GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error)
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
// clickhouse only.
@@ -47,8 +46,8 @@ type Reader interface {
GetFlamegraphSpansForTrace(ctx context.Context, orgID valuer.UUID, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, error)
// Setter Interfaces
SetTTL(ctx context.Context, orgID string, ttlParams *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError)
SetTTLV2(ctx context.Context, orgID string, params *retentiontypes.CustomRetentionTTLParams) (*retentiontypes.CustomRetentionTTLResponse, error)
SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error)
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)

View File

@@ -404,6 +404,56 @@ type TagKey struct {
Type TagDataType `json:"type"`
}
type TTLParams struct {
Type string // It can be one of {traces, metrics}.
ColdStorageVolume string // Name of the cold storage volume.
ToColdStorageDuration int64 // Seconds after which data will be moved to cold storage.
DelDuration int64 // Seconds after which data will be deleted.
}
type CustomRetentionTTLParams struct {
Type string `json:"type"`
DefaultTTLDays int `json:"defaultTTLDays"`
TTLConditions []CustomRetentionRule `json:"ttlConditions"`
ColdStorageVolume string `json:"coldStorageVolume,omitempty"`
ToColdStorageDurationDays int64 `json:"coldStorageDurationDays,omitempty"`
}
type CustomRetentionRule struct {
Filters []FilterCondition `json:"conditions"`
TTLDays int `json:"ttlDays"`
}
type FilterCondition struct {
Key string `json:"key"`
Values []string `json:"values"`
}
type GetCustomRetentionTTLResponse struct {
Version string `json:"version"`
Status string `json:"status"`
// V1 fields
// LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
// LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
// V2 fields
DefaultTTLDays int `json:"default_ttl_days,omitempty"`
TTLConditions []CustomRetentionRule `json:"ttl_conditions,omitempty"`
ColdStorageVolume string `json:"cold_storage_volume,omitempty"`
ColdStorageTTLDays int `json:"cold_storage_ttl_days,omitempty"`
}
type CustomRetentionTTLResponse struct {
Message string `json:"message"`
}
type GetTTLParams struct {
Type string
}
type ListErrorsParams struct {
StartStr string `json:"start"`
EndStr string `json:"end"`

View File

@@ -150,6 +150,16 @@ type RuleResponseItem struct {
Data string `json:"data" db:"data"`
}
type TTLStatusItem struct {
Id int `json:"id" db:"id"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
TableName string `json:"table_name" db:"table_name"`
TTL int `json:"ttl" db:"ttl"`
Status string `json:"status" db:"status"`
ColdStorageTtl int `json:"cold_storage_ttl" db:"cold_storage_ttl"`
}
type ChannelItem struct {
Id int `json:"id" db:"id"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
@@ -452,11 +462,35 @@ type SpanAggregatesDBResponseItem struct {
GroupBy string `ch:"groupBy"`
}
type SetTTLResponseItem struct {
Message string `json:"message"`
}
type DiskItem struct {
Name string `json:"name,omitempty" ch:"name"`
Type string `json:"type,omitempty" ch:"type"`
}
type DBResponseTTL struct {
EngineFull string `ch:"engine_full"`
}
type GetTTLResponseItem struct {
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"`
ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"`
ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"`
ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
Status string `json:"status"`
}
type DBResponseServiceName struct {
ServiceName string `ch:"serviceName"`
Count uint64 `ch:"count"`

View File

@@ -23,7 +23,6 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/identn"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
@@ -136,9 +135,6 @@ type Config struct {
// Auditor config
Auditor auditor.Config `mapstructure:"auditor"`
// MeterReporter config
MeterReporter meterreporter.Config `mapstructure:"meterreporter"`
// CloudIntegration config
CloudIntegration cloudintegration.Config `mapstructure:"cloudintegration"`
@@ -179,7 +175,6 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
identn.NewConfigFactory(),
serviceaccount.NewConfigFactory(),
auditor.NewConfigFactory(),
meterreporter.NewConfigFactory(),
cloudintegration.NewConfigFactory(),
tracedetail.NewConfigFactory(),
authz.NewConfigFactory(),

View File

@@ -16,7 +16,6 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/retention/implretention"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/queryparser"
@@ -53,8 +52,7 @@ func TestNewHandlers(t *testing.T) {
userRoleStore := impluser.NewUserRoleStore(sqlstore, providerSettings)
userGetter := impluser.NewGetter(impluser.NewStore(sqlstore, providerSettings), userRoleStore, flagger)
retentionGetter := implretention.NewGetter(implretention.NewStore(sqlstore))
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, nil, nil, retentionGetter, flagger)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, nil, nil, flagger)
querierHandler := querier.NewHandler(providerSettings, nil, nil)
registryHandler := factory.NewHandler(nil)

View File

@@ -29,7 +29,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/quickfilter/implquickfilter"
"github.com/SigNoz/signoz/pkg/modules/rawdataexport"
"github.com/SigNoz/signoz/pkg/modules/rawdataexport/implrawdataexport"
"github.com/SigNoz/signoz/pkg/modules/retention"
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory/implrulestatehistory"
"github.com/SigNoz/signoz/pkg/modules/savedview"
@@ -64,7 +63,6 @@ type Modules struct {
Preference preference.Module
UserSetter user.Setter
UserGetter user.Getter
RetentionGetter retention.Getter
SavedView savedview.Module
Apdex apdex.Module
Dashboard dashboard.Module
@@ -105,7 +103,6 @@ func NewModules(
userRoleStore authtypes.UserRoleStore,
serviceAccount serviceaccount.Module,
cloudIntegrationModule cloudintegration.Module,
retentionGetter retention.Getter,
fl flagger.Flagger,
) Modules {
quickfilter := implquickfilter.NewModule(implquickfilter.NewStore(sqlstore))
@@ -122,7 +119,6 @@ func NewModules(
Dashboard: dashboard,
UserSetter: userSetter,
UserGetter: userGetter,
RetentionGetter: retentionGetter,
QuickFilter: quickfilter,
TraceFunnel: impltracefunnel.NewModule(impltracefunnel.NewStore(sqlstore)),
RawDataExport: implrawdataexport.NewModule(querier),

View File

@@ -16,7 +16,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard/impldashboard"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/retention/implretention"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount/implserviceaccount"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
@@ -57,8 +56,7 @@ func TestNewModules(t *testing.T) {
serviceAccount := implserviceaccount.NewModule(implserviceaccount.NewStore(sqlstore), nil, nil, nil, providerSettings, serviceaccount.Config{})
retentionGetter := implretention.NewGetter(implretention.NewStore(sqlstore))
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, serviceAccount, implcloudintegration.NewModule(), retentionGetter, flagger)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, nil, nil, nil, nil, nil, nil, nil, queryParser, Config{}, dashboardModule, userGetter, userRoleStore, serviceAccount, implcloudintegration.NewModule(), flagger)
reflectVal := reflect.ValueOf(modules)
for i := 0; i < reflectVal.NumField(); i++ {

View File

@@ -28,8 +28,6 @@ import (
"github.com/SigNoz/signoz/pkg/identn/apikeyidentn"
"github.com/SigNoz/signoz/pkg/identn/impersonationidentn"
"github.com/SigNoz/signoz/pkg/identn/tokenizeridentn"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/meterreporter/noopmeterreporter"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
@@ -321,12 +319,6 @@ func NewAuditorProviderFactories() factory.NamedMap[factory.ProviderFactory[audi
)
}
func NewMeterReporterProviderFactories() factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
return factory.MustNewNamedMap(
noopmeterreporter.NewFactory(),
)
}
func NewFlaggerProviderFactories(registry featuretypes.Registry) factory.NamedMap[factory.ProviderFactory[flagger.FlaggerProvider, flagger.Config]] {
return factory.MustNewNamedMap(
configflagger.NewFactory(registry),

View File

@@ -22,13 +22,10 @@ import (
"github.com/SigNoz/signoz/pkg/identn"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/retention"
"github.com/SigNoz/signoz/pkg/modules/retention/implretention"
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount/implserviceaccount"
@@ -87,7 +84,6 @@ type SigNoz struct {
Flagger flagger.Flagger
Gateway gateway.Gateway
Auditor auditor.Auditor
MeterReporter meterreporter.Reporter
}
func New(
@@ -108,7 +104,6 @@ func New(
dashboardModuleCallback func(sqlstore.SQLStore, factory.ProviderSettings, analytics.Analytics, organization.Getter, queryparser.QueryParser, querier.Querier, licensing.Licensing) dashboard.Module,
gatewayProviderFactory func(licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config],
auditorProviderFactories func(licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]],
meterReporterProviderFactories func(context.Context, factory.ProviderSettings, flagger.Flagger, licensing.Licensing, telemetrystore.TelemetryStore, retention.Getter, organization.Getter, zeus.Zeus) (factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]], string),
querierHandlerCallback func(factory.ProviderSettings, querier.Querier, analytics.Analytics) querier.Handler,
cloudIntegrationCallback func(sqlstore.SQLStore, global.Global, zeus.Zeus, gateway.Gateway, licensing.Licensing, serviceaccount.Module, cloudintegration.Config) (cloudintegration.Module, error),
rulerProviderFactories func(cache.Cache, alertmanager.Alertmanager, sqlstore.SQLStore, telemetrystore.TelemetryStore, telemetrytypes.MetadataStore, prometheus.Prometheus, organization.Getter, rulestatehistory.Module, querier.Querier, queryparser.QueryParser) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]],
@@ -233,8 +228,6 @@ func New(
return nil, err
}
retentionGetter := implretention.NewGetter(implretention.NewStore(sqlstore))
// Initialize prometheus from the available prometheus provider factories
prometheus, err := factory.NewProviderFromNamedMap(
ctx,
@@ -393,13 +386,6 @@ func New(
return nil, err
}
// Initialize meter reporter from the variant-specific provider factories
meterReporterFactories, meterReporterProvider := meterReporterProviderFactories(ctx, providerSettings, flagger, licensing, telemetrystore, retentionGetter, orgGetter, zeus)
meterReporter, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.MeterReporter, meterReporterFactories, meterReporterProvider)
if err != nil {
return nil, err
}
// Initialize authns
store := sqlauthnstore.NewStore(sqlstore)
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)
@@ -455,7 +441,7 @@ func New(
}
// Initialize all modules
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, telemetryMetadataStore, authNs, authz, cache, queryParser, config, dashboard, userGetter, userRoleStore, serviceAccount, cloudIntegrationModule, retentionGetter, flagger)
modules := NewModules(sqlstore, tokenizer, emailing, providerSettings, orgGetter, alertmanager, analytics, querier, telemetrystore, telemetryMetadataStore, authNs, authz, cache, queryParser, config, dashboard, userGetter, userRoleStore, serviceAccount, cloudIntegrationModule, flagger)
// Initialize ruler from the variant-specific provider factories
rulerInstance, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.Ruler, rulerProviderFactories(cache, alertmanager, sqlstore, telemetrystore, telemetryMetadataStore, prometheus, orgGetter, modules.RuleStateHistory, querier, queryParser), "signoz")
@@ -515,7 +501,6 @@ func New(
factory.NewNamedService(factory.MustNewName("authz"), authz),
factory.NewNamedService(factory.MustNewName("user"), userService, factory.MustNewName("authz")),
factory.NewNamedService(factory.MustNewName("auditor"), auditor),
factory.NewNamedService(factory.MustNewName("meterreporter"), meterReporter, factory.MustNewName("licensing")),
factory.NewNamedService(factory.MustNewName("ruler"), rulerInstance),
)
if err != nil {
@@ -565,6 +550,5 @@ func New(
Flagger: flagger,
Gateway: gateway,
Auditor: auditor,
MeterReporter: meterReporter,
}, nil
}

View File

@@ -186,7 +186,7 @@ func (c *conditionBuilder) conditionFor(
column := columns[0]
if len(key.Evolutions) > 0 {
// we will use the corresponding column and its evolution entry for the query
newColumns, _, err := selectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
newColumns, _, err := qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
if err != nil {
return "", err
}

View File

@@ -3,11 +3,7 @@ package telemetrylogs
import (
"context"
"fmt"
"slices"
"sort"
"strconv"
"strings"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz-otel-collector/utils"
@@ -137,113 +133,6 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
return nil, qbtypes.ErrColumnNotFound
}
// selectEvolutionsForColumns selects the appropriate evolution entries for each column based on the time range.
// Logic:
// - Finds the latest base evolution (<= tsStartTime) across ALL columns
// - Rejects all evolutions before this latest base evolution
// - For duplicate evolutions it considers the oldest one (first in ReleaseTime)
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
// - Results are sorted by ReleaseTime descending (newest first)
func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
copy(sortedEvolutions, evolutions)
// sort the evolutions by ReleaseTime ascending
sort.Slice(sortedEvolutions, func(i, j int) bool {
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
})
tsStartTime := time.Unix(0, int64(tsStart))
tsEndTime := time.Unix(0, int64(tsEnd))
// Build evolution map: column name -> evolution
evolutionMap := make(map[string]*telemetrytypes.EvolutionEntry)
for _, evolution := range sortedEvolutions {
if _, exists := evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))]; exists {
// since if there is duplicate we would just use the oldest one.
continue
}
evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))] = evolution
}
// Find the latest base evolution (<= tsStartTime) across ALL columns
// Evolutions are sorted, so we can break early
var latestBaseEvolutionAcrossAll *telemetrytypes.EvolutionEntry
for _, evolution := range sortedEvolutions {
if evolution.ReleaseTime.After(tsStartTime) {
break
}
latestBaseEvolutionAcrossAll = evolution
}
// We shouldn't reach this, it basically means there is something wrong with the evolutions data
if latestBaseEvolutionAcrossAll == nil {
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "no base evolution found for columns %v", columns)
}
columnLookUpMap := make(map[string]*schema.Column)
for _, column := range columns {
columnLookUpMap[column.Name] = column
}
// Collect column-evolution pairs
type colEvoPair struct {
column *schema.Column
evolution *telemetrytypes.EvolutionEntry
}
pairs := []colEvoPair{}
for _, evolution := range evolutionMap {
// Reject evolutions before the latest base evolution
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
continue
}
// skip evolutions after tsEndTime
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
continue
}
if _, exists := columnLookUpMap[evolution.ColumnName]; !exists {
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "evolution column %s not found in columns %v", evolution.ColumnName, columns)
}
pairs = append(pairs, colEvoPair{columnLookUpMap[evolution.ColumnName], evolution})
}
// If no pairs found, fall back to latestBaseEvolutionAcrossAll for matching columns
if len(pairs) == 0 {
for _, column := range columns {
// Use latestBaseEvolutionAcrossAll if this column name matches its column name
if column.Name == latestBaseEvolutionAcrossAll.ColumnName {
pairs = append(pairs, colEvoPair{column, latestBaseEvolutionAcrossAll})
}
}
}
// Sort by ReleaseTime descending (newest first)
slices.SortFunc(pairs, func(a, b colEvoPair) int {
// Sort by ReleaseTime descending (newest first)
if a.evolution.ReleaseTime.After(b.evolution.ReleaseTime) {
return -1
}
if a.evolution.ReleaseTime.Before(b.evolution.ReleaseTime) {
return 1
}
return 0
})
// Extract results
newColumns := make([]*schema.Column, len(pairs))
evolutionsEntries := make([]*telemetrytypes.EvolutionEntry, len(pairs))
for i, pair := range pairs {
newColumns[i] = pair.column
evolutionsEntries[i] = pair.evolution
}
return newColumns, evolutionsEntries, nil
}
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
columns, err := m.getColumn(ctx, key)
if err != nil {
@@ -254,7 +143,7 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
var evolutionsEntries []*telemetrytypes.EvolutionEntry
if len(key.Evolutions) > 0 {
// we will use the corresponding column and its evolution entry for the query
newColumns, evolutionsEntries, err = selectEvolutionsForColumns(columns, key.Evolutions, tsStart, tsEnd)
newColumns, evolutionsEntries, err = qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, tsStart, tsEnd)
if err != nil {
return "", err
}

View File

@@ -536,390 +536,6 @@ func TestFieldForWithEvolutions(t *testing.T) {
}
}
func TestSelectEvolutionsForColumns(t *testing.T) {
testCases := []struct {
name string
columns []*schema.Column
evolutions []*telemetrytypes.EvolutionEntry
tsStart uint64
tsEnd uint64
expectedColumns []string // column names
expectedEvols []string // evolution column names
expectedError bool
errorStr string
}{
{
name: "New evolutions at tsStartTime - should include latest evolution",
columns: []*schema.Column{
logsV2Columns["resources_string"],
logsV2Columns["resource"],
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 30, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resource"},
expectedEvols: []string{"resource"},
},
{
name: "New evolutions after tsStartTime but less than tsEndTime - should include both",
columns: []*schema.Column{
logsV2Columns["resources_string"],
logsV2Columns["resource"],
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 2, 3, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resource", "resources_string"}, // sorted by ReleaseTime desc
expectedEvols: []string{"resource", "resources_string"},
},
{
name: "Columns without matching evolutions - should exclude them",
columns: []*schema.Column{
logsV2Columns["resources_string"],
logsV2Columns["resource"], // no evolution for this
logsV2Columns["attributes_string"], // no evolution for this
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string"},
expectedEvols: []string{"resources_string"},
},
{
name: "New evolutions at tsEndTime - should not include new evolution",
columns: []*schema.Column{
logsV2Columns["resources_string"],
logsV2Columns["resource"],
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 2, 30, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 30, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string"},
expectedEvols: []string{"resources_string"},
},
{
name: "New evolutions after tsEndTime - should exclude new",
columns: []*schema.Column{
logsV2Columns["resources_string"],
logsV2Columns["resource"],
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string"},
expectedEvols: []string{"resources_string"},
},
{
name: "Empty columns array",
columns: []*schema.Column{},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{},
expectedEvols: []string{},
expectedError: true,
errorStr: "column resources_string not found",
},
{
name: "Duplicate evolutions - should use first encountered (oldest if sorted)",
columns: []*schema.Column{
logsV2Columns["resource"],
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 1, 20, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resource"},
expectedEvols: []string{"resource"}, // should use first one (older)
},
{
name: "Genuine Duplicate evolutions with new version- should consider both",
columns: []*schema.Column{
logsV2Columns["resources_string"],
logsV2Columns["resource"],
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 2,
ReleaseTime: time.Date(2024, 1, 20, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 1, 16, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string", "resource"},
expectedEvols: []string{"resources_string", "resource"}, // should use first one (older)
},
{
name: "Evolution exactly at tsEndTime",
columns: []*schema.Column{
logsV2Columns["resources_string"],
logsV2Columns["resource"],
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
ReleaseTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC), // exactly at tsEnd
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string"}, // resource excluded because After(tsEnd) is true
expectedEvols: []string{"resources_string"},
},
{
name: "Single evolution after tsStartTime - JSON body",
columns: []*schema.Column{
logsV2Columns[LogsV2BodyV2Column],
logsV2Columns[LogsV2BodyPromotedColumn],
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: LogsV2BodyV2Column,
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextBody,
FieldName: "__all__",
ReleaseTime: time.Unix(0, 0),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: LogsV2BodyPromotedColumn,
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextBody,
FieldName: "user.name",
ReleaseTime: time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{LogsV2BodyPromotedColumn, LogsV2BodyV2Column}, // sorted by ReleaseTime desc (newest first)
expectedEvols: []string{LogsV2BodyPromotedColumn, LogsV2BodyV2Column},
},
{
name: "No evolution after tsStartTime - JSON body",
columns: []*schema.Column{
logsV2Columns[LogsV2BodyV2Column],
logsV2Columns[LogsV2BodyPromotedColumn],
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: LogsV2BodyV2Column,
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextBody,
FieldName: "__all__",
ReleaseTime: time.Unix(0, 0),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: LogsV2BodyPromotedColumn,
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextBody,
FieldName: "user.name",
ReleaseTime: time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 3, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{LogsV2BodyPromotedColumn},
expectedEvols: []string{LogsV2BodyPromotedColumn},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resultColumns, resultEvols, err := selectEvolutionsForColumns(tc.columns, tc.evolutions, tc.tsStart, tc.tsEnd)
if tc.expectedError {
assert.Contains(t, err.Error(), tc.errorStr)
} else {
require.NoError(t, err)
assert.Equal(t, len(tc.expectedColumns), len(resultColumns), "column count mismatch")
assert.Equal(t, len(tc.expectedEvols), len(resultEvols), "evolution count mismatch")
resultColumnNames := make([]string, len(resultColumns))
for i, col := range resultColumns {
resultColumnNames[i] = col.Name
}
resultEvolNames := make([]string, len(resultEvols))
for i, evol := range resultEvols {
resultEvolNames[i] = evol.ColumnName
}
for i := range tc.expectedColumns {
assert.Equal(t, resultColumnNames[i], tc.expectedColumns[i], "expected column missing: "+tc.expectedColumns[i])
}
for i := range tc.expectedEvols {
assert.Equal(t, resultEvolNames[i], tc.expectedEvols[i], "expected evolution missing: "+tc.expectedEvols[i])
}
// Verify sorting: should be descending by ReleaseTime
for i := 0; i < len(resultEvols)-1; i++ {
assert.True(t, !resultEvols[i].ReleaseTime.Before(resultEvols[i+1].ReleaseTime),
"evolutions should be sorted descending by ReleaseTime")
}
}
})
}
}
func TestFieldForWithMaterialized(t *testing.T) {
ctx := context.Background()

View File

@@ -344,6 +344,11 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
})
}
}
if err = t.updateColumnEvolutionMetadataForKeys(ctx, keys); err != nil {
return nil, false, err
}
return keys, complete, nil
}
@@ -689,7 +694,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
}
}
if _, err := t.updateColumnEvolutionMetadataForKeys(ctx, keys); err != nil {
if err := t.updateColumnEvolutionMetadataForKeys(ctx, keys); err != nil {
return nil, false, err
}
@@ -2370,8 +2375,8 @@ func (k *telemetryMetaStore) fetchEvolutionEntryFromClickHouse(ctx context.Conte
return entries, nil
}
// Get retrieves all evolutions for the given selectors from DB.
func (k *telemetryMetaStore) updateColumnEvolutionMetadataForKeys(ctx context.Context, keysToUpdate []*telemetrytypes.TelemetryFieldKey) (map[string][]*telemetrytypes.EvolutionEntry, error) {
// updateColumnEvolutionMetadataForKeys updates the evolution field for keys.
func (k *telemetryMetaStore) updateColumnEvolutionMetadataForKeys(ctx context.Context, keysToUpdate []*telemetrytypes.TelemetryFieldKey) error {
var metadataKeySelectors []*telemetrytypes.EvolutionSelector
for _, keySelector := range keysToUpdate {
@@ -2385,7 +2390,7 @@ func (k *telemetryMetaStore) updateColumnEvolutionMetadataForKeys(ctx context.Co
evolutions, err := k.fetchEvolutionEntryFromClickHouse(ctx, metadataKeySelectors)
if err != nil {
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to fetch evolution from clickhouse %s", err.Error())
return errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to fetch evolution from clickhouse %s", err.Error())
}
evolutionsByUniqueKey := make(map[string][]*telemetrytypes.EvolutionEntry)
@@ -2416,7 +2421,7 @@ func (k *telemetryMetaStore) updateColumnEvolutionMetadataForKeys(ctx context.Co
}
}
}
return evolutionsByUniqueKey, nil
return nil
}
// chunkSizeFirstSeenMetricMetadata limits the number of tuples per SQL query to avoid hitting the max_query_size limit.

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetryaudit"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
@@ -17,7 +18,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/assert"
"github.com/SigNoz/signoz/pkg/flagger/flaggertest"
"github.com/stretchr/testify/require"
)
@@ -89,6 +89,19 @@ func TestGetKeys(t *testing.T) {
{Name: "tag_data_type", Type: "String"},
{Name: "priority", Type: "UInt8"},
}, [][]any{{"http.method", "tag", "String", 1}, {"http.method", "tag", "String", 1}}))
mock.ExpectQuery(`FROM signoz_metadata\.distributed_column_evolution_metadata`).
WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
{Name: "signal", Type: "String"},
{Name: "column_name", Type: "String"},
{Name: "column_type", Type: "String"},
{Name: "field_context", Type: "String"},
{Name: "field_name", Type: "String"},
{Name: "version", Type: "UInt32"},
{Name: "release_time", Type: "Float64"},
}, [][]any{}))
keys, _, err := metadata.GetKeys(context.Background(), &telemetrytypes.FieldKeySelector{
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
@@ -247,6 +260,27 @@ func TestApplyBackwardCompatibleKeys(t *testing.T) {
}, rows))
}
// getTracesKeys / getLogsKeys both fetch evolution metadata; return an empty
// result so the existing test data flows through unchanged. Each input key
// becomes one selector contributing four bound args.
if hasTraces || hasLogs {
evoArgs := make([]any, 0, len(tt.inputKeys)*4)
for range tt.inputKeys {
evoArgs = append(evoArgs, nil, nil, nil, nil)
}
mock.ExpectQuery(`FROM signoz_metadata\.distributed_column_evolution_metadata`).
WithArgs(evoArgs...).
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
{Name: "signal", Type: "String"},
{Name: "column_name", Type: "String"},
{Name: "column_type", Type: "String"},
{Name: "field_context", Type: "String"},
{Name: "field_name", Type: "String"},
{Name: "version", Type: "UInt32"},
{Name: "release_time", Type: "Float64"},
}, [][]any{}))
}
selectors := []*telemetrytypes.FieldKeySelector{}
for _, key := range tt.inputKeys {
selectors = append(selectors, &telemetrytypes.FieldKeySelector{

View File

@@ -161,7 +161,33 @@ func (c *conditionBuilder) conditionFor(
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
var value any
switch columns[0].Type.GetType() {
column := columns[0]
if len(key.Evolutions) > 0 {
// we will use the corresponding column and its evolution entry for the query
newColumns, _, err := qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
if err != nil {
return "", err
}
if len(newColumns) == 0 {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "no valid evolution found for field %s in the given time range", key.Name)
}
// Multiple columns means fieldExpression is a multiIf returning NULL when none match,
// so a simple null check is sufficient.
if len(newColumns) > 1 {
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(fieldExpression), nil
} else {
return sb.IsNull(fieldExpression), nil
}
}
// otherwise we have to find the correct exist operator based on the column type
column = newColumns[0]
}
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(fieldExpression), nil
@@ -178,7 +204,7 @@ func (c *conditionBuilder) conditionFor(
return sb.E(fieldExpression, value), nil
}
case schema.ColumnTypeEnumLowCardinality:
switch elementType := columns[0].Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
value = ""
if operator == qbtypes.FilterOperatorExists {
@@ -202,14 +228,14 @@ func (c *conditionBuilder) conditionFor(
return sb.E(fieldExpression, value), nil
}
case schema.ColumnTypeEnumMap:
keyType := columns[0].Type.(schema.MapColumnType).KeyType
keyType := column.Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, columns[0].Type)
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
}
switch valueType := columns[0].Type.(schema.MapColumnType).ValueType; valueType.GetType() {
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", columns[0].Name, key.Name)
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
if key.Materialized {
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
}
@@ -222,7 +248,7 @@ func (c *conditionBuilder) conditionFor(
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
}
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", columns[0].Type)
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
}
}
return "", nil

View File

@@ -3,6 +3,7 @@ package telemetrytraces
import (
"context"
"testing"
"time"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -14,6 +15,7 @@ import (
func TestConditionFor(t *testing.T) {
ctx := context.Background()
mockEvolution := mockEvolutionData(time.Date(2025, 10, 26, 0, 10, 0, 0, time.UTC))
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
@@ -213,6 +215,7 @@ func TestConditionFor(t *testing.T) {
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: mockEvolution,
},
operator: qbtypes.FilterOperatorExists,
value: nil,
@@ -225,6 +228,7 @@ func TestConditionFor(t *testing.T) {
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: mockEvolution,
},
operator: qbtypes.FilterOperatorNotExists,
value: nil,
@@ -302,3 +306,85 @@ func TestConditionFor(t *testing.T) {
})
}
}
func TestConditionForResourceWithEvolution(t *testing.T) {
ctx := context.Background()
releaseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
evolutions := mockEvolutionData(releaseTime)
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
operator qbtypes.FilterOperator
tsStart uint64
tsEnd uint64
expectedSQL string
}{
{
name: "Exists - window after release - JSON only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: evolutions,
},
operator: qbtypes.FilterOperatorExists,
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedSQL: "WHERE resource.`service.name`::String IS NOT NULL",
},
{
name: "NotExists - window after release - JSON only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: evolutions,
},
operator: qbtypes.FilterOperatorNotExists,
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedSQL: "WHERE resource.`service.name`::String IS NULL",
},
{
name: "Exists - window before release - map only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: evolutions,
},
operator: qbtypes.FilterOperatorExists,
tsStart: uint64(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedSQL: "WHERE mapContains(resources_string, 'service.name') = ?",
},
{
name: "Exists - window straddles release - multiIf null check",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: evolutions,
},
operator: qbtypes.FilterOperatorExists,
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL",
},
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
t.Run(tc.name, func(t *testing.T) {
cond, err := conditionBuilder.ConditionFor(ctx, tc.tsStart, tc.tsEnd, &tc.key, tc.operator, nil, sb)
require.NoError(t, err)
sb.Where(cond)
sql, _ := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
assert.Contains(t, sql, tc.expectedSQL)
})
}
}

View File

@@ -1,6 +1,8 @@
package telemetrytraces
import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
var (
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{

View File

@@ -174,7 +174,7 @@ func (m *defaultFieldMapper) getColumn(
) ([]*schema.Column, error) {
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
return []*schema.Column{indexV3Columns["resource"]}, nil
return []*schema.Column{indexV3Columns["resources_string"], indexV3Columns["resource"]}, nil
case telemetrytypes.FieldContextScope:
return []*schema.Column{}, qbtypes.ErrColumnNotFound
case telemetrytypes.FieldContextAttribute:
@@ -254,63 +254,92 @@ func (m *defaultFieldMapper) FieldFor(
if err != nil {
return "", err
}
if len(columns) != 1 {
return "", errors.Newf(errors.TypeInternal, errors.CodeInternal, "expected exactly 1 column, got %d", len(columns))
var newColumns []*schema.Column
var evolutionsEntries []*telemetrytypes.EvolutionEntry
if len(key.Evolutions) > 0 {
// we will use the corresponding column and its evolution entry for the query
newColumns, evolutionsEntries, err = qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
if err != nil {
return "", err
}
} else {
newColumns = columns
}
column := columns[0]
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
// json is only supported for resource context as of now
if key.FieldContext != telemetrytypes.FieldContextResource {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
}
oldColumn := indexV3Columns["resources_string"]
oldKeyName := fmt.Sprintf("%s['%s']", oldColumn.Name, key.Name)
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
// once clickHouse dependency is updated, we need to check if we can remove it.
if key.Materialized {
oldKeyName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
oldKeyNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, %s==true, %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldKeyNameExists, oldKeyName), nil
} else {
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
}
case schema.ColumnTypeEnumString,
schema.ColumnTypeEnumUInt64,
schema.ColumnTypeEnumUInt32,
schema.ColumnTypeEnumInt8,
schema.ColumnTypeEnumInt16,
schema.ColumnTypeEnumBool,
schema.ColumnTypeEnumDateTime64,
schema.ColumnTypeEnumFixedString:
return column.Name, nil
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
return column.Name, nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
}
case schema.ColumnTypeEnumMap:
keyType := column.Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
exprs := []string{}
existExpr := []string{}
for i, column := range newColumns {
// Use evolution column name if available, otherwise use the column name
columnName := column.Name
if evolutionsEntries != nil && evolutionsEntries[i] != nil {
columnName = evolutionsEntries[i].ColumnName
}
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
// a key could have been materialized, if so return the materialized column name
if key.Materialized {
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
// json is only supported for resource context as of now
if key.FieldContext != telemetrytypes.FieldContextResource {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
}
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
// once clickHouse dependency is updated, we need to check if we can remove it.
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", columnName, key.Name))
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", columnName, key.Name))
case schema.ColumnTypeEnumString,
schema.ColumnTypeEnumUInt64,
schema.ColumnTypeEnumUInt32,
schema.ColumnTypeEnumInt8,
schema.ColumnTypeEnumInt16,
schema.ColumnTypeEnumBool,
schema.ColumnTypeEnumDateTime64,
schema.ColumnTypeEnumFixedString:
exprs = append(exprs, column.Name)
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
exprs = append(exprs, column.Name)
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
}
case schema.ColumnTypeEnumMap:
keyType := column.Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
}
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
// a key could have been materialized, if so return the materialized column name
if key.Materialized {
exprs = append(exprs, telemetrytypes.FieldKeyToMaterializedColumnName(key))
existExpr = append(existExpr, fmt.Sprintf("%s==true", telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)))
} else {
exprs = append(exprs, fmt.Sprintf("%s['%s']", columnName, key.Name))
existExpr = append(existExpr, fmt.Sprintf("mapContains(%s, '%s')", columnName, key.Name))
}
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
}
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
}
}
if len(exprs) == 1 {
return exprs[0], nil
} else if len(exprs) > 1 {
// Ensure existExpr has the same length as exprs
if len(existExpr) != len(exprs) {
return "", errors.New(errors.TypeInternal, errors.CodeInternal, "length of exist exprs doesn't match to that of exprs")
}
finalExprs := []string{}
for i, expr := range exprs {
finalExprs = append(finalExprs, fmt.Sprintf("%s, %s", existExpr[i], expr))
}
return "multiIf(" + strings.Join(finalExprs, ", ") + ", NULL)", nil
}
// should not reach here
return column.Name, nil
return columns[0].Name, nil
}
// ColumnExpressionFor returns the column expression for the given field

View File

@@ -3,6 +3,7 @@ package telemetrytraces
import (
"context"
"testing"
"time"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -13,6 +14,7 @@ import (
func TestGetFieldKeyName(t *testing.T) {
ctx := context.Background()
mockEvolution := mockEvolutionData(time.Date(2024, 6, 2, 0, 0, 0, 0, time.UTC))
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
@@ -63,6 +65,7 @@ func TestGetFieldKeyName(t *testing.T) {
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
Evolutions: mockEvolution,
},
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
expectedError: nil,
@@ -74,6 +77,7 @@ func TestGetFieldKeyName(t *testing.T) {
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
Evolutions: mockEvolution,
},
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
expectedError: nil,
@@ -92,7 +96,7 @@ func TestGetFieldKeyName(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
result, err := fm.FieldFor(ctx, 0, 0, &tc.key)
result, err := fm.FieldFor(ctx, uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()), uint64(time.Date(2024, 6, 5, 0, 0, 0, 0, time.UTC).UnixNano()), &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)
@@ -103,3 +107,86 @@ func TestGetFieldKeyName(t *testing.T) {
})
}
}
func TestFieldForResourceWithEvolution(t *testing.T) {
ctx := context.Background()
releaseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
evolutions := mockEvolutionData(releaseTime)
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
tsStart uint64
tsEnd uint64
expectedResult string
}{
{
name: "Window straddles release - both columns",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
},
{
name: "Window fully after release - JSON column only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "resource.`service.name`::String",
},
{
name: "Window fully before release - map column only",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "resources_string['service.name']",
},
{
name: "Window fully after release - materialized resource",
key: telemetrytypes.TelemetryFieldKey{
Name: "deployment.environment",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "resource.`deployment.environment`::String",
},
{
name: "Window straddles release - materialized resource",
key: telemetrytypes.TelemetryFieldKey{
Name: "deployment.environment",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
Evolutions: evolutions,
},
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
require.NoError(t, err)
assert.Equal(t, tc.expectedResult, result)
})
}
}

View File

@@ -82,13 +82,6 @@ func (b *traceQueryStatementBuilder) Build(
start = querybuilder.ToNanoSecs(start)
end = querybuilder.ToNanoSecs(end)
keySelectors := getKeySelectors(query)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
/*
Adding a tech debt note here:
This piece of code is a hot fix and should be removed once we close issue: engineering-pod/issues/3622
@@ -124,6 +117,14 @@ func (b *traceQueryStatementBuilder) Build(
-------------------------------- End of tech debt ----------------------------
*/
// since we are modifying the selectFields, they might include keys which need evolutions so we should get keys after that.
keySelectors := getKeySelectors(query)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
query = b.adjustKeys(ctx, keys, query, requestType)
// Create SQL builder

View File

@@ -16,6 +16,9 @@ import (
)
func TestStatementBuilder(t *testing.T) {
// releaseTime is chosen so it lands inside the standard [1747947419000, 1747983448000]ms
// test window, keeping the multiIf SQL form for resource fields.
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -355,7 +358,7 @@ func TestStatementBuilder(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -394,6 +397,7 @@ func TestStatementBuilder(t *testing.T) {
}
func TestStatementBuilderListQuery(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -650,7 +654,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -683,6 +687,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
}
func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -703,6 +708,15 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"service.name": {
{
Name: "service.name",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: mockEvolutionData(time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)),
},
},
},
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
@@ -728,6 +742,15 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"service.name": {
{
Name: "service.name",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Evolutions: mockEvolutionData(time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)),
},
},
},
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
@@ -758,7 +781,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = c.keysMap
if mockMetadataStore.KeysMap == nil {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
}
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -788,7 +811,90 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
}
}
func TestStatementBuilderGroupByResourceEvolution(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
startMs uint64
endMs uint64
expected qbtypes.Statement
}{
{
name: "window straddles release - both JSON and map branches",
startMs: 1747947419000, // 2025-05-22 21:56:59 UTC, ~3m before release
endMs: 1747983448000, // 2025-05-23 07:57:28 UTC, ~10h after release
expected: qbtypes.Statement{
Query: "WITH __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
},
{
name: "window after release - JSON column only",
startMs: 1747960000000, // 2025-05-23 00:26:40 UTC, ~2.5h after release
endMs: 1747983448000, // 2025-05-23 07:57:28 UTC
expected: qbtypes.Statement{
Query: "WITH __limit_cte AS (SELECT toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
Args: []any{"1747960000000000000", "1747983448000000000", uint64(1747958200), uint64(1747983448), 10, "1747960000000000000", "1747983448000000000", uint64(1747958200), uint64(1747983448)},
},
},
{
name: "window before release - map column only",
startMs: 1747900000000, // 2025-05-22 08:26:40 UTC, ~13.5h before release
endMs: 1747947000000, // 2025-05-22 21:50:00 UTC, ~10m before release
expected: qbtypes.Statement{
Query: "WITH __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
Args: []any{true, "1747900000000000000", "1747947000000000000", uint64(1747898200), uint64(1747947000), 10, true, "1747900000000000000", "1747947000000000000", uint64(1747898200), uint64(1747947000)},
},
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
aggExprRewriter,
nil,
fl,
)
query := qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.TraceAggregation{
{Expression: "count()"},
},
Filter: &qbtypes.Filter{},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
q, err := statementBuilder.Build(context.Background(), c.startMs, c.endMs, qbtypes.RequestTypeTimeSeries, query, nil)
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
})
}
}
func TestStatementBuilderTraceQuery(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -911,7 +1017,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -944,6 +1050,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
}
func TestAdjustKey(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
inputKey telemetrytypes.TelemetryFieldKey
@@ -957,7 +1064,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: IntrinsicFields["trace_id"],
},
{
@@ -967,7 +1074,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextBody, // incorrect context
FieldDataType: telemetrytypes.FieldDataTypeInt64,
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "duration_nano",
FieldContext: telemetrytypes.FieldContextSpan, // should be corrected
@@ -981,7 +1088,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextSpan, // correct context
FieldDataType: telemetrytypes.FieldDataTypeInt64,
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "duration_nano",
FieldContext: telemetrytypes.FieldContextSpan, // should be corrected
@@ -995,8 +1102,8 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(),
expectedKey: *buildCompleteFieldKeyMap()["service.name"][0],
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: *buildCompleteFieldKeyMap(releaseTime)["service.name"][0],
},
{
name: "single matching key with context specified - override",
@@ -1005,8 +1112,8 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(),
expectedKey: *buildCompleteFieldKeyMap()["cart.items_count"][0],
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: *buildCompleteFieldKeyMap(releaseTime)["cart.items_count"][0],
},
{
name: "multiple matching keys - all materialized",
@@ -1043,7 +1150,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "mixed.materialization.key",
FieldDataType: telemetrytypes.FieldDataTypeString,
@@ -1057,7 +1164,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "mixed.materialization.key",
FieldContext: telemetrytypes.FieldContextAttribute,
@@ -1072,7 +1179,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "unknown.field",
Materialized: false,
@@ -1085,7 +1192,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextAttribute,
@@ -1100,7 +1207,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "cart.items_count",
FieldContext: telemetrytypes.FieldContextAttribute,
@@ -1115,7 +1222,7 @@ func TestAdjustKey(t *testing.T) {
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedKey: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
@@ -1158,6 +1265,7 @@ func TestAdjustKey(t *testing.T) {
}
func TestAdjustKeys(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
@@ -1183,7 +1291,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedSelectFields: []telemetrytypes.TelemetryFieldKey{
{
Name: "service.name",
@@ -1220,7 +1328,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedGroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
@@ -1267,7 +1375,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedOrder: []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{
@@ -1326,7 +1434,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
expectedSelectFields: []telemetrytypes.TelemetryFieldKey{
{
Name: "trace_id",
@@ -1381,7 +1489,7 @@ func TestAdjustKeys(t *testing.T) {
},
},
},
keysMap: buildCompleteFieldKeyMap(),
keysMap: buildCompleteFieldKeyMap(releaseTime),
// After alias adjustment, name becomes "span.duration" with FieldContextUnspecified
// "span.duration" is not in keysMap, so context stays unspecified
expectedOrder: []qbtypes.OrderBy{

View File

@@ -1,10 +1,12 @@
package telemetrytraces
import (
"time"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
func buildCompleteFieldKeyMap(releaseTime time.Time) map[string][]*telemetrytypes.TelemetryFieldKey {
keysMap := map[string][]*telemetrytypes.TelemetryFieldKey{
"service.name": {
{
@@ -115,7 +117,33 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
for _, keys := range keysMap {
for _, key := range keys {
key.Signal = telemetrytypes.SignalTraces
if key.FieldContext == telemetrytypes.FieldContextResource {
key.Evolutions = mockEvolutionData(releaseTime)
}
}
}
return keysMap
}
// mockEvolutionData returns the canonical resource-column evolution timeline used in tests:
// the legacy resources_string map at epoch 0 and the JSON resource column released at releaseTime.
func mockEvolutionData(releaseTime time.Time) []*telemetrytypes.EvolutionEntry {
return []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalTraces,
ColumnName: "resources_string",
FieldContext: telemetrytypes.FieldContextResource,
ColumnType: "Map(LowCardinality(String), String)",
FieldName: "__all__",
ReleaseTime: time.Unix(0, 0),
},
{
Signal: telemetrytypes.SignalTraces,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
ReleaseTime: releaseTime,
},
}
}

View File

@@ -398,21 +398,27 @@ func (b *traceOperatorCTEBuilder) buildNotCTE(leftCTE, rightCTE string) (string,
}
func (b *traceOperatorCTEBuilder) buildFinalQuery(ctx context.Context, selectFromCTE string, requestType qbtypes.RequestType) (*qbtypes.Statement, error) {
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, b.getKeySelectors())
if err != nil {
return nil, err
}
b.adjustKeys(keys)
switch requestType {
case qbtypes.RequestTypeRaw:
return b.buildListQuery(ctx, selectFromCTE)
return b.buildListQuery(ctx, selectFromCTE, keys)
case qbtypes.RequestTypeTimeSeries:
return b.buildTimeSeriesQuery(ctx, selectFromCTE)
return b.buildTimeSeriesQuery(ctx, selectFromCTE, keys)
case qbtypes.RequestTypeTrace:
return b.buildTraceQuery(ctx, selectFromCTE)
return b.buildTraceQuery(ctx, selectFromCTE, keys)
case qbtypes.RequestTypeScalar:
return b.buildScalarQuery(ctx, selectFromCTE)
return b.buildScalarQuery(ctx, selectFromCTE, keys)
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
}
}
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
sb := sqlbuilder.NewSelectBuilder()
// Select core fields
@@ -434,22 +440,6 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
"parent_span_id": true,
}
// Get keys for selectFields
keySelectors := b.getKeySelectors()
for _, field := range b.operator.SelectFields {
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: field.Name,
Signal: telemetrytypes.SignalTraces,
FieldContext: field.FieldContext,
FieldDataType: field.FieldDataType,
})
}
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
// Add selectFields using ColumnExpressionFor since we now have all base table columns
for _, field := range b.operator.SelectFields {
if selectedFields[field.Name] {
@@ -526,6 +516,15 @@ func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySe
})
}
for _, field := range b.operator.SelectFields {
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: field.Name,
Signal: telemetrytypes.SignalTraces,
FieldContext: field.FieldContext,
FieldDataType: field.FieldDataType,
})
}
for i := range keySelectors {
keySelectors[i].Signal = telemetrytypes.SignalTraces
}
@@ -533,7 +532,7 @@ func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySe
return keySelectors
}
func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select(fmt.Sprintf(
@@ -541,12 +540,6 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
int64(b.operator.StepInterval.Seconds()),
))
keySelectors := b.getKeySelectors()
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
var allGroupByArgs []any
for _, gb := range b.operator.GroupBy {
@@ -625,8 +618,7 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
combinedArgs := append(allGroupByArgs, allAggChArgs...)
// Add HAVING clause if specified
err = b.addHavingClause(sb)
if err != nil {
if err := b.addHavingClause(sb); err != nil {
return nil, err
}
@@ -653,17 +645,11 @@ func (b *traceOperatorCTEBuilder) buildTraceSummaryCTE(selectFromCTE string) {
b.addCTE("trace_summary", sql, args, []string{"all_spans", selectFromCTE})
}
func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
b.buildTraceSummaryCTE(selectFromCTE)
sb := sqlbuilder.NewSelectBuilder()
keySelectors := b.getKeySelectors()
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
var allGroupByArgs []any
for _, gb := range b.operator.GroupBy {
@@ -745,8 +731,7 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
sb.GroupBy(groupByKeys...)
}
err = b.addHavingClause(sb)
if err != nil {
if err := b.addHavingClause(sb); err != nil {
return nil, err
}
@@ -802,15 +787,9 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
}, nil
}
func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
sb := sqlbuilder.NewSelectBuilder()
keySelectors := b.getKeySelectors()
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
var allGroupByArgs []any
for _, gb := range b.operator.GroupBy {
@@ -892,8 +871,7 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
combinedArgs := append(allGroupByArgs, allAggChArgs...)
// Add HAVING clause if specified
err = b.addHavingClause(sb)
if err != nil {
if err := b.addHavingClause(sb); err != nil {
return nil, err
}
@@ -936,3 +914,16 @@ func (b *traceOperatorCTEBuilder) aggOrderBy(k qbtypes.OrderBy) (int, bool) {
}
return 0, false
}
func (b *traceOperatorCTEBuilder) adjustKeys(keys map[string][]*telemetrytypes.TelemetryFieldKey) {
// todo: this needs to be updated w.r.t trace statement builder.
for i := range b.operator.SelectFields {
querybuilder.AdjustKey(&b.operator.SelectFields[i], keys, nil)
}
for i := range b.operator.GroupBy {
querybuilder.AdjustKey(&b.operator.GroupBy[i].TelemetryFieldKey, keys, nil)
}
for i := range b.operator.Order {
querybuilder.AdjustKey(&b.operator.Order[i].Key.TelemetryFieldKey, keys, nil)
}
}

View File

@@ -15,6 +15,7 @@ import (
)
func TestTraceOperatorStatementBuilder(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -390,7 +391,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
@@ -443,6 +444,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
}
func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
cases := []struct {
name string
operator qbtypes.QueryBuilderTraceOperator
@@ -506,7 +508,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)

View File

@@ -4,6 +4,7 @@ import (
"context"
"strings"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
@@ -16,12 +17,13 @@ import (
)
func TestTraceTimeRangeOptimization(t *testing.T) {
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
mockMetadataStore.KeysMap["trace_id"] = []*telemetrytypes.TelemetryFieldKey{{
Name: "trace_id",
FieldContext: telemetrytypes.FieldContextSpan,

View File

@@ -0,0 +1,119 @@
package querybuildertypesv5
import (
"slices"
"sort"
"strconv"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
// SelectEvolutionsForColumns selects the appropriate evolution entries for each column based on the time range.
// Logic:
// - Finds the latest base evolution (<= tsStartTime) across ALL columns
// - Rejects all evolutions before this latest base evolution
// - For duplicate evolutions it considers the oldest one (first in ReleaseTime)
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
// - Results are sorted by ReleaseTime descending (newest first)
func SelectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
copy(sortedEvolutions, evolutions)
// sort the evolutions by ReleaseTime ascending
sort.Slice(sortedEvolutions, func(i, j int) bool {
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
})
tsStartTime := time.Unix(0, int64(tsStart))
tsEndTime := time.Unix(0, int64(tsEnd))
// Build evolution map: column name -> evolution
evolutionMap := make(map[string]*telemetrytypes.EvolutionEntry)
for _, evolution := range sortedEvolutions {
if _, exists := evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))]; exists {
// since if there is duplicate we would just use the oldest one.
continue
}
evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))] = evolution
}
// Find the latest base evolution (<= tsStartTime) across ALL columns
// Evolutions are sorted, so we can break early
var latestBaseEvolutionAcrossAll *telemetrytypes.EvolutionEntry
for _, evolution := range sortedEvolutions {
if evolution.ReleaseTime.After(tsStartTime) {
break
}
latestBaseEvolutionAcrossAll = evolution
}
// We shouldn't reach this, it basically means there is something wrong with the evolutions data
if latestBaseEvolutionAcrossAll == nil {
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "no base evolution found for columns %v", columns)
}
columnLookUpMap := make(map[string]*schema.Column)
for _, column := range columns {
columnLookUpMap[column.Name] = column
}
// Collect column-evolution pairs
type colEvoPair struct {
column *schema.Column
evolution *telemetrytypes.EvolutionEntry
}
pairs := []colEvoPair{}
for _, evolution := range evolutionMap {
// Reject evolutions before the latest base evolution
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
continue
}
// skip evolutions after tsEndTime
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
continue
}
if _, exists := columnLookUpMap[evolution.ColumnName]; !exists {
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "evolution column %s not found in columns %v", evolution.ColumnName, columns)
}
pairs = append(pairs, colEvoPair{columnLookUpMap[evolution.ColumnName], evolution})
}
// If no pairs found, fall back to latestBaseEvolutionAcrossAll for matching columns
if len(pairs) == 0 {
for _, column := range columns {
// Use latestBaseEvolutionAcrossAll if this column name matches its column name
if column.Name == latestBaseEvolutionAcrossAll.ColumnName {
pairs = append(pairs, colEvoPair{column, latestBaseEvolutionAcrossAll})
}
}
}
// Sort by ReleaseTime descending (newest first)
slices.SortFunc(pairs, func(a, b colEvoPair) int {
// Sort by ReleaseTime descending (newest first)
if a.evolution.ReleaseTime.After(b.evolution.ReleaseTime) {
return -1
}
if a.evolution.ReleaseTime.Before(b.evolution.ReleaseTime) {
return 1
}
return 0
})
// Extract results
newColumns := make([]*schema.Column, len(pairs))
evolutionsEntries := make([]*telemetrytypes.EvolutionEntry, len(pairs))
for i, pair := range pairs {
newColumns[i] = pair.column
evolutionsEntries[i] = pair.evolution
}
return newColumns, evolutionsEntries, nil
}

View File

@@ -0,0 +1,414 @@
package querybuildertypesv5
import (
"testing"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
LogsV2BodyV2Column = "body_v2"
LogsV2BodyPromotedColumn = "body_promoted"
)
var (
resources_string = &schema.Column{Name: "resources_string", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}}
resource = &schema.Column{Name: "resource", Type: schema.JSONColumnType{}}
attributes_string = &schema.Column{Name: "attributes_string", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}}
body_v2 = &schema.Column{Name: LogsV2BodyV2Column, Type: schema.JSONColumnType{}}
body_promoted = &schema.Column{Name: LogsV2BodyPromotedColumn, Type: schema.JSONColumnType{}}
)
func TestSelectEvolutionsForColumns(t *testing.T) {
testCases := []struct {
name string
columns []*schema.Column
evolutions []*telemetrytypes.EvolutionEntry
tsStart uint64
tsEnd uint64
expectedColumns []string // column names
expectedEvols []string // evolution column names
expectedError bool
errorStr string
}{
{
name: "New evolutions at tsStartTime - should include latest evolution",
columns: []*schema.Column{
resources_string,
resource,
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 30, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resource"},
expectedEvols: []string{"resource"},
},
{
name: "New evolutions after tsStartTime but less than tsEndTime - should include both",
columns: []*schema.Column{
resources_string,
resource,
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 2, 3, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resource", "resources_string"}, // sorted by ReleaseTime desc
expectedEvols: []string{"resource", "resources_string"},
},
{
name: "Columns without matching evolutions - should exclude them",
columns: []*schema.Column{
resources_string,
resource, // no evolution for this
attributes_string, // no evolution for this
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string"},
expectedEvols: []string{"resources_string"},
},
{
name: "New evolutions at tsEndTime - should not include new evolution",
columns: []*schema.Column{
resources_string,
resource,
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 2, 30, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 30, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string"},
expectedEvols: []string{"resources_string"},
},
{
name: "New evolutions after tsEndTime - should exclude new",
columns: []*schema.Column{
resources_string,
resource,
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string"},
expectedEvols: []string{"resources_string"},
},
{
name: "Empty columns array",
columns: []*schema.Column{},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{},
expectedEvols: []string{},
expectedError: true,
errorStr: "column resources_string not found",
},
{
name: "Duplicate evolutions - should use first encountered (oldest if sorted)",
columns: []*schema.Column{
resource,
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 1, 20, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resource"},
expectedEvols: []string{"resource"}, // should use first one (older)
},
{
name: "Genuine Duplicate evolutions with new version- should consider both",
columns: []*schema.Column{
resources_string,
resource,
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 0,
ReleaseTime: time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 1,
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
Version: 2,
ReleaseTime: time.Date(2024, 1, 20, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 1, 16, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string", "resource"},
expectedEvols: []string{"resources_string", "resource"}, // should use first one (older)
},
{
name: "Evolution exactly at tsEndTime",
columns: []*schema.Column{
resources_string,
resource,
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resources_string",
ColumnType: "Map(LowCardinality(String), String)",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
ReleaseTime: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: "resource",
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextResource,
FieldName: "__all__",
ReleaseTime: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC), // exactly at tsEnd
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{"resources_string"}, // resource excluded because After(tsEnd) is true
expectedEvols: []string{"resources_string"},
},
{
name: "Single evolution after tsStartTime - JSON body",
columns: []*schema.Column{
body_v2,
body_promoted,
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: LogsV2BodyV2Column,
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextBody,
FieldName: "__all__",
ReleaseTime: time.Unix(0, 0),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: LogsV2BodyPromotedColumn,
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextBody,
FieldName: "user.name",
ReleaseTime: time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{LogsV2BodyPromotedColumn, LogsV2BodyV2Column}, // sorted by ReleaseTime desc (newest first)
expectedEvols: []string{LogsV2BodyPromotedColumn, LogsV2BodyV2Column},
},
{
name: "No evolution after tsStartTime - JSON body",
columns: []*schema.Column{
body_v2,
body_promoted,
},
evolutions: []*telemetrytypes.EvolutionEntry{
{
Signal: telemetrytypes.SignalLogs,
ColumnName: LogsV2BodyV2Column,
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextBody,
FieldName: "__all__",
ReleaseTime: time.Unix(0, 0),
},
{
Signal: telemetrytypes.SignalLogs,
ColumnName: LogsV2BodyPromotedColumn,
ColumnType: "JSON()",
FieldContext: telemetrytypes.FieldContextBody,
FieldName: "user.name",
ReleaseTime: time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
},
},
tsStart: uint64(time.Date(2024, 2, 3, 0, 0, 0, 0, time.UTC).UnixNano()),
tsEnd: uint64(time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC).UnixNano()),
expectedColumns: []string{LogsV2BodyPromotedColumn},
expectedEvols: []string{LogsV2BodyPromotedColumn},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resultColumns, resultEvols, err := SelectEvolutionsForColumns(tc.columns, tc.evolutions, tc.tsStart, tc.tsEnd)
if tc.expectedError {
assert.Contains(t, err.Error(), tc.errorStr)
} else {
require.NoError(t, err)
assert.Equal(t, len(tc.expectedColumns), len(resultColumns), "column count mismatch")
assert.Equal(t, len(tc.expectedEvols), len(resultEvols), "evolution count mismatch")
resultColumnNames := make([]string, len(resultColumns))
for i, col := range resultColumns {
resultColumnNames[i] = col.Name
}
resultEvolNames := make([]string, len(resultEvols))
for i, evol := range resultEvols {
resultEvolNames[i] = evol.ColumnName
}
for i := range tc.expectedColumns {
assert.Equal(t, resultColumnNames[i], tc.expectedColumns[i], "expected column missing: "+tc.expectedColumns[i])
}
for i := range tc.expectedEvols {
assert.Equal(t, resultEvolNames[i], tc.expectedEvols[i], "expected evolution missing: "+tc.expectedEvols[i])
}
// Verify sorting: should be descending by ReleaseTime
for i := 0; i < len(resultEvols)-1; i++ {
assert.True(t, !resultEvols[i].ReleaseTime.Before(resultEvols[i+1].ReleaseTime),
"evolutions should be sorted descending by ReleaseTime")
}
}
})
}
}

View File

@@ -1,12 +0,0 @@
package retentiontypes
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Store interface {
// ListTTLSettingsByTableNameAndBeforeCreatedAt returns successful TTL settings before the given timestamp.
ListTTLSettingsByTableNameAndBeforeCreatedAt(ctx context.Context, orgID valuer.UUID, tableName string, beforeMs int64) ([]*TTLSetting, error)
}

View File

@@ -1,141 +1,10 @@
package retentiontypes
import (
"encoding/json"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/uptrace/bun"
)
const secondsPerDay = 24 * 60 * 60
const (
DefaultLogsRetentionDays = 15
DefaultMetricsRetentionDays = 30
DefaultTracesRetentionDays = 15
)
const (
TraceTTL = "traces"
MetricsTTL = "metrics"
LogsTTL = "logs"
)
const (
TTLSettingStatusPending = "pending"
TTLSettingStatusFailed = "failed"
TTLSettingStatusSuccess = "success"
)
// RetentionPolicySegment is a half-open time range using one retention policy.
type RetentionPolicySegment struct {
StartMs int64
EndMs int64
Rules []CustomRetentionRule
DefaultDays int
}
// NewRetentionPolicySegment creates a retention policy segment for a half-open time range.
func NewRetentionPolicySegment(startMs int64, endMs int64, rules []CustomRetentionRule, defaultDays int) *RetentionPolicySegment {
return &RetentionPolicySegment{
StartMs: startMs,
EndMs: endMs,
Rules: rules,
DefaultDays: defaultDays,
}
}
// BuildRetentionPolicySegmentsFromRows converts successful TTL settings into retention policy segments.
func BuildRetentionPolicySegmentsFromRows(rows []*TTLSetting, fallbackDefaultDays int, startMs, endMs int64) ([]*RetentionPolicySegment, error) {
if startMs >= endMs {
return nil, nil
}
var activeAtStart *TTLSetting
inWindow := make([]*TTLSetting, 0, len(rows))
for _, row := range rows {
rowMs := row.CreatedAt.UnixMilli()
if rowMs <= startMs {
activeAtStart = row
continue
}
if rowMs >= endMs {
continue
}
inWindow = append(inWindow, row)
}
activeRules, activeDefault, err := parseTTLSetting(activeAtStart, fallbackDefaultDays)
if err != nil {
return nil, err
}
segments := make([]*RetentionPolicySegment, 0, len(inWindow)+1)
cursor := startMs
for _, row := range inWindow {
rowMs := row.CreatedAt.UnixMilli()
if rowMs <= cursor {
activeRules, activeDefault, err = parseTTLSetting(row, fallbackDefaultDays)
if err != nil {
return nil, err
}
continue
}
segments = append(segments, NewRetentionPolicySegment(cursor, rowMs, activeRules, activeDefault))
cursor = rowMs
activeRules, activeDefault, err = parseTTLSetting(row, fallbackDefaultDays)
if err != nil {
return nil, err
}
}
if cursor < endMs {
segments = append(segments, NewRetentionPolicySegment(cursor, endMs, activeRules, activeDefault))
}
return segments, nil
}
func parseTTLSetting(row *TTLSetting, fallbackDefaultDays int) ([]CustomRetentionRule, int, error) {
if row == nil {
return nil, fallbackDefaultDays, nil
}
defaultDays := row.TTL
if row.Condition == "" {
defaultDays = (row.TTL + secondsPerDay - 1) / secondsPerDay
}
if defaultDays <= 0 {
defaultDays = fallbackDefaultDays
}
if row.Condition == "" {
return nil, defaultDays, nil
}
var rules []CustomRetentionRule
if err := json.Unmarshal([]byte(row.Condition), &rules); err != nil {
return nil, 0, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "parse ttl_setting condition for row %q", row.ID.StringValue())
}
return rules, defaultDays, nil
}
// CustomRetentionRule is one custom retention rule as stored in ttl_setting.condition.
// Rules are evaluated in declaration order; the first matching rule wins.
type CustomRetentionRule struct {
Filters []FilterCondition `json:"conditions"`
TTLDays int `json:"ttlDays"`
}
// FilterCondition is one label-key, allowed-values condition inside a retention rule.
type FilterCondition struct {
Key string `json:"key"`
Values []string `json:"values"`
}
type TTLSetting struct {
bun.BaseModel `bun:"table:ttl_setting"`
types.Identifiable
@@ -148,73 +17,3 @@ type TTLSetting struct {
OrgID string `json:"-" bun:"org_id,notnull"`
Condition string `bun:"condition,type:text"`
}
type TTLParams struct {
Type string
ColdStorageVolume string
ToColdStorageDuration int64
DelDuration int64
}
type CustomRetentionTTLParams struct {
Type string `json:"type"`
DefaultTTLDays int `json:"defaultTTLDays"`
TTLConditions []CustomRetentionRule `json:"ttlConditions"`
ColdStorageVolume string `json:"coldStorageVolume,omitempty"`
ToColdStorageDurationDays int64 `json:"coldStorageDurationDays,omitempty"`
}
type GetTTLParams struct {
Type string
}
type GetCustomRetentionTTLResponse struct {
Version string `json:"version"`
Status string `json:"status"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
DefaultTTLDays int `json:"default_ttl_days,omitempty"`
TTLConditions []CustomRetentionRule `json:"ttl_conditions,omitempty"`
ColdStorageVolume string `json:"cold_storage_volume,omitempty"`
ColdStorageTTLDays int `json:"cold_storage_ttl_days,omitempty"`
}
type CustomRetentionTTLResponse struct {
Message string `json:"message"`
}
type TTLStatusItem struct {
Id int `json:"id" db:"id"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
TableName string `json:"table_name" db:"table_name"`
TTL int `json:"ttl" db:"ttl"`
Status string `json:"status" db:"status"`
ColdStorageTtl int `json:"cold_storage_ttl" db:"cold_storage_ttl"`
}
type SetTTLResponseItem struct {
Message string `json:"message"`
}
type DBResponseTTL struct {
EngineFull string `ch:"engine_full"`
}
type GetTTLResponseItem struct {
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"`
ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"`
ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"`
ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
Status string `json:"status"`
}

View File

@@ -1,83 +0,0 @@
package retentiontypes
import (
"encoding/json"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/types"
"github.com/stretchr/testify/require"
)
func TestBuildRetentionPolicySegmentsFromRows(t *testing.T) {
start := time.Date(2026, 5, 4, 0, 0, 0, 0, time.UTC)
end := start.AddDate(0, 0, 1)
ruleA := CustomRetentionRule{
Filters: []FilterCondition{{Key: "service.name", Values: []string{"api"}}},
TTLDays: 7,
}
ruleB := CustomRetentionRule{
Filters: []FilterCondition{{Key: "env", Values: []string{"prod"}}},
TTLDays: 15,
}
t.Run("row before window is active at start", func(t *testing.T) {
segments, err := BuildRetentionPolicySegmentsFromRows(
[]*TTLSetting{
ttlSetting(t, start.Add(-time.Hour), 45, []CustomRetentionRule{ruleA}),
},
30,
start.UnixMilli(),
end.UnixMilli(),
)
require.NoError(t, err)
require.Equal(t, []*RetentionPolicySegment{
NewRetentionPolicySegment(start.UnixMilli(), end.UnixMilli(), []CustomRetentionRule{ruleA}, 45),
}, segments)
})
t.Run("row inside window splits segments", func(t *testing.T) {
firstChange := start.Add(6 * time.Hour)
secondChange := start.Add(18 * time.Hour)
segments, err := BuildRetentionPolicySegmentsFromRows(
[]*TTLSetting{
ttlSetting(t, firstChange, 21, []CustomRetentionRule{ruleA}),
ttlSetting(t, secondChange, 14, []CustomRetentionRule{ruleB}),
},
30,
start.UnixMilli(),
end.UnixMilli(),
)
require.NoError(t, err)
require.Equal(t, []*RetentionPolicySegment{
NewRetentionPolicySegment(start.UnixMilli(), firstChange.UnixMilli(), nil, 30),
NewRetentionPolicySegment(firstChange.UnixMilli(), secondChange.UnixMilli(), []CustomRetentionRule{ruleA}, 21),
NewRetentionPolicySegment(secondChange.UnixMilli(), end.UnixMilli(), []CustomRetentionRule{ruleB}, 14),
}, segments)
})
t.Run("no rows uses fallback", func(t *testing.T) {
segments, err := BuildRetentionPolicySegmentsFromRows(nil, 30, start.UnixMilli(), end.UnixMilli())
require.NoError(t, err)
require.Equal(t, []*RetentionPolicySegment{
NewRetentionPolicySegment(start.UnixMilli(), end.UnixMilli(), nil, 30),
}, segments)
})
}
func ttlSetting(t *testing.T, createdAt time.Time, ttlDays int, rules []CustomRetentionRule) *TTLSetting {
t.Helper()
condition, err := json.Marshal(rules)
require.NoError(t, err)
return &TTLSetting{
TimeAuditable: types.TimeAuditable{
CreatedAt: createdAt,
},
TTL: ttlDays,
Condition: string(condition),
}
}

View File

@@ -1,20 +0,0 @@
package zeustypes
import "go.opentelemetry.io/otel/attribute"
var (
// Identifies the organization.
OrganizationID = attribute.Key("signoz.organization.id")
// Identifies the retention bucket a meter belongs to.
RetentionDuration = attribute.Key("signoz.retention.duration")
)
func NewDimensions(kvs ...attribute.KeyValue) map[string]string {
dimensions := map[string]string{}
for _, kv := range kvs {
dimensions[string(kv.Key)] = kv.Value.AsString()
}
return dimensions
}

View File

@@ -1,29 +0,0 @@
package zeustypes
import (
"encoding/json"
)
type GettableDeployment struct {
ID string `json:"id"`
Name string `json:"name"`
Cluster struct {
ID string `json:"id"`
Name string `json:"name"`
Region struct {
ID string `json:"id"`
Name string `json:"name"`
DNS string `json:"dns"`
} `json:"region"`
} `json:"cluster"`
}
func NewGettableDeployment(data []byte) (*GettableDeployment, error) {
deployment := new(GettableDeployment)
err := json.Unmarshal(data, deployment)
if err != nil {
return nil, err
}
return deployment, nil
}

View File

@@ -1,46 +0,0 @@
package zeustypes
import (
"net/url"
"github.com/tidwall/gjson"
)
type Host struct {
Name string `json:"name" required:"true"`
IsDefault bool `json:"is_default" required:"true"`
URL string `json:"url" required:"true"`
}
type GettableHost struct {
Name string `json:"name" required:"true"`
State string `json:"state" required:"true"`
Tier string `json:"tier" required:"true"`
Hosts []Host `json:"hosts" required:"true"`
}
type PostableHost struct {
Name string `json:"name" required:"true"`
}
func NewGettableHost(data []byte) *GettableHost {
parsed := gjson.ParseBytes(data)
dns := parsed.Get("cluster.region.dns").String()
hostResults := parsed.Get("hosts").Array()
hosts := make([]Host, len(hostResults))
for i, h := range hostResults {
name := h.Get("name").String()
hosts[i].Name = name
hosts[i].IsDefault = h.Get("is_default").Bool()
hosts[i].URL = (&url.URL{Scheme: "https", Host: name + "." + dns}).String()
}
return &GettableHost{
Name: parsed.Get("name").String(),
State: parsed.Get("state").String(),
Tier: parsed.Get("tier").String(),
Hosts: hosts,
}
}

View File

@@ -1,54 +0,0 @@
package zeustypes
import "time"
type MeterCheckpoint struct {
Name string
StartDate time.Time
}
type Meter struct {
// MeterName is the fully-qualified meter identifier.
MeterName MeterName `json:"name"`
// Value is the aggregated integer scalar for this meter over the reporting window.
Value int64 `json:"value"`
// Unit is the metric unit for this meter.
Unit MeterUnit `json:"unit"`
// Aggregation names the aggregation applied to produce Value.
Aggregation MeterAggregation `json:"aggregation"`
// StartUnixMilli is the inclusive window start in epoch milliseconds.
StartUnixMilli int64 `json:"start_unix_milli"`
// EndUnixMilli is the exclusive window end in epoch milliseconds.
EndUnixMilli int64 `json:"end_unix_milli"`
// IsCompleted is false for the current day's partial value.
IsCompleted bool `json:"is_completed"`
// Dimensions is the per-meter label set.
Dimensions map[string]string `json:"dimensions"`
}
func NewMeter(
name MeterName,
value int64,
unit MeterUnit,
aggregation MeterAggregation,
window MeterWindow,
dimensions map[string]string,
) Meter {
return Meter{
MeterName: name,
Value: value,
Unit: unit,
Aggregation: aggregation,
StartUnixMilli: window.StartUnixMilli,
EndUnixMilli: window.EndUnixMilli,
IsCompleted: window.IsCompleted,
Dimensions: dimensions,
}
}

View File

@@ -1,12 +0,0 @@
package zeustypes
import "github.com/SigNoz/signoz/pkg/valuer"
type MeterAggregation struct {
valuer.String
}
var (
MeterAggregationSum = MeterAggregation{valuer.NewString("sum")}
MeterAggregationMax = MeterAggregation{valuer.NewString("max")}
)

View File

@@ -1,66 +0,0 @@
package zeustypes
import (
"encoding/json"
"regexp"
"github.com/SigNoz/signoz/pkg/errors"
)
var (
MeterSpanSize = MustNewMeterName("signoz.meter.span.size")
MeterLogSize = MustNewMeterName("signoz.meter.log.size")
MeterDatapointCount = MustNewMeterName("signoz.meter.metric.datapoint.count")
MeterPlatformActive = MustNewMeterName("signoz.meter.platform.active")
)
var meterNameRegex = regexp.MustCompile(`^[a-z][a-z0-9_.]+$`)
// MeterName is a validated dotted Zeus meter name.
type MeterName struct {
s string
}
func NewMeterName(s string) (MeterName, error) {
if !meterNameRegex.MatchString(s) {
return MeterName{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid meter name: %s", s)
}
return MeterName{s: s}, nil
}
func MustNewMeterName(s string) MeterName {
name, err := NewMeterName(s)
if err != nil {
panic(err)
}
return name
}
func (n MeterName) String() string {
return n.s
}
func (n MeterName) IsZero() bool {
return n.s == ""
}
func (n MeterName) MarshalJSON() ([]byte, error) {
return json.Marshal(n.s)
}
func (n *MeterName) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return err
}
name, err := NewMeterName(s)
if err != nil {
return err
}
*n = name
return nil
}

View File

@@ -1,30 +0,0 @@
package zeustypes
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestNewMeterWindow(t *testing.T) {
start := time.Date(2026, 5, 4, 0, 0, 0, 0, time.UTC)
window, err := NewMeterWindow(start.UnixMilli(), start.AddDate(0, 0, 1).UnixMilli(), true)
require.NoError(t, err)
require.Equal(t, start.UnixMilli(), window.StartUnixMilli)
require.Equal(t, start.AddDate(0, 0, 1).UnixMilli(), window.EndUnixMilli)
require.True(t, window.IsCompleted)
_, err = NewMeterWindow(0, start.UnixMilli(), true)
require.Error(t, err)
_, err = NewMeterWindow(start.UnixMilli(), start.UnixMilli(), false)
require.Error(t, err)
}
func TestMustNewMeterWindowPanicsForInvalidWindow(t *testing.T) {
require.Panics(t, func() {
MustNewMeterWindow(0, 0, true)
})
}

View File

@@ -1,12 +0,0 @@
package zeustypes
import "github.com/SigNoz/signoz/pkg/valuer"
type MeterUnit struct {
valuer.String
}
var (
MeterUnitCount = MeterUnit{valuer.NewString("count")}
MeterUnitBytes = MeterUnit{valuer.NewString("bytes")}
)

View File

@@ -1,43 +0,0 @@
package zeustypes
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
)
type MeterWindow struct {
StartUnixMilli int64
EndUnixMilli int64
IsCompleted bool
}
func NewMeterWindow(startUnixMilli, endUnixMilli int64, isCompleted bool) (MeterWindow, error) {
if startUnixMilli <= 0 {
return MeterWindow{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "meter window start must be positive: %d", startUnixMilli)
}
if endUnixMilli <= startUnixMilli {
return MeterWindow{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "meter window end must be after start: [%d, %d)", startUnixMilli, endUnixMilli)
}
return MeterWindow{
StartUnixMilli: startUnixMilli,
EndUnixMilli: endUnixMilli,
IsCompleted: isCompleted,
}, nil
}
func MustNewMeterWindow(startUnixMilli, endUnixMilli int64, isCompleted bool) MeterWindow {
window, err := NewMeterWindow(startUnixMilli, endUnixMilli, isCompleted)
if err != nil {
panic(err)
}
return window
}
func (w MeterWindow) Day() time.Time {
t := time.UnixMilli(w.StartUnixMilli).UTC()
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}

View File

@@ -1,13 +0,0 @@
package zeustypes
type PostableProfile struct {
UsesOtel bool `json:"uses_otel" required:"true"`
HasExistingObservabilityTool bool `json:"has_existing_observability_tool" required:"true"`
ExistingObservabilityTool string `json:"existing_observability_tool" required:"true"`
ReasonsForInterestInSigNoz []string `json:"reasons_for_interest_in_signoz" required:"true"`
LogsScalePerDayInGB int64 `json:"logs_scale_per_day_in_gb" required:"true"`
NumberOfServices int64 `json:"number_of_services" required:"true"`
NumberOfHosts int64 `json:"number_of_hosts" required:"true"`
WhereDidYouDiscoverSigNoz string `json:"where_did_you_discover_signoz" required:"true"`
TimelineForMigratingToSigNoz string `json:"timeline_for_migrating_to_signoz" required:"true"`
}

View File

@@ -0,0 +1,87 @@
package zeustypes
import (
"encoding/json"
"net/url"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/tidwall/gjson"
)
type PostableHost struct {
Name string `json:"name" required:"true"`
}
type PostableProfile struct {
UsesOtel bool `json:"uses_otel" required:"true"`
HasExistingObservabilityTool bool `json:"has_existing_observability_tool" required:"true"`
ExistingObservabilityTool string `json:"existing_observability_tool" required:"true"`
ReasonsForInterestInSigNoz []string `json:"reasons_for_interest_in_signoz" required:"true"`
LogsScalePerDayInGB int64 `json:"logs_scale_per_day_in_gb" required:"true"`
NumberOfServices int64 `json:"number_of_services" required:"true"`
NumberOfHosts int64 `json:"number_of_hosts" required:"true"`
WhereDidYouDiscoverSigNoz string `json:"where_did_you_discover_signoz" required:"true"`
TimelineForMigratingToSigNoz string `json:"timeline_for_migrating_to_signoz" required:"true"`
}
type GettableHost struct {
Name string `json:"name" required:"true"`
State string `json:"state" required:"true"`
Tier string `json:"tier" required:"true"`
Hosts []Host `json:"hosts" required:"true"`
}
type Host struct {
Name string `json:"name" required:"true"`
IsDefault bool `json:"is_default" required:"true"`
URL string `json:"url" required:"true"`
}
func NewGettableHost(data []byte) *GettableHost {
parsed := gjson.ParseBytes(data)
dns := parsed.Get("cluster.region.dns").String()
hostResults := parsed.Get("hosts").Array()
hosts := make([]Host, len(hostResults))
for i, h := range hostResults {
name := h.Get("name").String()
hosts[i].Name = name
hosts[i].IsDefault = h.Get("is_default").Bool()
hosts[i].URL = (&url.URL{Scheme: "https", Host: name + "." + dns}).String()
}
return &GettableHost{
Name: parsed.Get("name").String(),
State: parsed.Get("state").String(),
Tier: parsed.Get("tier").String(),
Hosts: hosts,
}
}
// GettableDeployment represents the parsed deployment info from zeus.GetDeployment.
// NOTE: this is not a full response structure, add more fields from actual response as per requirement.
type GettableDeployment struct {
ID string `json:"id"`
Name string `json:"name"`
Cluster struct {
ID string `json:"id"`
Name string `json:"name"`
Region struct {
ID string `json:"id"`
Name string `json:"name"`
DNS string `json:"dns"`
} `json:"region"`
} `json:"cluster"`
}
// NewGettableDeployment parses raw GetDeployment bytes into a GettableDeployment.
func NewGettableDeployment(data []byte) (*GettableDeployment, error) {
deployment := new(GettableDeployment)
err := json.Unmarshal(data, deployment)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal deployment response")
}
return deployment, nil
}

View File

@@ -49,14 +49,6 @@ func (provider *provider) PutMetersV2(_ context.Context, _ string, _ []byte) err
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v2 is not supported")
}
func (provider *provider) PutMetersV3(_ context.Context, _ string, _ string, _ []byte) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v3 is not supported")
}
func (provider *provider) ListMeterCheckpoints(_ context.Context, _ string) ([]zeustypes.MeterCheckpoint, error) {
return nil, errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "list meter checkpoints is not supported")
}
func (provider *provider) PutProfile(_ context.Context, _ string, _ *zeustypes.PostableProfile) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting profile is not supported")
}

View File

@@ -35,16 +35,6 @@ type Zeus interface {
// Puts the meters for the given license key using Zeus.
PutMetersV2(context.Context, string, []byte) error
// PutMetersV3 ships one day's raw JSON array of meter readings to the
// v2/meters endpoint. idempotencyKey is propagated as X-Idempotency-Key so
// Zeus can UPSERT on retries.
PutMetersV3(ctx context.Context, licenseKey string, idempotencyKey string, body []byte) error
// ListMeterCheckpoints returns the latest sealed (is_completed=true) UTC day
// Zeus has stored for each billing meter name. Missing meter names are
// treated by the cron as bootstrap cases.
ListMeterCheckpoints(ctx context.Context, licenseKey string) ([]zeustypes.MeterCheckpoint, error)
// Put profile for the given license key.
PutProfile(context.Context, string, *zeustypes.PostableProfile) error

View File

@@ -6,7 +6,7 @@ import uuid
from abc import ABC
from collections.abc import Callable, Generator
from enum import Enum
from typing import Any
from typing import Any, Literal
from urllib.parse import urlparse
import numpy as np
@@ -236,6 +236,7 @@ class Traces(ABC):
attributes_number: dict[str, np.float64]
attributes_bool: dict[str, bool]
resources_string: dict[str, str]
resource_json: dict[str, str]
events: list[str]
links: str
response_status_code: str
@@ -273,6 +274,7 @@ class Traces(ABC):
links: list[TracesLink] = [],
trace_state: str = "",
flags: np.uint32 = 0,
resource_write_mode: Literal["legacy_only", "dual_write"] = "dual_write",
) -> None:
if timestamp is None:
timestamp = datetime.datetime.now()
@@ -322,8 +324,11 @@ class Traces(ABC):
self.db_name = ""
self.db_operation = ""
# Process resources and derive service_name
# Process resources and derive service_name. Spans written before the
# JSON-resource evolution time only populate resources_string (legacy_only);
# spans at or after the evolution time dual-write to both columns.
self.resources_string = {k: str(v) for k, v in resources.items()}
self.resource_json = {} if resource_write_mode == "legacy_only" else dict(self.resources_string)
self.service_name = self.resources_string.get("service.name", "default-service")
for k, v in self.resources_string.items():
@@ -575,7 +580,7 @@ class Traces(ABC):
self.db_operation,
self.has_error,
self.is_remote,
self.resources_string,
self.resource_json,
],
dtype=object,
)

View File

@@ -0,0 +1,240 @@
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.querier import (
build_group_by_field,
build_logs_aggregation,
index_series_by_label,
make_query_request,
)
from fixtures.traces import TraceIdGenerator, Traces
# we already create the evolution for resource during schema migration
# since we have to create test data around it, we need to get the evolution time
def _get_traces_resource_evolution_time_json(signoz: types.SigNoz) -> datetime:
result = signoz.telemetrystore.conn.query(
"""
SELECT release_time
FROM signoz_metadata.distributed_column_evolution_metadata
WHERE signal = 'traces'
AND field_context = 'resource'
AND field_name = '__all__'
AND column_name = 'resource'
LIMIT 1
"""
).result_rows
assert result, "Expected traces resource evolution metadata to exist"
release_time_ns = int(result[0][0])
return datetime.fromtimestamp(release_time_ns / 1e9, tz=UTC)
# Spans with timestamps before the evolution time will have resources written only to resources_string.
# Spans with timestamps at or after the evolution time will have resources written to both resources_string and resource (JSON).
def _build_evolved_span(
timestamp: datetime,
evolution_time: datetime,
service_name: str,
name: str,
) -> Traces:
resource_write_mode = "legacy_only" if timestamp < evolution_time else "dual_write"
return Traces(
timestamp=timestamp,
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
name=name,
resources={
"service.name": service_name,
"deployment.environment": "integration",
},
resource_write_mode=resource_write_mode,
)
def _query_grouped_trace_series(
signoz: types.SigNoz,
token: str,
start: datetime,
end: datetime,
group_by: str = "service.name",
aggregation: str = "count()",
) -> dict[str, list[dict]]:
response = make_query_request(
signoz,
token,
start_ms=int(start.timestamp() * 1000),
end_ms=int(end.timestamp() * 1000),
request_type="time_series",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": False,
"groupBy": [build_group_by_field(group_by)],
"having": {"expression": ""},
"aggregations": [build_logs_aggregation(aggregation)],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert len(aggregations) == 1
return index_series_by_label(aggregations[0]["series"], group_by)
def _assert_grouped_series(
series_by_group: dict[str, dict],
expected_values_by_group: dict[str, dict[int, int]],
) -> None:
assert set(series_by_group.keys()) == set(expected_values_by_group.keys())
for group_name, expected_by_ts in expected_values_by_group.items():
actual_values = sorted(
series_by_group[group_name]["values"],
key=lambda value: value["timestamp"],
)
expected_values = [{"timestamp": timestamp, "value": value} for timestamp, value in sorted(expected_by_ts.items())]
assert actual_values == expected_values
def _test_traces_resource_evolution(
signoz: types.SigNoz,
token: str,
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
# 1. Get the evolution time.
# 2. Ingest spans before the evolution time.
# 3. Ingest spans after the evolution time.
# 4. Query the spans before the evolution time.
# 5. Query the spans after the evolution time.
# Both aggregation and group by should be checked.
"""
evolution_time = _get_traces_resource_evolution_time_json(signoz)
evolution_time = evolution_time.replace(second=0, microsecond=0)
before_2 = evolution_time - timedelta(minutes=10)
before_1 = evolution_time - timedelta(minutes=5)
after_1 = evolution_time + timedelta(minutes=5)
after_2 = evolution_time + timedelta(minutes=10)
insert_traces(
[
_build_evolved_span(
timestamp=before_2,
evolution_time=evolution_time,
service_name="svc-before-2",
name="span before evolution 2",
),
_build_evolved_span(
timestamp=before_1,
evolution_time=evolution_time,
service_name="svc-before-1",
name="span before evolution 1",
),
_build_evolved_span(
timestamp=after_1,
evolution_time=evolution_time,
service_name="svc-after-1",
name="span after evolution 1",
),
_build_evolved_span(
timestamp=after_2,
evolution_time=evolution_time,
service_name="svc-after-2",
name="span after evolution 2",
),
]
)
before_series = _query_grouped_trace_series(signoz, token, before_2 - timedelta(minutes=1), before_1 + timedelta(minutes=1))
_assert_grouped_series(
before_series,
expected_values_by_group={
"svc-before-2": {
int(before_2.timestamp() * 1000): 1,
},
"svc-before-1": {
int(before_1.timestamp() * 1000): 1,
},
},
)
after_series = _query_grouped_trace_series(signoz, token, after_1 - timedelta(minutes=1), after_2 + timedelta(minutes=1))
_assert_grouped_series(
after_series,
expected_values_by_group={
"svc-after-1": {
int(after_1.timestamp() * 1000): 1,
},
"svc-after-2": {
int(after_2.timestamp() * 1000): 1,
},
},
)
spanning_series = _query_grouped_trace_series(signoz, token, before_2, after_2 + timedelta(minutes=1))
_assert_grouped_series(
spanning_series,
expected_values_by_group={
"svc-before-2": {
int(before_2.timestamp() * 1000): 1,
},
"svc-before-1": {
int(before_1.timestamp() * 1000): 1,
},
"svc-after-1": {
int(after_1.timestamp() * 1000): 1,
},
"svc-after-2": {
int(after_2.timestamp() * 1000): 1,
},
},
)
# query to check aggregation on the resource field like count_distinct(service.name)
aggregation_series = _query_grouped_trace_series(
signoz,
token,
before_2,
after_2 + timedelta(minutes=1),
group_by="deployment.environment",
aggregation="count_distinct(service.name)",
)
_assert_grouped_series(
aggregation_series,
expected_values_by_group={
"integration": {
int(before_2.timestamp() * 1000): 1,
int(before_1.timestamp() * 1000): 1,
int(after_1.timestamp() * 1000): 1,
int(after_2.timestamp() * 1000): 1,
},
},
)
def test_traces_resource_evolution(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
) -> None:
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
_test_traces_resource_evolution(signoz, token, insert_traces)