Compare commits

...

42 Commits

Author SHA1 Message Date
Karan Balani
3616022049 refactor(meters): add meter constructor 2026-05-04 20:47:09 +05:30
Karan Balani
5d270b716b refactor(meters): rename platform fee collector 2026-05-04 20:08:06 +05:30
Karan Balani
19d9d26051 Merge branch 'main' into feat/billing-meterreporter 2026-05-04 19:37:04 +05:30
Karan Balani
522148362b refactor(retention): move ttl types 2026-05-04 18:21:28 +05:30
Karan Balani
781158ecab refactor(meters): align retention and zeus 2026-05-04 18:12:11 +05:30
Karan Balani
0603bd6b27 fix: ci lint and flag default value 2026-05-04 17:38:29 +05:30
Karan Balani
37c57e6c05 Merge branch 'feat/billing-meterreporter' of github.com:SigNoz/signoz into feat/billing-meterreporter 2026-05-04 17:31:33 +05:30
Karan Balani
12a2e63a31 Merge branch 'main' into feat/billing-meterreporter 2026-05-04 17:07:27 +05:30
Karan Balani
453bcc06c4 chore(meterreporter): increase catchup window 2026-05-04 17:06:20 +05:30
Karan Balani
fac5fe6b9e test(metercollector): add collector coverage 2026-05-04 17:06:20 +05:30
Karan Balani
0ad412b844 Merge branch 'main' into feat/billing-meterreporter 2026-05-04 15:40:50 +05:30
Karan Balani
d1957b5eac chore(meterreporter): trim comments 2026-05-04 15:38:54 +05:30
Karan Balani
dba9cfd455 refactor(meterreporter): wire http collectors 2026-05-04 15:38:54 +05:30
Karan Balani
ed2011a7bb feat(metercollector/retention): add narrow retention slice loader and SQL helpers 2026-05-04 15:38:54 +05:30
Karan Balani
68385478c7 feat(metercollector): add MeterCollector interface and split type packages 2026-05-04 15:38:54 +05:30
Karan Balani
eb661b7ac7 Merge branch 'main' into feat/billing-meterreporter 2026-04-30 14:59:59 +05:30
Karan Balani
afd6868423 Merge branch 'feat/billing-meterreporter' of github.com:SigNoz/signoz into feat/billing-meterreporter 2026-04-30 14:57:57 +05:30
Karan Balani
8ddf0a13c1 feat: make retention buckets generic 2026-04-30 14:20:44 +05:30
Karan Balani
16f0d2aa38 Merge branch 'main' into feat/billing-meterreporter 2026-04-29 13:44:24 +05:30
Karan Balani
3af912c586 chore: add tracing and logging 2026-04-29 13:28:53 +05:30
Karan Balani
ad7715802b refactor: push meters in batch for each day 2026-04-29 12:43:42 +05:30
Karan Balani
b579bdbd7b refactor: simplify some sections of tick 2026-04-29 11:32:57 +05:30
Karan Balani
aa64cf7bbf refactor: move few things to ee package 2026-04-29 10:40:48 +05:30
Karan Balani
2d33b1a743 refactor: remove HistoricalBackfillDays 2026-04-29 03:54:18 +05:30
Karan Balani
4fbf7de8e1 refactor: cleanup comments 2026-04-29 03:31:58 +05:30
Karan Balani
7528b19fd4 Merge branch 'main' into feat/billing-meterreporter 2026-04-29 01:56:01 +05:30
Karan Balani
42e4196aad feat(meterreporter): add metric and trace meters 2026-04-29 00:35:52 +05:30
Karan Balani
22cdb03702 chore: intermediate commit 2026-04-28 21:30:10 +05:30
Karan Balani
6eca3dc06e refactor: add retentiontypes 2026-04-28 21:21:08 +05:30
Karan Balani
0631189417 refactor(meterreporter): remove unused retry config 2026-04-28 20:32:19 +05:30
Karan Balani
ec552b94cc fix(meterreporter): pin retention type 2026-04-28 18:49:35 +05:30
Karan Balani
ee8d99f1d0 chore: lower HistoricalBackfillDays 2026-04-28 17:51:16 +05:30
Karan Balani
bf77e26a86 feat(meterreporter): bootstrap from data floor, emit sentinel zero-readings 2026-04-28 17:26:31 +05:30
Karan Balani
9cd3cf23d7 chore: skip meter checkpoint call temporarily 2026-04-28 16:25:45 +05:30
Karan Balani
4a44802ebc feat: improve retention period queries based on workspace ids for logs only for now 2026-04-28 13:30:44 +05:30
Karan Balani
f2aed0d834 chore: intermediate commit 2026-04-28 13:30:44 +05:30
Karan Balani
527d8c0459 feat(meterreporter): sealed-range catch-up and today-partial ticks 2026-04-28 13:30:44 +05:30
Karan Balani
8fdc91260e feat: add telemetry for collect and ship durations & improve comments 2026-04-28 13:30:44 +05:30
Karan Balani
218c4524b1 chore: update interval validation to allow min 5 mins interval for testing 2026-04-28 13:30:44 +05:30
Karan Balani
02dec846eb feat(meterreporter): add traces meters 2026-04-28 13:30:44 +05:30
Karan Balani
99dadb7247 feat(meterreporter): simplify code, add metric meters, dry-run zeus call 2026-04-28 13:30:44 +05:30
Karan Balani
44b41c40de feat: meter reporter for new billing infra 2026-04-28 13:30:41 +05:30
50 changed files with 4176 additions and 228 deletions

View File

@@ -18,11 +18,13 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/gateway"
"github.com/SigNoz/signoz/pkg/gateway/noopgateway"
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
@@ -109,6 +111,9 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
func(_ licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
return signoz.NewAuditorProviderFactories()
},
func(_ context.Context, _ flagger.Flagger, _ licensing.Licensing, _ telemetrystore.TelemetryStore, _ sqlstore.SQLStore, _ organization.Getter, _ zeus.Zeus) (factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]], string) {
return signoz.NewMeterReporterProviderFactories(), "noop"
},
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
return querier.NewHandler(ps, q, a)
},

View File

@@ -17,6 +17,14 @@ import (
"github.com/SigNoz/signoz/ee/gateway/httpgateway"
enterpriselicensing "github.com/SigNoz/signoz/ee/licensing"
"github.com/SigNoz/signoz/ee/licensing/httplicensing"
"github.com/SigNoz/signoz/ee/metercollector/baseplatformfeemetercollector"
"github.com/SigNoz/signoz/ee/metercollector/datapointcountmetercollector"
"github.com/SigNoz/signoz/ee/metercollector/datapointsizemetercollector"
"github.com/SigNoz/signoz/ee/metercollector/logcountmetercollector"
"github.com/SigNoz/signoz/ee/metercollector/logsizemetercollector"
"github.com/SigNoz/signoz/ee/metercollector/spancountmetercollector"
"github.com/SigNoz/signoz/ee/metercollector/spansizemetercollector"
"github.com/SigNoz/signoz/ee/meterreporter/httpmeterreporter"
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration/implcloudprovider"
"github.com/SigNoz/signoz/ee/modules/dashboard/impldashboard"
@@ -35,9 +43,12 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
pkgflagger "github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/gateway"
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
pkgcloudintegration "github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
@@ -57,7 +68,10 @@ import (
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/cloudintegrationtypes"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/version"
"github.com/SigNoz/signoz/pkg/zeus"
)
@@ -157,6 +171,19 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
}
return factories
},
func(ctx context.Context, flagger pkgflagger.Flagger, licensing licensing.Licensing, telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore, orgGetter organization.Getter, zeus zeus.Zeus) (factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]], string) {
factories := signoz.NewMeterReporterProviderFactories()
if err := factories.Add(httpmeterreporter.NewFactory(newMeterCollectors(licensing, telemetryStore, sqlStore), licensing, telemetryStore, orgGetter, zeus)); err != nil {
panic(err)
}
evalCtx := featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})
if flagger.BooleanOrEmpty(ctx, pkgflagger.FeatureUseMeterReporter, evalCtx) {
return factories, "http"
}
return factories, "noop"
},
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
communityHandler := querier.NewHandler(ps, q, a)
return eequerier.NewHandler(ps, q, communityHandler)
@@ -216,3 +243,15 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
return nil
}
func newMeterCollectors(licensing licensing.Licensing, telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) map[metercollectortypes.Name]metercollector.MeterCollector {
return map[metercollectortypes.Name]metercollector.MeterCollector{
baseplatformfeemetercollector.MeterName: baseplatformfeemetercollector.New(licensing),
logcountmetercollector.MeterName: logcountmetercollector.New(telemetryStore, sqlStore),
logsizemetercollector.MeterName: logsizemetercollector.New(telemetryStore, sqlStore),
datapointcountmetercollector.MeterName: datapointcountmetercollector.New(telemetryStore, sqlStore),
datapointsizemetercollector.MeterName: datapointsizemetercollector.New(telemetryStore, sqlStore),
spancountmetercollector.MeterName: spancountmetercollector.New(telemetryStore, sqlStore),
spansizemetercollector.MeterName: spansizemetercollector.New(telemetryStore, sqlStore),
}
}

View File

@@ -429,3 +429,10 @@ authz:
openfga:
# maximum tuples allowed per openfga write operation.
max_tuples_per_write: 100
##################### Meter Reporter #####################
meterreporter:
# The interval between collection ticks. Minimum 5m.
interval: 6h
# The per-tick timeout that bounds collect-and-ship work. Minimum 3m and must be less than interval.
timeout: 5m

View File

@@ -0,0 +1,58 @@
// Package baseplatformfeemetercollector collects the license-derived base platform fee meter.
package baseplatformfeemetercollector
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MeterName is the typed registry key for this collector.
var (
MeterName = metercollectortypes.MustNewName("signoz.meter.base.platform.fee")
meterUnit = metercollectortypes.UnitCount
meterAggregation = metercollectortypes.AggregationMax
)
var _ metercollector.MeterCollector = (*Provider)(nil)
// Provider collects base platform fee meters.
type Provider struct {
licensing licensing.Licensing
}
func New(licensing licensing.Licensing) *Provider {
return &Provider{licensing: licensing}
}
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
return meterAggregation
}
// Collect emits value 1 when the org has an active license.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
if !window.IsValid() {
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
}
license, err := p.licensing.GetActive(ctx, orgID)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "fetch active license for base platform fee meter")
}
if license == nil || license.Key == "" {
return nil, nil
}
return []meterreportertypes.Meter{
meterreportertypes.NewMeter(MeterName, 1, meterUnit, meterAggregation, window, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
}),
}, nil
}

View File

@@ -0,0 +1,107 @@
package baseplatformfeemetercollector
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestCollectEmitsBasePlatformFeeMeterForValidLicense(t *testing.T) {
orgID := valuer.GenerateUUID()
window := completedWindow()
provider := New(&fakeLicensing{
license: &licensetypes.License{Key: "license-key"},
})
readings, err := provider.Collect(context.Background(), orgID, window)
require.NoError(t, err)
require.Equal(t, []meterreportertypes.Meter{
meterreportertypes.NewMeter(MeterName, 1, metercollectortypes.UnitCount, metercollectortypes.AggregationMax, window, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
}),
}, readings)
}
func TestCollectSkipsNilLicense(t *testing.T) {
readings, err := New(&fakeLicensing{}).Collect(context.Background(), valuer.GenerateUUID(), completedWindow())
require.NoError(t, err)
require.Empty(t, readings)
}
func TestProviderMetadata(t *testing.T) {
provider := New(&fakeLicensing{})
require.Equal(t, "signoz.meter.base.platform.fee", provider.Name().String())
require.Equal(t, metercollectortypes.UnitCount, provider.Unit())
require.Equal(t, metercollectortypes.AggregationMax, provider.Aggregation())
}
func TestCollectRejectsInvalidWindowBeforeLicensing(t *testing.T) {
readings, err := New(nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
require.Error(t, err)
require.Nil(t, readings)
}
func completedWindow() meterreportertypes.Window {
start := time.Date(2026, 5, 4, 0, 0, 0, 0, time.UTC)
return meterreportertypes.Window{
StartUnixMilli: start.UnixMilli(),
EndUnixMilli: start.AddDate(0, 0, 1).UnixMilli(),
IsCompleted: true,
}
}
var _ licensing.Licensing = (*fakeLicensing)(nil)
type fakeLicensing struct {
license *licensetypes.License
err error
}
func (f *fakeLicensing) Start(context.Context) error {
return nil
}
func (f *fakeLicensing) Stop(context.Context) error {
return nil
}
func (f *fakeLicensing) Validate(context.Context) error {
return nil
}
func (f *fakeLicensing) Activate(context.Context, valuer.UUID, string) error {
return nil
}
func (f *fakeLicensing) GetActive(context.Context, valuer.UUID) (*licensetypes.License, error) {
return f.license, f.err
}
func (f *fakeLicensing) Refresh(context.Context, valuer.UUID) error {
return nil
}
func (f *fakeLicensing) Checkout(context.Context, valuer.UUID, *licensetypes.PostableSubscription) (*licensetypes.GettableSubscription, error) {
return &licensetypes.GettableSubscription{}, nil
}
func (f *fakeLicensing) Portal(context.Context, valuer.UUID, *licensetypes.PostableSubscription) (*licensetypes.GettableSubscription, error) {
return &licensetypes.GettableSubscription{}, nil
}
func (f *fakeLicensing) GetFeatureFlags(context.Context, valuer.UUID) ([]*licensetypes.Feature, error) {
return nil, nil
}
func (f *fakeLicensing) Collect(context.Context, valuer.UUID) (map[string]any, error) {
return map[string]any{}, nil
}

View File

@@ -0,0 +1,276 @@
// Package datapointcountmetercollector collects metric datapoint count meters
// by workspace and retention. Keep the query local to this meter.
package datapointcountmetercollector
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"github.com/huandu/go-sqlbuilder"
"github.com/SigNoz/signoz/ee/metercollector/retention"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MeterName is the typed registry key for this collector.
var (
MeterName = metercollectortypes.MustNewName("signoz.meter.metric.datapoint.count")
meterUnit = metercollectortypes.UnitCount
meterAggregation = metercollectortypes.AggregationSum
)
var _ metercollector.MeterCollector = (*Provider)(nil)
// Provider collects datapoint count meters.
type Provider struct {
telemetryStore telemetrystore.TelemetryStore
sqlStore sqlstore.SQLStore
}
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
return &Provider{
telemetryStore: telemetryStore,
sqlStore: sqlStore,
}
}
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
return meterAggregation
}
// Collect aggregates datapoint count for the window and emits an empty-day sentinel.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
if !window.IsValid() {
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
}
meterName := MeterName.String()
slices, err := retention.LoadActiveSlices(
ctx,
p.sqlStore,
orgID,
telemetrymetrics.DBName+"."+telemetrymetrics.SamplesV4LocalTableName,
retentiontypes.DefaultMetricsRetentionDays,
window.StartUnixMilli, window.EndUnixMilli,
)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
}
type bucket struct {
dimensions map[string]string
value float64
}
accumulator := make(map[string]*bucket)
for _, slice := range slices {
query, args, dimensionColumns, err := buildQuery(meterName, slice)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
}
rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
if err := func() error {
defer rows.Close()
for rows.Next() {
dimensionValues := make([]string, len(dimensionColumns))
var retentionDays int32
var retentionRuleIndex int32
var value float64
scanDest := make([]any, 0, len(dimensionValues)+3)
for i := range dimensionValues {
scanDest = append(scanDest, &dimensionValues[i])
}
scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)
if err := rows.Scan(scanDest...); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
key := bucketKey(dimensions)
b, ok := accumulator[key]
if !ok {
b = &bucket{dimensions: dimensions}
accumulator[key] = b
}
b.value += value
}
if err := rows.Err(); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
return nil
}(); err != nil {
return nil, err
}
}
meters := make([]meterreportertypes.Meter, 0, len(accumulator))
for _, b := range accumulator {
meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
}
// Empty windows still emit a sentinel so checkpoints can advance.
if len(meters) == 0 && len(slices) > 0 {
meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(slices[len(slices)-1].DefaultDays),
}))
}
return meters, nil
}
// buildQuery stays local because each meter owns its billing query.
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
if err != nil {
return "", nil, nil, err
}
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
if err != nil {
return "", nil, nil, err
}
columns, err := dimensionColumnsFor(slice.Rules)
if err != nil {
return "", nil, nil, err
}
selects := make([]string, 0, len(columns)+3)
groupBy := make([]string, 0, len(columns)+2)
for _, column := range columns {
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
groupBy = append(groupBy, column.alias)
}
selects = append(selects,
retentionExpr+" AS retention_days",
retentionRuleIndexExpr+" AS retention_rule_index",
"ifNull(sum(value), 0) AS value",
)
groupBy = append(groupBy, "retention_days", "retention_rule_index")
sb := sqlbuilder.NewSelectBuilder()
sb.Select(selects...)
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", meterName),
sb.GTE("unix_milli", slice.StartMs),
sb.LT("unix_milli", slice.EndMs),
)
sb.GroupBy(groupBy...)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return query, args, columns, nil
}
type dimensionColumn struct {
key string
alias string
}
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
dimensionKeys, err := retention.RuleDimensionKeys(rules)
if err != nil {
return nil, err
}
keys := make([]string, 0, len(dimensionKeys)+1)
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
for _, key := range dimensionKeys {
if key == metercollector.DimensionWorkspaceKeyID {
continue
}
keys = append(keys, key)
}
columns := make([]dimensionColumn, len(keys))
for i, key := range keys {
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
}
return columns, nil
}
func buildDimensions(
orgID valuer.UUID,
retentionDays int,
retentionRuleIndex int,
columns []dimensionColumn,
values []string,
rules []retentiontypes.CustomRetentionRule,
) (map[string]string, error) {
if len(columns) != len(values) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
}
valuesByKey := make(map[string]string, len(columns))
for i, column := range columns {
valuesByKey[column.key] = values[i]
}
dimensions := map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
}
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
if retentionRuleIndex < 0 {
return dimensions, nil
}
if retentionRuleIndex >= len(rules) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
}
for _, filter := range rules[retentionRuleIndex].Filters {
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
}
return dimensions, nil
}
func addNonEmpty(dimensions map[string]string, key, value string) {
if value == "" {
return
}
dimensions[key] = value
}
func bucketKey(dimensions map[string]string) string {
keys := make([]string, 0, len(dimensions))
for key := range dimensions {
keys = append(keys, key)
}
sort.Strings(keys)
var b strings.Builder
for _, key := range keys {
value := dimensions[key]
b.WriteString(strconv.Itoa(len(key)))
b.WriteByte(':')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(strconv.Itoa(len(value)))
b.WriteByte(':')
b.WriteString(value)
b.WriteByte(';')
}
return b.String()
}

View File

@@ -0,0 +1,67 @@
package datapointcountmetercollector
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestBuildDimensions(t *testing.T) {
orgID := valuer.GenerateUUID()
rules := []retentiontypes.CustomRetentionRule{{
Filters: []retentiontypes.FilterCondition{{
Key: "service.name",
Values: []string{"api"},
}},
TTLDays: 7,
}}
columns := []dimensionColumn{
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
{key: "service.name", alias: "dim_1"},
}
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
require.NoError(t, err)
require.Equal(t, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
}, dimensions)
}
func TestProviderMetadata(t *testing.T) {
provider := New(nil, nil)
require.Equal(t, "signoz.meter.metric.datapoint.count", provider.Name().String())
require.Equal(t, metercollectortypes.UnitCount, provider.Unit())
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
}
func TestBucketKeyIsStable(t *testing.T) {
first := bucketKey(map[string]string{
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
})
second := bucketKey(map[string]string{
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
})
require.Equal(t, first, second)
require.NotEmpty(t, first)
}
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
require.Error(t, err)
require.Nil(t, readings)
}

View File

@@ -0,0 +1,276 @@
// Package datapointsizemetercollector collects metric datapoint size meters
// by workspace and retention. Keep the query local to this meter.
package datapointsizemetercollector
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"github.com/huandu/go-sqlbuilder"
"github.com/SigNoz/signoz/ee/metercollector/retention"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MeterName is the typed registry key for this collector.
var (
MeterName = metercollectortypes.MustNewName("signoz.meter.metric.datapoint.size")
meterUnit = metercollectortypes.UnitBytes
meterAggregation = metercollectortypes.AggregationSum
)
var _ metercollector.MeterCollector = (*Provider)(nil)
// Provider collects datapoint size meters.
type Provider struct {
telemetryStore telemetrystore.TelemetryStore
sqlStore sqlstore.SQLStore
}
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
return &Provider{
telemetryStore: telemetryStore,
sqlStore: sqlStore,
}
}
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
return meterAggregation
}
// Collect aggregates datapoint size for the window and emits an empty-day sentinel.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
if !window.IsValid() {
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
}
meterName := MeterName.String()
slices, err := retention.LoadActiveSlices(
ctx,
p.sqlStore,
orgID,
telemetrymetrics.DBName+"."+telemetrymetrics.SamplesV4LocalTableName,
retentiontypes.DefaultMetricsRetentionDays,
window.StartUnixMilli, window.EndUnixMilli,
)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
}
type bucket struct {
dimensions map[string]string
value float64
}
accumulator := make(map[string]*bucket)
for _, slice := range slices {
query, args, dimensionColumns, err := buildQuery(meterName, slice)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
}
rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
if err := func() error {
defer rows.Close()
for rows.Next() {
dimensionValues := make([]string, len(dimensionColumns))
var retentionDays int32
var retentionRuleIndex int32
var value float64
scanDest := make([]any, 0, len(dimensionValues)+3)
for i := range dimensionValues {
scanDest = append(scanDest, &dimensionValues[i])
}
scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)
if err := rows.Scan(scanDest...); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
key := bucketKey(dimensions)
b, ok := accumulator[key]
if !ok {
b = &bucket{dimensions: dimensions}
accumulator[key] = b
}
b.value += value
}
if err := rows.Err(); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
return nil
}(); err != nil {
return nil, err
}
}
meters := make([]meterreportertypes.Meter, 0, len(accumulator))
for _, b := range accumulator {
meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
}
// Empty windows still emit a sentinel so checkpoints can advance.
if len(meters) == 0 && len(slices) > 0 {
meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(slices[len(slices)-1].DefaultDays),
}))
}
return meters, nil
}
// buildQuery stays local because each meter owns its billing query.
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
if err != nil {
return "", nil, nil, err
}
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
if err != nil {
return "", nil, nil, err
}
columns, err := dimensionColumnsFor(slice.Rules)
if err != nil {
return "", nil, nil, err
}
selects := make([]string, 0, len(columns)+3)
groupBy := make([]string, 0, len(columns)+2)
for _, column := range columns {
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
groupBy = append(groupBy, column.alias)
}
selects = append(selects,
retentionExpr+" AS retention_days",
retentionRuleIndexExpr+" AS retention_rule_index",
"ifNull(sum(value), 0) AS value",
)
groupBy = append(groupBy, "retention_days", "retention_rule_index")
sb := sqlbuilder.NewSelectBuilder()
sb.Select(selects...)
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", meterName),
sb.GTE("unix_milli", slice.StartMs),
sb.LT("unix_milli", slice.EndMs),
)
sb.GroupBy(groupBy...)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return query, args, columns, nil
}
type dimensionColumn struct {
key string
alias string
}
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
dimensionKeys, err := retention.RuleDimensionKeys(rules)
if err != nil {
return nil, err
}
keys := make([]string, 0, len(dimensionKeys)+1)
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
for _, key := range dimensionKeys {
if key == metercollector.DimensionWorkspaceKeyID {
continue
}
keys = append(keys, key)
}
columns := make([]dimensionColumn, len(keys))
for i, key := range keys {
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
}
return columns, nil
}
func buildDimensions(
orgID valuer.UUID,
retentionDays int,
retentionRuleIndex int,
columns []dimensionColumn,
values []string,
rules []retentiontypes.CustomRetentionRule,
) (map[string]string, error) {
if len(columns) != len(values) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
}
valuesByKey := make(map[string]string, len(columns))
for i, column := range columns {
valuesByKey[column.key] = values[i]
}
dimensions := map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
}
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
if retentionRuleIndex < 0 {
return dimensions, nil
}
if retentionRuleIndex >= len(rules) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
}
for _, filter := range rules[retentionRuleIndex].Filters {
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
}
return dimensions, nil
}
func addNonEmpty(dimensions map[string]string, key, value string) {
if value == "" {
return
}
dimensions[key] = value
}
func bucketKey(dimensions map[string]string) string {
keys := make([]string, 0, len(dimensions))
for key := range dimensions {
keys = append(keys, key)
}
sort.Strings(keys)
var b strings.Builder
for _, key := range keys {
value := dimensions[key]
b.WriteString(strconv.Itoa(len(key)))
b.WriteByte(':')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(strconv.Itoa(len(value)))
b.WriteByte(':')
b.WriteString(value)
b.WriteByte(';')
}
return b.String()
}

View File

@@ -0,0 +1,67 @@
package datapointsizemetercollector
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestBuildDimensions(t *testing.T) {
orgID := valuer.GenerateUUID()
rules := []retentiontypes.CustomRetentionRule{{
Filters: []retentiontypes.FilterCondition{{
Key: "service.name",
Values: []string{"api"},
}},
TTLDays: 7,
}}
columns := []dimensionColumn{
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
{key: "service.name", alias: "dim_1"},
}
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
require.NoError(t, err)
require.Equal(t, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
}, dimensions)
}
func TestProviderMetadata(t *testing.T) {
provider := New(nil, nil)
require.Equal(t, "signoz.meter.metric.datapoint.size", provider.Name().String())
require.Equal(t, metercollectortypes.UnitBytes, provider.Unit())
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
}
func TestBucketKeyIsStable(t *testing.T) {
first := bucketKey(map[string]string{
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
})
second := bucketKey(map[string]string{
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
})
require.Equal(t, first, second)
require.NotEmpty(t, first)
}
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
require.Error(t, err)
require.Nil(t, readings)
}

View File

@@ -0,0 +1,276 @@
// Package logcountmetercollector collects log count meters by workspace and
// retention. Keep the query local to this meter.
package logcountmetercollector
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"github.com/huandu/go-sqlbuilder"
"github.com/SigNoz/signoz/ee/metercollector/retention"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MeterName is the typed registry key for this collector.
var (
MeterName = metercollectortypes.MustNewName("signoz.meter.log.count")
meterUnit = metercollectortypes.UnitCount
meterAggregation = metercollectortypes.AggregationSum
)
var _ metercollector.MeterCollector = (*Provider)(nil)
// Provider collects log count meters.
type Provider struct {
telemetryStore telemetrystore.TelemetryStore
sqlStore sqlstore.SQLStore
}
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
return &Provider{
telemetryStore: telemetryStore,
sqlStore: sqlStore,
}
}
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
return meterAggregation
}
// Collect aggregates log count for the window and emits an empty-day sentinel.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
if !window.IsValid() {
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
}
meterName := MeterName.String()
slices, err := retention.LoadActiveSlices(
ctx,
p.sqlStore,
orgID,
telemetrylogs.DBName+"."+telemetrylogs.LogsV2LocalTableName,
retentiontypes.DefaultLogsRetentionDays,
window.StartUnixMilli, window.EndUnixMilli,
)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
}
type bucket struct {
dimensions map[string]string
value float64
}
accumulator := make(map[string]*bucket)
for _, slice := range slices {
query, args, dimensionColumns, err := buildQuery(meterName, slice)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
}
rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
if err := func() error {
defer rows.Close()
for rows.Next() {
dimensionValues := make([]string, len(dimensionColumns))
var retentionDays int32
var retentionRuleIndex int32
var value float64
scanDest := make([]any, 0, len(dimensionValues)+3)
for i := range dimensionValues {
scanDest = append(scanDest, &dimensionValues[i])
}
scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)
if err := rows.Scan(scanDest...); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
key := bucketKey(dimensions)
b, ok := accumulator[key]
if !ok {
b = &bucket{dimensions: dimensions}
accumulator[key] = b
}
b.value += value
}
if err := rows.Err(); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
return nil
}(); err != nil {
return nil, err
}
}
meters := make([]meterreportertypes.Meter, 0, len(accumulator))
for _, b := range accumulator {
meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
}
// Empty windows still emit a sentinel so checkpoints can advance.
if len(meters) == 0 && len(slices) > 0 {
meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(slices[len(slices)-1].DefaultDays),
}))
}
return meters, nil
}
// buildQuery stays local because each meter owns its billing query.
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
if err != nil {
return "", nil, nil, err
}
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
if err != nil {
return "", nil, nil, err
}
columns, err := dimensionColumnsFor(slice.Rules)
if err != nil {
return "", nil, nil, err
}
selects := make([]string, 0, len(columns)+3)
groupBy := make([]string, 0, len(columns)+2)
for _, column := range columns {
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
groupBy = append(groupBy, column.alias)
}
selects = append(selects,
retentionExpr+" AS retention_days",
retentionRuleIndexExpr+" AS retention_rule_index",
"ifNull(sum(value), 0) AS value",
)
groupBy = append(groupBy, "retention_days", "retention_rule_index")
sb := sqlbuilder.NewSelectBuilder()
sb.Select(selects...)
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", meterName),
sb.GTE("unix_milli", slice.StartMs),
sb.LT("unix_milli", slice.EndMs),
)
sb.GroupBy(groupBy...)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return query, args, columns, nil
}
type dimensionColumn struct {
key string
alias string
}
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
dimensionKeys, err := retention.RuleDimensionKeys(rules)
if err != nil {
return nil, err
}
keys := make([]string, 0, len(dimensionKeys)+1)
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
for _, key := range dimensionKeys {
if key == metercollector.DimensionWorkspaceKeyID {
continue
}
keys = append(keys, key)
}
columns := make([]dimensionColumn, len(keys))
for i, key := range keys {
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
}
return columns, nil
}
func buildDimensions(
orgID valuer.UUID,
retentionDays int,
retentionRuleIndex int,
columns []dimensionColumn,
values []string,
rules []retentiontypes.CustomRetentionRule,
) (map[string]string, error) {
if len(columns) != len(values) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
}
valuesByKey := make(map[string]string, len(columns))
for i, column := range columns {
valuesByKey[column.key] = values[i]
}
dimensions := map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
}
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
if retentionRuleIndex < 0 {
return dimensions, nil
}
if retentionRuleIndex >= len(rules) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
}
for _, filter := range rules[retentionRuleIndex].Filters {
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
}
return dimensions, nil
}
func addNonEmpty(dimensions map[string]string, key, value string) {
if value == "" {
return
}
dimensions[key] = value
}
func bucketKey(dimensions map[string]string) string {
keys := make([]string, 0, len(dimensions))
for key := range dimensions {
keys = append(keys, key)
}
sort.Strings(keys)
var b strings.Builder
for _, key := range keys {
value := dimensions[key]
b.WriteString(strconv.Itoa(len(key)))
b.WriteByte(':')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(strconv.Itoa(len(value)))
b.WriteByte(':')
b.WriteString(value)
b.WriteByte(';')
}
return b.String()
}

View File

@@ -0,0 +1,67 @@
package logcountmetercollector
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestBuildDimensions(t *testing.T) {
orgID := valuer.GenerateUUID()
rules := []retentiontypes.CustomRetentionRule{{
Filters: []retentiontypes.FilterCondition{{
Key: "service.name",
Values: []string{"api"},
}},
TTLDays: 7,
}}
columns := []dimensionColumn{
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
{key: "service.name", alias: "dim_1"},
}
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
require.NoError(t, err)
require.Equal(t, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
}, dimensions)
}
func TestProviderMetadata(t *testing.T) {
provider := New(nil, nil)
require.Equal(t, "signoz.meter.log.count", provider.Name().String())
require.Equal(t, metercollectortypes.UnitCount, provider.Unit())
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
}
func TestBucketKeyIsStable(t *testing.T) {
first := bucketKey(map[string]string{
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
})
second := bucketKey(map[string]string{
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
})
require.Equal(t, first, second)
require.NotEmpty(t, first)
}
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
require.Error(t, err)
require.Nil(t, readings)
}

View File

@@ -0,0 +1,276 @@
// Package logsizemetercollector collects log size meters by workspace and
// retention. Keep the query local to this meter.
package logsizemetercollector
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"github.com/huandu/go-sqlbuilder"
"github.com/SigNoz/signoz/ee/metercollector/retention"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MeterName is the typed registry key for this collector.
var (
MeterName = metercollectortypes.MustNewName("signoz.meter.log.size")
meterUnit = metercollectortypes.UnitBytes
meterAggregation = metercollectortypes.AggregationSum
)
var _ metercollector.MeterCollector = (*Provider)(nil)
// Provider collects log size meters.
type Provider struct {
telemetryStore telemetrystore.TelemetryStore
sqlStore sqlstore.SQLStore
}
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
return &Provider{
telemetryStore: telemetryStore,
sqlStore: sqlStore,
}
}
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
return meterAggregation
}
// Collect aggregates log size for the window and emits an empty-day sentinel.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
if !window.IsValid() {
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
}
meterName := MeterName.String()
slices, err := retention.LoadActiveSlices(
ctx,
p.sqlStore,
orgID,
telemetrylogs.DBName+"."+telemetrylogs.LogsV2LocalTableName,
retentiontypes.DefaultLogsRetentionDays,
window.StartUnixMilli, window.EndUnixMilli,
)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
}
type bucket struct {
dimensions map[string]string
value float64
}
accumulator := make(map[string]*bucket)
for _, slice := range slices {
query, args, dimensionColumns, err := buildQuery(meterName, slice)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
}
rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
if err := func() error {
defer rows.Close()
for rows.Next() {
dimensionValues := make([]string, len(dimensionColumns))
var retentionDays int32
var retentionRuleIndex int32
var value float64
scanDest := make([]any, 0, len(dimensionValues)+3)
for i := range dimensionValues {
scanDest = append(scanDest, &dimensionValues[i])
}
scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)
if err := rows.Scan(scanDest...); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
key := bucketKey(dimensions)
b, ok := accumulator[key]
if !ok {
b = &bucket{dimensions: dimensions}
accumulator[key] = b
}
b.value += value
}
if err := rows.Err(); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
return nil
}(); err != nil {
return nil, err
}
}
meters := make([]meterreportertypes.Meter, 0, len(accumulator))
for _, b := range accumulator {
meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
}
// Empty windows still emit a sentinel so checkpoints can advance.
if len(meters) == 0 && len(slices) > 0 {
meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(slices[len(slices)-1].DefaultDays),
}))
}
return meters, nil
}
// buildQuery stays local because each meter owns its billing query.
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
if err != nil {
return "", nil, nil, err
}
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
if err != nil {
return "", nil, nil, err
}
columns, err := dimensionColumnsFor(slice.Rules)
if err != nil {
return "", nil, nil, err
}
selects := make([]string, 0, len(columns)+3)
groupBy := make([]string, 0, len(columns)+2)
for _, column := range columns {
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
groupBy = append(groupBy, column.alias)
}
selects = append(selects,
retentionExpr+" AS retention_days",
retentionRuleIndexExpr+" AS retention_rule_index",
"ifNull(sum(value), 0) AS value",
)
groupBy = append(groupBy, "retention_days", "retention_rule_index")
sb := sqlbuilder.NewSelectBuilder()
sb.Select(selects...)
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", meterName),
sb.GTE("unix_milli", slice.StartMs),
sb.LT("unix_milli", slice.EndMs),
)
sb.GroupBy(groupBy...)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return query, args, columns, nil
}
type dimensionColumn struct {
key string
alias string
}
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
dimensionKeys, err := retention.RuleDimensionKeys(rules)
if err != nil {
return nil, err
}
keys := make([]string, 0, len(dimensionKeys)+1)
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
for _, key := range dimensionKeys {
if key == metercollector.DimensionWorkspaceKeyID {
continue
}
keys = append(keys, key)
}
columns := make([]dimensionColumn, len(keys))
for i, key := range keys {
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
}
return columns, nil
}
func buildDimensions(
orgID valuer.UUID,
retentionDays int,
retentionRuleIndex int,
columns []dimensionColumn,
values []string,
rules []retentiontypes.CustomRetentionRule,
) (map[string]string, error) {
if len(columns) != len(values) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
}
valuesByKey := make(map[string]string, len(columns))
for i, column := range columns {
valuesByKey[column.key] = values[i]
}
dimensions := map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
}
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
if retentionRuleIndex < 0 {
return dimensions, nil
}
if retentionRuleIndex >= len(rules) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
}
for _, filter := range rules[retentionRuleIndex].Filters {
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
}
return dimensions, nil
}
func addNonEmpty(dimensions map[string]string, key, value string) {
if value == "" {
return
}
dimensions[key] = value
}
func bucketKey(dimensions map[string]string) string {
keys := make([]string, 0, len(dimensions))
for key := range dimensions {
keys = append(keys, key)
}
sort.Strings(keys)
var b strings.Builder
for _, key := range keys {
value := dimensions[key]
b.WriteString(strconv.Itoa(len(key)))
b.WriteByte(':')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(strconv.Itoa(len(value)))
b.WriteByte(':')
b.WriteString(value)
b.WriteByte(';')
}
return b.String()
}

View File

@@ -0,0 +1,67 @@
package logsizemetercollector
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestBuildDimensions(t *testing.T) {
orgID := valuer.GenerateUUID()
rules := []retentiontypes.CustomRetentionRule{{
Filters: []retentiontypes.FilterCondition{{
Key: "service.name",
Values: []string{"api"},
}},
TTLDays: 7,
}}
columns := []dimensionColumn{
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
{key: "service.name", alias: "dim_1"},
}
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
require.NoError(t, err)
require.Equal(t, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
}, dimensions)
}
func TestProviderMetadata(t *testing.T) {
provider := New(nil, nil)
require.Equal(t, "signoz.meter.log.size", provider.Name().String())
require.Equal(t, metercollectortypes.UnitBytes, provider.Unit())
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
}
func TestBucketKeyIsStable(t *testing.T) {
first := bucketKey(map[string]string{
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
})
second := bucketKey(map[string]string{
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
})
require.Equal(t, first, second)
require.NotEmpty(t, first)
}
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
require.Error(t, err)
require.Nil(t, readings)
}

View File

@@ -0,0 +1,255 @@
// Package retention builds retention slices and SQL expressions for meters.
// Collectors still own their table names, defaults, and aggregation queries.
package retention
import (
"context"
"encoding/json"
"fmt"
"regexp"
"strconv"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
const secondsPerDay = 24 * 60 * 60
// These values are inlined into SQL, so keep the allowlist strict.
var (
labelKeyPattern = regexp.MustCompile(`^[A-Za-z0-9_.\-]+$`)
labelValuePattern = regexp.MustCompile(`^[A-Za-z0-9_.\-:]+$`)
)
// LoadActiveSlices returns TTL slices covering [startMs, endMs).
// tableName must be fully qualified, for example "signoz_logs.logs_v2".
func LoadActiveSlices(
ctx context.Context,
sqlstore sqlstore.SQLStore,
orgID valuer.UUID,
tableName string,
fallbackDefaultDays int,
startMs, endMs int64,
) ([]retentiontypes.Slice, error) {
if startMs >= endMs {
return nil, nil
}
if sqlstore == nil {
return nil, errors.New(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "sqlstore is nil")
}
if tableName == "" {
return nil, errors.New(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "tableName is empty")
}
if fallbackDefaultDays <= 0 {
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "non-positive fallbackDefaultDays %d", fallbackDefaultDays)
}
rows := []*retentiontypes.TTLSetting{}
err := sqlstore.
BunDB().
NewSelect().
Model(&rows).
Where("table_name = ?", tableName).
Where("org_id = ?", orgID.StringValue()).
Where("status = ?", retentiontypes.TTLSettingStatusSuccess).
Where("created_at < ?", time.UnixMilli(endMs).UTC()).
OrderExpr("created_at ASC").
Scan(ctx)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load ttl_setting rows for org %q table %q", orgID.StringValue(), tableName)
}
return buildSlicesFromRows(rows, fallbackDefaultDays, startMs, endMs)
}
func buildSlicesFromRows(rows []*retentiontypes.TTLSetting, fallbackDefaultDays int, startMs, endMs int64) ([]retentiontypes.Slice, error) {
if startMs >= endMs {
return nil, nil
}
// The latest row before the window is active at the window start.
var activeAtStart *retentiontypes.TTLSetting
inWindow := make([]*retentiontypes.TTLSetting, 0, len(rows))
for _, row := range rows {
rowMs := row.CreatedAt.UnixMilli()
if rowMs <= startMs {
activeAtStart = row
continue
}
if rowMs >= endMs {
continue
}
inWindow = append(inWindow, row)
}
activeRules, activeDefault, err := parseTTLSetting(activeAtStart, fallbackDefaultDays)
if err != nil {
return nil, err
}
slices := make([]retentiontypes.Slice, 0, len(inWindow)+1)
cursor := startMs
for _, row := range inWindow {
rowMs := row.CreatedAt.UnixMilli()
if rowMs <= cursor {
// Same-ms updates collapse: replace active config, no empty slice.
activeRules, activeDefault, err = parseTTLSetting(row, fallbackDefaultDays)
if err != nil {
return nil, err
}
continue
}
slices = append(slices, retentiontypes.Slice{
StartMs: cursor,
EndMs: rowMs,
Rules: activeRules,
DefaultDays: activeDefault,
})
cursor = rowMs
activeRules, activeDefault, err = parseTTLSetting(row, fallbackDefaultDays)
if err != nil {
return nil, err
}
}
if cursor < endMs {
slices = append(slices, retentiontypes.Slice{
StartMs: cursor,
EndMs: endMs,
Rules: activeRules,
DefaultDays: activeDefault,
})
}
return slices, nil
}
// parseTTLSetting returns rules and default days for one ttl_setting row.
func parseTTLSetting(row *retentiontypes.TTLSetting, fallbackDefaultDays int) ([]retentiontypes.CustomRetentionRule, int, error) {
if row == nil {
return nil, fallbackDefaultDays, nil
}
defaultDays := row.TTL
if row.Condition == "" {
// V1 stores seconds; round up to days.
defaultDays = (row.TTL + secondsPerDay - 1) / secondsPerDay
}
if defaultDays <= 0 {
defaultDays = fallbackDefaultDays
}
if row.Condition == "" {
return nil, defaultDays, nil
}
var rules []retentiontypes.CustomRetentionRule
if err := json.Unmarshal([]byte(row.Condition), &rules); err != nil {
return nil, 0, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "parse ttl_setting condition for row %q", row.ID.StringValue())
}
return rules, defaultDays, nil
}
// BuildMultiIfSQL renders the retention-days expression for one slice.
func BuildMultiIfSQL(rules []retentiontypes.CustomRetentionRule, defaultDays int) (string, error) {
if defaultDays <= 0 {
return "", errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "non-positive default retention %d", defaultDays)
}
if len(rules) == 0 {
return "toInt32(" + strconv.Itoa(defaultDays) + ")", nil
}
arms := make([]string, 0, 2*len(rules)+1)
for ruleIndex, rule := range rules {
if rule.TTLDays <= 0 {
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d has non-positive ttl_days %d", ruleIndex, rule.TTLDays)
}
conditionExpr, err := buildRuleConditionSQL(ruleIndex, rule)
if err != nil {
return "", err
}
arms = append(arms, conditionExpr)
arms = append(arms, strconv.Itoa(rule.TTLDays))
}
arms = append(arms, strconv.Itoa(defaultDays))
return "toInt32(multiIf(" + strings.Join(arms, ", ") + "))", nil
}
// BuildRuleIndexSQL renders the matched rule index, or -1 for fallback.
func BuildRuleIndexSQL(rules []retentiontypes.CustomRetentionRule) (string, error) {
if len(rules) == 0 {
return "toInt32(-1)", nil
}
arms := make([]string, 0, 2*len(rules)+1)
for ruleIndex, rule := range rules {
conditionExpr, err := buildRuleConditionSQL(ruleIndex, rule)
if err != nil {
return "", err
}
arms = append(arms, conditionExpr)
arms = append(arms, strconv.Itoa(ruleIndex))
}
arms = append(arms, "-1")
return "toInt32(multiIf(" + strings.Join(arms, ", ") + "))", nil
}
func buildRuleConditionSQL(ruleIndex int, rule retentiontypes.CustomRetentionRule) (string, error) {
if len(rule.Filters) == 0 {
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d has no filters", ruleIndex)
}
filterExprs := make([]string, 0, len(rule.Filters))
for filterIndex, filter := range rule.Filters {
if !labelKeyPattern.MatchString(filter.Key) {
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d filter %d has invalid key %q", ruleIndex, filterIndex, filter.Key)
}
if len(filter.Values) == 0 {
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d filter %d has no values", ruleIndex, filterIndex)
}
quoted := make([]string, len(filter.Values))
for valueIndex, value := range filter.Values {
if !labelValuePattern.MatchString(value) {
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d filter %d value %d is invalid %q", ruleIndex, filterIndex, valueIndex, value)
}
quoted[valueIndex] = "'" + value + "'"
}
filterExprs = append(filterExprs, fmt.Sprintf("JSONExtractString(labels, '%s') IN (%s)", filter.Key, strings.Join(quoted, ", ")))
}
return strings.Join(filterExprs, " AND "), nil
}
// RuleDimensionKeys returns unique label keys referenced by retention rules.
func RuleDimensionKeys(rules []retentiontypes.CustomRetentionRule) ([]string, error) {
keys := make([]string, 0)
seen := make(map[string]struct{})
for ruleIndex, rule := range rules {
for filterIndex, filter := range rule.Filters {
if !labelKeyPattern.MatchString(filter.Key) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d filter %d has invalid key %q", ruleIndex, filterIndex, filter.Key)
}
if _, ok := seen[filter.Key]; ok {
continue
}
seen[filter.Key] = struct{}{}
keys = append(keys, filter.Key)
}
}
return keys, nil
}

View File

@@ -0,0 +1,153 @@
package retention
import (
"encoding/json"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/stretchr/testify/require"
)
func TestBuildSlicesFromRows(t *testing.T) {
start := time.Date(2026, 5, 4, 0, 0, 0, 0, time.UTC)
end := start.AddDate(0, 0, 1)
ruleA := retentiontypes.CustomRetentionRule{
Filters: []retentiontypes.FilterCondition{{Key: "service.name", Values: []string{"api"}}},
TTLDays: 7,
}
ruleB := retentiontypes.CustomRetentionRule{
Filters: []retentiontypes.FilterCondition{{Key: "env", Values: []string{"prod"}}},
TTLDays: 15,
}
t.Run("row before window is active at start", func(t *testing.T) {
slices, err := buildSlicesFromRows(
[]*retentiontypes.TTLSetting{
ttlSetting(t, start.Add(-time.Hour), 45, []retentiontypes.CustomRetentionRule{ruleA}),
},
30,
start.UnixMilli(),
end.UnixMilli(),
)
require.NoError(t, err)
require.Equal(t, []retentiontypes.Slice{{
StartMs: start.UnixMilli(),
EndMs: end.UnixMilli(),
Rules: []retentiontypes.CustomRetentionRule{ruleA},
DefaultDays: 45,
}}, slices)
})
t.Run("row inside window splits slices", func(t *testing.T) {
firstChange := start.Add(6 * time.Hour)
secondChange := start.Add(18 * time.Hour)
slices, err := buildSlicesFromRows(
[]*retentiontypes.TTLSetting{
ttlSetting(t, firstChange, 21, []retentiontypes.CustomRetentionRule{ruleA}),
ttlSetting(t, secondChange, 14, []retentiontypes.CustomRetentionRule{ruleB}),
},
30,
start.UnixMilli(),
end.UnixMilli(),
)
require.NoError(t, err)
require.Equal(t, []retentiontypes.Slice{
{
StartMs: start.UnixMilli(),
EndMs: firstChange.UnixMilli(),
DefaultDays: 30,
},
{
StartMs: firstChange.UnixMilli(),
EndMs: secondChange.UnixMilli(),
Rules: []retentiontypes.CustomRetentionRule{ruleA},
DefaultDays: 21,
},
{
StartMs: secondChange.UnixMilli(),
EndMs: end.UnixMilli(),
Rules: []retentiontypes.CustomRetentionRule{ruleB},
DefaultDays: 14,
},
}, slices)
})
t.Run("no rows uses fallback", func(t *testing.T) {
slices, err := buildSlicesFromRows(nil, 30, start.UnixMilli(), end.UnixMilli())
require.NoError(t, err)
require.Equal(t, []retentiontypes.Slice{{
StartMs: start.UnixMilli(),
EndMs: end.UnixMilli(),
DefaultDays: 30,
}}, slices)
})
}
func TestRetentionSQL(t *testing.T) {
rules := []retentiontypes.CustomRetentionRule{{
Filters: []retentiontypes.FilterCondition{{
Key: "service.name",
Values: []string{"api", "worker"},
}},
TTLDays: 7,
}}
retentionSQL, err := BuildMultiIfSQL(rules, 30)
require.NoError(t, err)
require.Equal(t, "toInt32(multiIf(JSONExtractString(labels, 'service.name') IN ('api', 'worker'), 7, 30))", retentionSQL)
ruleIndexSQL, err := BuildRuleIndexSQL(rules)
require.NoError(t, err)
require.Equal(t, "toInt32(multiIf(JSONExtractString(labels, 'service.name') IN ('api', 'worker'), 0, -1))", ruleIndexSQL)
invalidRules := []retentiontypes.CustomRetentionRule{{
Filters: []retentiontypes.FilterCondition{{
Key: "service name",
Values: []string{"api"},
}},
TTLDays: 7,
}}
_, err = BuildMultiIfSQL(invalidRules, 30)
require.Error(t, err)
_, err = BuildRuleIndexSQL(invalidRules)
require.Error(t, err)
}
func TestRuleDimensionKeysDedupes(t *testing.T) {
keys, err := RuleDimensionKeys([]retentiontypes.CustomRetentionRule{
{
Filters: []retentiontypes.FilterCondition{
{Key: "service.name", Values: []string{"api"}},
{Key: "env", Values: []string{"prod"}},
},
TTLDays: 7,
},
{
Filters: []retentiontypes.FilterCondition{
{Key: "service.name", Values: []string{"worker"}},
{Key: "cluster", Values: []string{"primary"}},
},
TTLDays: 15,
},
})
require.NoError(t, err)
require.Equal(t, []string{"service.name", "env", "cluster"}, keys)
}
func ttlSetting(t *testing.T, createdAt time.Time, ttlDays int, rules []retentiontypes.CustomRetentionRule) *retentiontypes.TTLSetting {
t.Helper()
condition, err := json.Marshal(rules)
require.NoError(t, err)
return &retentiontypes.TTLSetting{
CreatedAt: createdAt,
TTL: ttlDays,
Condition: string(condition),
}
}

View File

@@ -0,0 +1,276 @@
// Package spancountmetercollector collects span count meters by workspace and
// retention. Keep the query local to this meter.
package spancountmetercollector
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"github.com/huandu/go-sqlbuilder"
"github.com/SigNoz/signoz/ee/metercollector/retention"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MeterName is the typed registry key for this collector.
var (
MeterName = metercollectortypes.MustNewName("signoz.meter.span.count")
meterUnit = metercollectortypes.UnitCount
meterAggregation = metercollectortypes.AggregationSum
)
var _ metercollector.MeterCollector = (*Provider)(nil)
// Provider collects span count meters.
type Provider struct {
telemetryStore telemetrystore.TelemetryStore
sqlStore sqlstore.SQLStore
}
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
return &Provider{
telemetryStore: telemetryStore,
sqlStore: sqlStore,
}
}
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
return meterAggregation
}
// Collect aggregates span count for the window and emits an empty-day sentinel.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
if !window.IsValid() {
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
}
meterName := MeterName.String()
slices, err := retention.LoadActiveSlices(
ctx,
p.sqlStore,
orgID,
telemetrytraces.DBName+"."+telemetrytraces.SpanIndexV3LocalTableName,
retentiontypes.DefaultTracesRetentionDays,
window.StartUnixMilli, window.EndUnixMilli,
)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
}
type bucket struct {
dimensions map[string]string
value float64
}
accumulator := make(map[string]*bucket)
for _, slice := range slices {
query, args, dimensionColumns, err := buildQuery(meterName, slice)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
}
rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
if err := func() error {
defer rows.Close()
for rows.Next() {
dimensionValues := make([]string, len(dimensionColumns))
var retentionDays int32
var retentionRuleIndex int32
var value float64
scanDest := make([]any, 0, len(dimensionValues)+3)
for i := range dimensionValues {
scanDest = append(scanDest, &dimensionValues[i])
}
scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)
if err := rows.Scan(scanDest...); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
key := bucketKey(dimensions)
b, ok := accumulator[key]
if !ok {
b = &bucket{dimensions: dimensions}
accumulator[key] = b
}
b.value += value
}
if err := rows.Err(); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
return nil
}(); err != nil {
return nil, err
}
}
meters := make([]meterreportertypes.Meter, 0, len(accumulator))
for _, b := range accumulator {
meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
}
// Empty windows still emit a sentinel so checkpoints can advance.
if len(meters) == 0 && len(slices) > 0 {
meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(slices[len(slices)-1].DefaultDays),
}))
}
return meters, nil
}
// buildQuery stays local because each meter owns its billing query.
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
if err != nil {
return "", nil, nil, err
}
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
if err != nil {
return "", nil, nil, err
}
columns, err := dimensionColumnsFor(slice.Rules)
if err != nil {
return "", nil, nil, err
}
selects := make([]string, 0, len(columns)+3)
groupBy := make([]string, 0, len(columns)+2)
for _, column := range columns {
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
groupBy = append(groupBy, column.alias)
}
selects = append(selects,
retentionExpr+" AS retention_days",
retentionRuleIndexExpr+" AS retention_rule_index",
"ifNull(sum(value), 0) AS value",
)
groupBy = append(groupBy, "retention_days", "retention_rule_index")
sb := sqlbuilder.NewSelectBuilder()
sb.Select(selects...)
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", meterName),
sb.GTE("unix_milli", slice.StartMs),
sb.LT("unix_milli", slice.EndMs),
)
sb.GroupBy(groupBy...)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return query, args, columns, nil
}
type dimensionColumn struct {
key string
alias string
}
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
dimensionKeys, err := retention.RuleDimensionKeys(rules)
if err != nil {
return nil, err
}
keys := make([]string, 0, len(dimensionKeys)+1)
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
for _, key := range dimensionKeys {
if key == metercollector.DimensionWorkspaceKeyID {
continue
}
keys = append(keys, key)
}
columns := make([]dimensionColumn, len(keys))
for i, key := range keys {
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
}
return columns, nil
}
func buildDimensions(
orgID valuer.UUID,
retentionDays int,
retentionRuleIndex int,
columns []dimensionColumn,
values []string,
rules []retentiontypes.CustomRetentionRule,
) (map[string]string, error) {
if len(columns) != len(values) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
}
valuesByKey := make(map[string]string, len(columns))
for i, column := range columns {
valuesByKey[column.key] = values[i]
}
dimensions := map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
}
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
if retentionRuleIndex < 0 {
return dimensions, nil
}
if retentionRuleIndex >= len(rules) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
}
for _, filter := range rules[retentionRuleIndex].Filters {
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
}
return dimensions, nil
}
func addNonEmpty(dimensions map[string]string, key, value string) {
if value == "" {
return
}
dimensions[key] = value
}
func bucketKey(dimensions map[string]string) string {
keys := make([]string, 0, len(dimensions))
for key := range dimensions {
keys = append(keys, key)
}
sort.Strings(keys)
var b strings.Builder
for _, key := range keys {
value := dimensions[key]
b.WriteString(strconv.Itoa(len(key)))
b.WriteByte(':')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(strconv.Itoa(len(value)))
b.WriteByte(':')
b.WriteString(value)
b.WriteByte(';')
}
return b.String()
}

View File

@@ -0,0 +1,67 @@
package spancountmetercollector
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestBuildDimensions(t *testing.T) {
orgID := valuer.GenerateUUID()
rules := []retentiontypes.CustomRetentionRule{{
Filters: []retentiontypes.FilterCondition{{
Key: "service.name",
Values: []string{"api"},
}},
TTLDays: 7,
}}
columns := []dimensionColumn{
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
{key: "service.name", alias: "dim_1"},
}
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
require.NoError(t, err)
require.Equal(t, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
}, dimensions)
}
func TestProviderMetadata(t *testing.T) {
provider := New(nil, nil)
require.Equal(t, "signoz.meter.span.count", provider.Name().String())
require.Equal(t, metercollectortypes.UnitCount, provider.Unit())
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
}
func TestBucketKeyIsStable(t *testing.T) {
first := bucketKey(map[string]string{
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
})
second := bucketKey(map[string]string{
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
})
require.Equal(t, first, second)
require.NotEmpty(t, first)
}
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
require.Error(t, err)
require.Nil(t, readings)
}

View File

@@ -0,0 +1,276 @@
// Package spansizemetercollector collects span size meters by workspace and
// retention. Keep the query local to this meter.
package spansizemetercollector
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"github.com/huandu/go-sqlbuilder"
"github.com/SigNoz/signoz/ee/metercollector/retention"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MeterName is the typed registry key for this collector.
var (
MeterName = metercollectortypes.MustNewName("signoz.meter.span.size")
meterUnit = metercollectortypes.UnitBytes
meterAggregation = metercollectortypes.AggregationSum
)
var _ metercollector.MeterCollector = (*Provider)(nil)
// Provider collects span size meters.
type Provider struct {
telemetryStore telemetrystore.TelemetryStore
sqlStore sqlstore.SQLStore
}
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
return &Provider{
telemetryStore: telemetryStore,
sqlStore: sqlStore,
}
}
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
return meterAggregation
}
// Collect aggregates span size for the window and emits an empty-day sentinel.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
if !window.IsValid() {
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
}
meterName := MeterName.String()
slices, err := retention.LoadActiveSlices(
ctx,
p.sqlStore,
orgID,
telemetrytraces.DBName+"."+telemetrytraces.SpanIndexV3LocalTableName,
retentiontypes.DefaultTracesRetentionDays,
window.StartUnixMilli, window.EndUnixMilli,
)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
}
type bucket struct {
dimensions map[string]string
value float64
}
accumulator := make(map[string]*bucket)
for _, slice := range slices {
query, args, dimensionColumns, err := buildQuery(meterName, slice)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
}
rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
if err := func() error {
defer rows.Close()
for rows.Next() {
dimensionValues := make([]string, len(dimensionColumns))
var retentionDays int32
var retentionRuleIndex int32
var value float64
scanDest := make([]any, 0, len(dimensionValues)+3)
for i := range dimensionValues {
scanDest = append(scanDest, &dimensionValues[i])
}
scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)
if err := rows.Scan(scanDest...); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
key := bucketKey(dimensions)
b, ok := accumulator[key]
if !ok {
b = &bucket{dimensions: dimensions}
accumulator[key] = b
}
b.value += value
}
if err := rows.Err(); err != nil {
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
}
return nil
}(); err != nil {
return nil, err
}
}
meters := make([]meterreportertypes.Meter, 0, len(accumulator))
for _, b := range accumulator {
meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
}
// Empty windows still emit a sentinel so checkpoints can advance.
if len(meters) == 0 && len(slices) > 0 {
meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(slices[len(slices)-1].DefaultDays),
}))
}
return meters, nil
}
// buildQuery stays local because each meter owns its billing query.
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
if err != nil {
return "", nil, nil, err
}
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
if err != nil {
return "", nil, nil, err
}
columns, err := dimensionColumnsFor(slice.Rules)
if err != nil {
return "", nil, nil, err
}
selects := make([]string, 0, len(columns)+3)
groupBy := make([]string, 0, len(columns)+2)
for _, column := range columns {
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
groupBy = append(groupBy, column.alias)
}
selects = append(selects,
retentionExpr+" AS retention_days",
retentionRuleIndexExpr+" AS retention_rule_index",
"ifNull(sum(value), 0) AS value",
)
groupBy = append(groupBy, "retention_days", "retention_rule_index")
sb := sqlbuilder.NewSelectBuilder()
sb.Select(selects...)
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", meterName),
sb.GTE("unix_milli", slice.StartMs),
sb.LT("unix_milli", slice.EndMs),
)
sb.GroupBy(groupBy...)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return query, args, columns, nil
}
type dimensionColumn struct {
key string
alias string
}
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
dimensionKeys, err := retention.RuleDimensionKeys(rules)
if err != nil {
return nil, err
}
keys := make([]string, 0, len(dimensionKeys)+1)
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
for _, key := range dimensionKeys {
if key == metercollector.DimensionWorkspaceKeyID {
continue
}
keys = append(keys, key)
}
columns := make([]dimensionColumn, len(keys))
for i, key := range keys {
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
}
return columns, nil
}
func buildDimensions(
orgID valuer.UUID,
retentionDays int,
retentionRuleIndex int,
columns []dimensionColumn,
values []string,
rules []retentiontypes.CustomRetentionRule,
) (map[string]string, error) {
if len(columns) != len(values) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
}
valuesByKey := make(map[string]string, len(columns))
for i, column := range columns {
valuesByKey[column.key] = values[i]
}
dimensions := map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
}
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
if retentionRuleIndex < 0 {
return dimensions, nil
}
if retentionRuleIndex >= len(rules) {
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
}
for _, filter := range rules[retentionRuleIndex].Filters {
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
}
return dimensions, nil
}
func addNonEmpty(dimensions map[string]string, key, value string) {
if value == "" {
return
}
dimensions[key] = value
}
func bucketKey(dimensions map[string]string) string {
keys := make([]string, 0, len(dimensions))
for key := range dimensions {
keys = append(keys, key)
}
sort.Strings(keys)
var b strings.Builder
for _, key := range keys {
value := dimensions[key]
b.WriteString(strconv.Itoa(len(key)))
b.WriteByte(':')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(strconv.Itoa(len(value)))
b.WriteByte(':')
b.WriteString(value)
b.WriteByte(';')
}
return b.String()
}

View File

@@ -0,0 +1,67 @@
package spansizemetercollector
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestBuildDimensions(t *testing.T) {
orgID := valuer.GenerateUUID()
rules := []retentiontypes.CustomRetentionRule{{
Filters: []retentiontypes.FilterCondition{{
Key: "service.name",
Values: []string{"api"},
}},
TTLDays: 7,
}}
columns := []dimensionColumn{
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
{key: "service.name", alias: "dim_1"},
}
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
require.NoError(t, err)
require.Equal(t, map[string]string{
metercollector.DimensionOrganizationID: orgID.StringValue(),
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
}, dimensions)
}
func TestProviderMetadata(t *testing.T) {
provider := New(nil, nil)
require.Equal(t, "signoz.meter.span.size", provider.Name().String())
require.Equal(t, metercollectortypes.UnitBytes, provider.Unit())
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
}
func TestBucketKeyIsStable(t *testing.T) {
first := bucketKey(map[string]string{
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
metercollector.DimensionWorkspaceKeyID: "workspace-1",
})
second := bucketKey(map[string]string{
metercollector.DimensionWorkspaceKeyID: "workspace-1",
"service.name": "api",
metercollector.DimensionRetentionDays: "30",
})
require.Equal(t, first, second)
require.NotEmpty(t, first)
}
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
require.Error(t, err)
require.Nil(t, readings)
}

View File

@@ -0,0 +1,630 @@
package httpmeterreporter
import (
"context"
"fmt"
"log/slog"
"sort"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/zeus"
"github.com/huandu/go-sqlbuilder"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)
var _ factory.ServiceWithHealthy = (*Provider)(nil)
var errCodeReportFailed = errors.MustNewCode("meterreporter_report_failed")
const (
phaseSealed = "sealed"
phaseToday = "today"
attrPhase = "phase"
attrResult = "result"
attrMeterReporterProvider = "meterreporter.provider"
attrOrgID = "meterreporter.org_id"
attrOrgCount = "meterreporter.org_count"
attrMeter = "meterreporter.meter"
attrDate = "meterreporter.date"
attrReadings = "meterreporter.readings"
attrReadingsCollected = "meterreporter.readings_collected"
attrReadingsDropped = "meterreporter.readings_dropped"
attrWindowStartUnixMilli = "meterreporter.window_start_unix_milli"
attrWindowEndUnixMilli = "meterreporter.window_end_unix_milli"
attrWindowCompleted = "meterreporter.window_completed"
attrCatchupStart = "meterreporter.catchup_start"
attrCatchupEnd = "meterreporter.catchup_end"
attrDurationMs = "meterreporter.duration_ms"
attrDryRun = "meterreporter.dry_run"
attrIdempotencyKey = "meterreporter.idempotency_key"
resultSuccess = "success"
resultFailure = "failure"
providerName = "http"
)
// Provider collects registered meters and ships them to Zeus.
type Provider struct {
settings factory.ScopedProviderSettings
config meterreporter.Config
collectors []metercollector.MeterCollector
licensing licensing.Licensing
telemetryStore telemetrystore.TelemetryStore
orgGetter organization.Getter
zeus zeus.Zeus
healthyC chan struct{}
stopC chan struct{}
goroutinesWg sync.WaitGroup
metrics *reporterMetrics
}
// NewFactory registers the HTTP meter reporter.
func NewFactory(
collectors map[metercollectortypes.Name]metercollector.MeterCollector,
licensing licensing.Licensing,
telemetryStore telemetrystore.TelemetryStore,
orgGetter organization.Getter,
zeus zeus.Zeus,
) factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
return factory.NewProviderFactory(
factory.MustNewName(providerName),
func(ctx context.Context, providerSettings factory.ProviderSettings, config meterreporter.Config) (meterreporter.Reporter, error) {
return newProvider(ctx, providerSettings, config, collectors, licensing, telemetryStore, orgGetter, zeus)
},
)
}
func newProvider(
_ context.Context,
providerSettings factory.ProviderSettings,
config meterreporter.Config,
collectors map[metercollectortypes.Name]metercollector.MeterCollector,
licensing licensing.Licensing,
telemetryStore telemetrystore.TelemetryStore,
orgGetter organization.Getter,
zeus zeus.Zeus,
) (*Provider, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/meterreporter/httpmeterreporter")
metrics, err := newReporterMetrics(settings.Meter())
if err != nil {
return nil, err
}
orderedCollectors, err := validateCollectors(collectors)
if err != nil {
return nil, err
}
return &Provider{
settings: settings,
config: config,
collectors: orderedCollectors,
licensing: licensing,
telemetryStore: telemetryStore,
orgGetter: orgGetter,
zeus: zeus,
healthyC: make(chan struct{}),
stopC: make(chan struct{}),
metrics: metrics,
}, nil
}
func validateCollectors(collectors map[metercollectortypes.Name]metercollector.MeterCollector) ([]metercollector.MeterCollector, error) {
ordered := make([]metercollector.MeterCollector, 0, len(collectors))
for name, collector := range collectors {
if name.IsZero() {
return nil, errors.New(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "empty meter name in collector registry")
}
if collector == nil {
return nil, errors.Newf(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "nil collector for meter %q", name.String())
}
if collector.Name() != name {
return nil, errors.Newf(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "registry key %q does not match collector.Name() %q", name.String(), collector.Name().String())
}
if collector.Unit().IsZero() {
return nil, errors.Newf(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "meter %q has empty unit", name.String())
}
if collector.Aggregation().IsZero() {
return nil, errors.Newf(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "meter %q has empty aggregation", name.String())
}
ordered = append(ordered, collector)
}
sort.Slice(ordered, func(i, j int) bool {
return ordered[i].Name().String() < ordered[j].Name().String()
})
return ordered, nil
}
// Start runs an immediate tick, then repeats on Config.Interval.
func (provider *Provider) Start(ctx context.Context) error {
close(provider.healthyC)
provider.settings.Logger().InfoContext(ctx, "meter reporter started",
slog.Duration("interval", provider.config.Interval),
slog.Duration("timeout", provider.config.Timeout),
slog.Int("catchup_max_days_per_tick", provider.config.CatchupMaxDaysPerTick),
slog.Int("meters", len(provider.collectors)),
)
provider.goroutinesWg.Add(1)
go func() {
defer provider.goroutinesWg.Done()
provider.runTick(ctx)
ticker := time.NewTicker(provider.config.Interval)
defer ticker.Stop()
for {
select {
case <-provider.stopC:
return
case <-ticker.C:
provider.runTick(ctx)
}
}
}()
provider.goroutinesWg.Wait()
return nil
}
// Stop signals the tick loop and waits for any in-flight tick.
func (provider *Provider) Stop(ctx context.Context) error {
<-provider.healthyC
provider.settings.Logger().InfoContext(ctx, "meter reporter stopping")
select {
case <-provider.stopC:
// already closed
default:
close(provider.stopC)
}
provider.goroutinesWg.Wait()
provider.settings.Logger().InfoContext(ctx, "meter reporter stopped")
return nil
}
func (provider *Provider) Healthy() <-chan struct{} {
return provider.healthyC
}
// runTick executes one collect-and-ship cycle under Config.Timeout.
func (provider *Provider) runTick(parentCtx context.Context) {
tickStart := time.Now()
ctx, span := provider.settings.Tracer().Start(parentCtx, "meterreporter.Tick", trace.WithAttributes(
attribute.String(attrMeterReporterProvider, providerName),
attribute.Int("meterreporter.meters", len(provider.collectors)),
attribute.Int("meterreporter.catchup_max_days_per_tick", provider.config.CatchupMaxDaysPerTick),
))
defer span.End()
provider.metrics.ticks.Add(ctx, 1)
ctx, cancel := context.WithTimeout(ctx, provider.config.Timeout)
defer cancel()
provider.settings.Logger().DebugContext(ctx, "meter reporter tick started",
slog.Duration("timeout", provider.config.Timeout),
slog.Int("meters", len(provider.collectors)),
)
if err := provider.tick(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.SetAttributes(
attribute.String(attrResult, resultFailure),
attribute.Int64(attrDurationMs, time.Since(tickStart).Milliseconds()),
)
provider.settings.Logger().ErrorContext(ctx, "meter reporter tick failed",
errors.Attr(err),
slog.Duration("timeout", provider.config.Timeout),
slog.Duration("duration", time.Since(tickStart)),
)
return
}
span.SetAttributes(
attribute.String(attrResult, resultSuccess),
attribute.Int64(attrDurationMs, time.Since(tickStart).Milliseconds()),
)
provider.settings.Logger().DebugContext(ctx, "meter reporter tick completed", slog.Duration("duration", time.Since(tickStart)))
}
// tick processes sealed catchup days, then today's partial window.
func (provider *Provider) tick(ctx context.Context) error {
now := time.Now().UTC()
// Use one timestamp so a tick cannot straddle midnight.
todayStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
yesterday := todayStart.AddDate(0, 0, -1)
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, errCodeReportFailed, "failed to list organizations")
}
trace.SpanFromContext(ctx).SetAttributes(attribute.Int(attrOrgCount, len(orgs)))
if len(orgs) == 0 {
provider.settings.Logger().InfoContext(ctx, "skipping meter reporter tick; no organizations found")
return nil
}
org := orgs[0]
if len(orgs) > 1 {
// signoz_meter samples have no org marker.
provider.settings.Logger().WarnContext(ctx, "multiple orgs on a single instance; reporting only the first",
slog.Int("org_count", len(orgs)),
slog.String("selected_org_id", org.ID.StringValue()),
)
}
trace.SpanFromContext(ctx).SetAttributes(attribute.String(attrOrgID, org.ID.StringValue()))
license, err := provider.licensing.GetActive(ctx, org.ID)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, errCodeReportFailed, "failed to fetch active license for org %q", org.ID.StringValue())
}
if license == nil || license.Key == "" {
provider.settings.Logger().WarnContext(ctx, "skipping tick, nil/empty license for org", slog.String("org_id", org.ID.StringValue()))
return nil
}
// TODO: re-enable once /v2/meters/checkpoints is live in staging. Until
// then we run with an empty checkpoint map; bootstrap floors are taken
// from data and dropCheckpointed becomes a no-op for the sealed window.
// checkpoints, err := provider.zeus.GetMeterCheckpoints(ctx, license.Key)
// if err != nil {
// provider.metrics.checkpointErrors.Add(ctx, 1)
// provider.settings.Logger().ErrorContext(ctx, "skipping tick: meter checkpoints call failed", errors.Attr(err))
// return nil
// }
// checkpointsByMeter := make(map[string]time.Time, len(checkpoints))
// for _, checkpoint := range checkpoints {
// checkpointsByMeter[checkpoint.Name] = checkpoint.Checkpoint.UTC()
// }
checkpointsByMeter := make(map[string]time.Time)
floor := provider.dataFloor(ctx, todayStart)
catchupStart := provider.catchupStart(floor, todayStart, checkpointsByMeter)
end := catchupStart.AddDate(0, 0, provider.config.CatchupMaxDaysPerTick-1)
if end.After(yesterday) {
end = yesterday
}
trace.SpanFromContext(ctx).SetAttributes(
attribute.String(attrCatchupStart, catchupStart.Format("2006-01-02")),
attribute.String(attrCatchupEnd, end.Format("2006-01-02")),
)
provider.settings.Logger().DebugContext(ctx, "meter reporter catchup window selected",
slog.String("org_id", org.ID.StringValue()),
slog.Time("data_floor", floor),
slog.Time("catchup_start", catchupStart),
slog.Time("catchup_end", end),
slog.Int("catchup_max_days_per_tick", provider.config.CatchupMaxDaysPerTick),
)
for day := catchupStart; !day.After(end); day = day.AddDate(0, 0, 1) {
window := meterreportertypes.Window{
StartUnixMilli: day.UnixMilli(),
EndUnixMilli: day.AddDate(0, 0, 1).UnixMilli(),
IsCompleted: true,
}
err := provider.runPhase(ctx, org.ID, license.Key, window, checkpointsByMeter)
result := resultSuccess
if err != nil {
result = resultFailure
}
provider.metrics.catchupDaysProcessed.Add(ctx, 1, metric.WithAttributes(attribute.String(attrResult, result)))
if err != nil {
provider.settings.Logger().WarnContext(ctx, "stopping sealed catchup after failed day",
errors.Attr(err),
slog.String("date", day.Format("2006-01-02")),
)
break
}
}
// Today's partial window runs every tick.
todayWindow := meterreportertypes.Window{
StartUnixMilli: todayStart.UnixMilli(),
EndUnixMilli: now.UnixMilli(),
IsCompleted: false,
}
_ = provider.runPhase(ctx, org.ID, license.Key, todayWindow, checkpointsByMeter)
return nil
}
// runPhase collects all meters for one window and ships the batch.
func (provider *Provider) runPhase(ctx context.Context, orgID valuer.UUID, licenseKey string, window meterreportertypes.Window, checkpointsByMeter map[string]time.Time) error {
phaseLabel := phaseToday
if window.IsCompleted {
phaseLabel = phaseSealed
}
phaseAttr := metric.WithAttributes(attribute.String(attrPhase, phaseLabel))
date := time.UnixMilli(window.StartUnixMilli).UTC().Format("2006-01-02")
phaseStart := time.Now()
ctx, span := provider.settings.Tracer().Start(ctx, "meterreporter.RunPhase", trace.WithAttributes(
attribute.String(attrPhase, phaseLabel),
attribute.String(attrOrgID, orgID.StringValue()),
attribute.String(attrDate, date),
attribute.Int64(attrWindowStartUnixMilli, window.StartUnixMilli),
attribute.Int64(attrWindowEndUnixMilli, window.EndUnixMilli),
attribute.Bool(attrWindowCompleted, window.IsCompleted),
))
defer span.End()
provider.settings.Logger().DebugContext(ctx, "meter reporter phase started",
slog.String("org_id", orgID.StringValue()),
slog.String("phase", phaseLabel),
slog.String("date", date),
slog.Int64("start_unix_milli", window.StartUnixMilli),
slog.Int64("end_unix_milli", window.EndUnixMilli),
slog.Int("meters", len(provider.collectors)),
)
collectStart := time.Now()
readings := make([]meterreportertypes.Meter, 0, len(provider.collectors))
for _, collector := range provider.collectors {
meterName := collector.Name().String()
collectStart := time.Now()
collectCtx, collectSpan := provider.settings.Tracer().Start(ctx, "meterreporter.CollectMeter", trace.WithAttributes(
attribute.String(attrPhase, phaseLabel),
attribute.String(attrOrgID, orgID.StringValue()),
attribute.String(attrMeter, meterName),
attribute.String(attrDate, date),
attribute.Int64(attrWindowStartUnixMilli, window.StartUnixMilli),
attribute.Int64(attrWindowEndUnixMilli, window.EndUnixMilli),
attribute.Bool(attrWindowCompleted, window.IsCompleted),
))
collectedReadings, err := collector.Collect(collectCtx, orgID, window)
if err != nil {
collectSpan.RecordError(err)
collectSpan.SetStatus(codes.Error, err.Error())
collectSpan.SetAttributes(
attribute.String(attrResult, resultFailure),
attribute.Int64(attrDurationMs, time.Since(collectStart).Milliseconds()),
)
collectSpan.End()
provider.metrics.collectErrors.Add(ctx, 1, phaseAttr)
provider.settings.Logger().WarnContext(ctx, "meter collection failed",
errors.Attr(err),
slog.String("meter", meterName),
slog.String("org_id", orgID.StringValue()),
slog.String("phase", phaseLabel),
slog.String("date", date),
slog.Duration("duration", time.Since(collectStart)),
)
continue
}
collectSpan.SetAttributes(
attribute.String(attrResult, resultSuccess),
attribute.Int(attrReadings, len(collectedReadings)),
attribute.Int64(attrDurationMs, time.Since(collectStart).Milliseconds()),
)
collectSpan.End()
provider.settings.Logger().DebugContext(ctx, "meter collection completed",
slog.String("meter", meterName),
slog.String("org_id", orgID.StringValue()),
slog.String("phase", phaseLabel),
slog.String("date", date),
slog.Int("readings", len(collectedReadings)),
slog.Duration("duration", time.Since(collectStart)),
)
readings = append(readings, collectedReadings...)
}
collectDuration := time.Since(collectStart)
provider.metrics.collectDuration.Add(ctx, collectDuration.Seconds(), phaseAttr)
provider.metrics.collectOperations.Add(ctx, 1, phaseAttr)
span.SetAttributes(attribute.Int(attrReadingsCollected, len(readings)))
if window.IsCompleted {
beforeDrop := len(readings)
readings = dropCheckpointed(readings, time.UnixMilli(window.StartUnixMilli).UTC(), checkpointsByMeter)
dropped := beforeDrop - len(readings)
span.SetAttributes(attribute.Int(attrReadingsDropped, dropped))
if dropped > 0 {
provider.settings.Logger().DebugContext(ctx, "dropped checkpointed meter readings",
slog.String("org_id", orgID.StringValue()),
slog.String("phase", phaseLabel),
slog.String("date", date),
slog.Int("dropped", dropped),
slog.Int("remaining", len(readings)),
)
}
}
if len(readings) == 0 {
span.SetAttributes(
attribute.String(attrResult, resultSuccess),
attribute.Int(attrReadings, 0),
attribute.Int64(attrDurationMs, time.Since(phaseStart).Milliseconds()),
)
provider.settings.Logger().DebugContext(ctx, "meter reporter phase produced no readings",
slog.String("org_id", orgID.StringValue()),
slog.String("phase", phaseLabel),
slog.String("date", date),
slog.Duration("collect_duration", collectDuration),
slog.Duration("duration", time.Since(phaseStart)),
)
return nil
}
shipStart := time.Now()
err := provider.shipReadings(ctx, licenseKey, date, readings)
shipDuration := time.Since(shipStart)
provider.metrics.shipDuration.Add(ctx, shipDuration.Seconds(), phaseAttr)
provider.metrics.shipOperations.Add(ctx, 1, phaseAttr)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.SetAttributes(attribute.String(attrResult, resultFailure))
provider.metrics.postErrors.Add(ctx, 1, phaseAttr)
provider.settings.Logger().ErrorContext(ctx, "failed to ship meter readings",
errors.Attr(err),
slog.String("phase", phaseLabel),
slog.String("date", date),
slog.Int("readings", len(readings)),
slog.Duration("ship_duration", shipDuration),
)
return err
}
provider.metrics.readingsEmitted.Add(ctx, int64(len(readings)), phaseAttr)
span.SetAttributes(
attribute.String(attrResult, resultSuccess),
attribute.Int(attrReadings, len(readings)),
attribute.Int64(attrDurationMs, time.Since(phaseStart).Milliseconds()),
)
provider.settings.Logger().InfoContext(ctx, "meter reporter phase shipped",
slog.String("org_id", orgID.StringValue()),
slog.String("phase", phaseLabel),
slog.String("date", date),
slog.Int("readings", len(readings)),
slog.Duration("collect_duration", collectDuration),
slog.Duration("ship_duration", shipDuration),
slog.Duration("duration", time.Since(phaseStart)),
)
return nil
}
// dropCheckpointed removes readings already covered by meter checkpoints.
func dropCheckpointed(readings []meterreportertypes.Meter, windowDay time.Time, checkpointsByMeter map[string]time.Time) []meterreportertypes.Meter {
if len(checkpointsByMeter) == 0 {
return readings
}
kept := readings[:0]
for _, reading := range readings {
checkpoint, ok := checkpointsByMeter[reading.MeterName]
if !ok || checkpoint.Before(windowDay) {
kept = append(kept, reading)
}
}
return kept
}
// catchupStart returns the earliest UTC day that still needs sealed reporting.
func (provider *Provider) catchupStart(floor time.Time, todayStart time.Time, checkpointsByMeter map[string]time.Time) time.Time {
catchupStart := todayStart
for _, collector := range provider.collectors {
next := floor
if checkpoint, ok := checkpointsByMeter[collector.Name().String()]; ok {
next = checkpoint.AddDate(0, 0, 1)
if next.Before(floor) {
next = floor
}
}
if next.Before(catchupStart) {
catchupStart = next
}
}
yesterday := todayStart.AddDate(0, 0, -1)
if catchupStart.After(yesterday) {
catchupStart = yesterday
}
return catchupStart
}
// dataFloor returns the earliest signoz_meter sample day, or today on failure.
func (provider *Provider) dataFloor(ctx context.Context, todayStart time.Time) time.Time {
ctx, span := provider.settings.Tracer().Start(ctx, "meterreporter.DataFloor")
defer span.End()
if provider.telemetryStore == nil {
span.SetAttributes(attribute.String(attrResult, resultSuccess))
return todayStart
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ifNull(min(unix_milli), 0)")
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var minMs int64
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&minMs); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.SetAttributes(attribute.String(attrResult, resultFailure))
provider.settings.Logger().WarnContext(ctx, "failed to read data floor; falling back to latest sealed day", errors.Attr(err))
return todayStart
}
if minMs == 0 {
span.SetAttributes(
attribute.String(attrResult, resultSuccess),
attribute.Int64("meterreporter.data_floor_unix_milli", 0),
)
return todayStart
}
minDay := time.UnixMilli(minMs).UTC()
floor := time.Date(minDay.Year(), minDay.Month(), minDay.Day(), 0, 0, 0, 0, time.UTC)
span.SetAttributes(
attribute.String(attrResult, resultSuccess),
attribute.Int64("meterreporter.data_floor_unix_milli", floor.UnixMilli()),
)
provider.settings.Logger().DebugContext(ctx, "meter reporter data floor loaded", slog.Time("data_floor", floor))
return floor
}
// shipReadings sends one day's meter batch to Zeus.
func (provider *Provider) shipReadings(ctx context.Context, licenseKey string, date string, readings []meterreportertypes.Meter) error {
idempotencyKey := fmt.Sprintf("meter-cron:%s", date)
ctx, span := provider.settings.Tracer().Start(ctx, "meterreporter.ShipReadings", trace.WithAttributes(
attribute.String(attrDate, date),
attribute.Int(attrReadings, len(readings)),
attribute.String(attrIdempotencyKey, idempotencyKey),
attribute.Bool(attrDryRun, true),
))
defer span.End()
provider.settings.Logger().InfoContext(ctx, "meter readings prepared for shipment",
slog.String("date", date),
slog.Int("readings", len(readings)),
slog.String("idempotency_key", idempotencyKey),
slog.Bool("dry_run", true),
)
// Temporary visibility while /v2/meters is offline.
for _, reading := range readings {
provider.settings.Logger().InfoContext(ctx, "meter reading prepared for shipment",
slog.String("meter", reading.MeterName),
slog.Float64("value", reading.Value),
slog.String("unit", reading.Unit.StringValue()),
slog.String("aggregation", reading.Aggregation.StringValue()),
slog.Int64("start_unix_milli", reading.StartUnixMilli),
slog.Int64("end_unix_milli", reading.EndUnixMilli),
slog.Bool("is_completed", reading.IsCompleted),
slog.Any("dimensions", reading.Dimensions),
slog.String("idempotency_key", idempotencyKey),
)
}
// TODO: re-enable once /v2/meters is live in staging.
// body, err := json.Marshal(meterreportertypes.PostableMeters{Meters: readings})
// if err != nil {
// return errors.Wrapf(err, errors.TypeInternal, errCodeReportFailed, "marshal meter readings for %s", date)
// }
// if err := provider.zeus.PutMetersV3(ctx, licenseKey, idempotencyKey, body); err != nil {
// return errors.Wrapf(err, errors.TypeInternal, errCodeReportFailed, "ship meter readings for %s", date)
// }
_ = licenseKey
span.SetAttributes(attribute.String(attrResult, resultSuccess))
return nil
}

View File

@@ -0,0 +1,108 @@
package httpmeterreporter
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/metercollector"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestValidateCollectorsRejectsBadRegistry(t *testing.T) {
meterA := metercollectortypes.MustNewName("signoz.test.a")
meterB := metercollectortypes.MustNewName("signoz.test.b")
t.Run("key name mismatch", func(t *testing.T) {
_, err := validateCollectors(map[metercollectortypes.Name]metercollector.MeterCollector{
meterA: testCollector{name: meterB},
})
require.Error(t, err)
})
t.Run("nil collector", func(t *testing.T) {
_, err := validateCollectors(map[metercollectortypes.Name]metercollector.MeterCollector{
meterA: nil,
})
require.Error(t, err)
})
}
func TestDropCheckpointed(t *testing.T) {
meterA := metercollectortypes.MustNewName("signoz.test.a")
meterB := metercollectortypes.MustNewName("signoz.test.b")
meterC := metercollectortypes.MustNewName("signoz.test.c")
windowDay := time.Date(2026, 5, 4, 0, 0, 0, 0, time.UTC)
window := meterreportertypes.Window{
StartUnixMilli: windowDay.UnixMilli(),
EndUnixMilli: windowDay.AddDate(0, 0, 1).UnixMilli(),
IsCompleted: true,
}
readings := []meterreportertypes.Meter{
meterreportertypes.NewMeter(meterA, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
meterreportertypes.NewMeter(meterB, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
meterreportertypes.NewMeter(meterC, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
}
kept := dropCheckpointed(readings, windowDay, map[string]time.Time{
meterA.String(): windowDay,
meterB.String(): windowDay.AddDate(0, 0, -1),
})
require.Equal(t, []meterreportertypes.Meter{
meterreportertypes.NewMeter(meterB, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
meterreportertypes.NewMeter(meterC, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
}, kept)
}
func TestCatchupStart(t *testing.T) {
meterA := metercollectortypes.MustNewName("signoz.test.a")
floor := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
todayStart := time.Date(2026, 5, 5, 0, 0, 0, 0, time.UTC)
provider := &Provider{
collectors: []metercollector.MeterCollector{
testCollector{name: meterA},
},
}
t.Run("no checkpoint starts at floor", func(t *testing.T) {
require.Equal(t, floor, provider.catchupStart(floor, todayStart, nil))
})
t.Run("checkpoint advances by one day", func(t *testing.T) {
require.Equal(t, floor.AddDate(0, 0, 2), provider.catchupStart(floor, todayStart, map[string]time.Time{
meterA.String(): floor.AddDate(0, 0, 1),
}))
})
}
type testCollector struct {
name metercollectortypes.Name
unit metercollectortypes.Unit
aggregation metercollectortypes.Aggregation
}
func (c testCollector) Name() metercollectortypes.Name {
return c.name
}
func (c testCollector) Unit() metercollectortypes.Unit {
if c.unit.IsZero() {
return metercollectortypes.UnitCount
}
return c.unit
}
func (c testCollector) Aggregation() metercollectortypes.Aggregation {
if c.aggregation.IsZero() {
return metercollectortypes.AggregationSum
}
return c.aggregation
}
func (c testCollector) Collect(context.Context, valuer.UUID, meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
return nil, nil
}

View File

@@ -0,0 +1,90 @@
package httpmeterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"go.opentelemetry.io/otel/metric"
)
type reporterMetrics struct {
ticks metric.Int64Counter
readingsEmitted metric.Int64Counter
collectErrors metric.Int64Counter
postErrors metric.Int64Counter
checkpointErrors metric.Int64Counter
catchupDaysProcessed metric.Int64Counter
collectDuration metric.Float64Counter
collectOperations metric.Int64Counter
shipDuration metric.Float64Counter
shipOperations metric.Int64Counter
}
func newReporterMetrics(meter metric.Meter) (*reporterMetrics, error) {
var errs error
ticks, err := meter.Int64Counter("signoz.meterreporter.ticks", metric.WithDescription("Meter reporter ticks."))
if err != nil {
errs = errors.Join(errs, err)
}
readingsEmitted, err := meter.Int64Counter("signoz.meterreporter.readings.emitted", metric.WithDescription("Meter readings shipped to Zeus."))
if err != nil {
errs = errors.Join(errs, err)
}
collectErrors, err := meter.Int64Counter("signoz.meterreporter.collect.errors", metric.WithDescription("Meter collection errors."))
if err != nil {
errs = errors.Join(errs, err)
}
postErrors, err := meter.Int64Counter("signoz.meterreporter.post.errors", metric.WithDescription("Zeus POST failures."))
if err != nil {
errs = errors.Join(errs, err)
}
checkpointErrors, err := meter.Int64Counter("signoz.meterreporter.checkpoint.errors", metric.WithDescription("Zeus checkpoint read failures."))
if err != nil {
errs = errors.Join(errs, err)
}
catchupDaysProcessed, err := meter.Int64Counter("signoz.meterreporter.catchup.days_processed", metric.WithDescription("Sealed catchup days processed."))
if err != nil {
errs = errors.Join(errs, err)
}
collectDuration, err := meter.Float64Counter("signoz.meterreporter.collect.duration.seconds", metric.WithDescription("Cumulative collection duration."), metric.WithUnit("s"))
if err != nil {
errs = errors.Join(errs, err)
}
collectOperations, err := meter.Int64Counter("signoz.meterreporter.collect.operations", metric.WithDescription("Collection phases measured."))
if err != nil {
errs = errors.Join(errs, err)
}
shipDuration, err := meter.Float64Counter("signoz.meterreporter.ship.duration.seconds", metric.WithDescription("Cumulative ship duration."), metric.WithUnit("s"))
if err != nil {
errs = errors.Join(errs, err)
}
shipOperations, err := meter.Int64Counter("signoz.meterreporter.ship.operations", metric.WithDescription("Ship phases measured."))
if err != nil {
errs = errors.Join(errs, err)
}
if errs != nil {
return nil, errs
}
return &reporterMetrics{
ticks: ticks,
readingsEmitted: readingsEmitted,
collectErrors: collectErrors,
postErrors: postErrors,
checkpointErrors: checkpointErrors,
catchupDaysProcessed: catchupDaysProcessed,
collectDuration: collectDuration,
collectOperations: collectOperations,
shipDuration: shipDuration,
shipOperations: shipOperations,
}, nil
}

View File

@@ -150,6 +150,72 @@ func (provider *Provider) PutMetersV2(ctx context.Context, key string, data []by
return err
}
func (provider *Provider) PutMetersV3(ctx context.Context, key string, idempotencyKey string, data []byte) error {
headers := http.Header{}
if idempotencyKey != "" {
headers.Set("X-Idempotency-Key", idempotencyKey)
}
_, err := provider.doWithHeaders(
ctx,
provider.config.URL.JoinPath("/v2/meters"),
http.MethodPost,
key,
data,
headers,
)
return err
}
func (provider *Provider) GetMeterCheckpoints(ctx context.Context, key string) ([]zeustypes.MeterCheckpoint, error) {
response, err := provider.do(
ctx,
provider.config.URL.JoinPath("/v2/meters/checkpoints"),
http.MethodGet,
key,
nil,
)
if err != nil {
return nil, err
}
checkpointValues := gjson.GetBytes(response, "data.checkpoints")
if !checkpointValues.Exists() || checkpointValues.Type == gjson.Null {
return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoints are required")
}
if !checkpointValues.IsArray() {
return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoints must be an array")
}
checkpointResults := checkpointValues.Array()
checkpoints := make([]zeustypes.MeterCheckpoint, 0, len(checkpointResults))
for _, checkpointValue := range checkpointResults {
name := checkpointValue.Get("name").String()
if name == "" {
return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoint name is required")
}
checkpointString := checkpointValue.Get("checkpoint").String()
if checkpointString == "" {
return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoint is required for %q", name)
}
checkpoint, err := time.Parse("2006-01-02", checkpointString)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, zeus.ErrCodeResponseMalformed, "parse meter checkpoint %q for %q", checkpointString, name)
}
checkpoints = append(checkpoints, zeustypes.MeterCheckpoint{
Name: name,
Checkpoint: checkpoint,
})
}
return checkpoints, nil
}
func (provider *Provider) PutProfile(ctx context.Context, key string, profile *zeustypes.PostableProfile) error {
body, err := json.Marshal(profile)
if err != nil {
@@ -185,12 +251,21 @@ func (provider *Provider) PutHost(ctx context.Context, key string, host *zeustyp
}
func (provider *Provider) do(ctx context.Context, url *url.URL, method string, key string, requestBody []byte) ([]byte, error) {
return provider.doWithHeaders(ctx, url, method, key, requestBody, nil)
}
func (provider *Provider) doWithHeaders(ctx context.Context, url *url.URL, method string, key string, requestBody []byte, extraHeaders http.Header) ([]byte, error) {
request, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
}
request.Header.Set("X-Signoz-Cloud-Api-Key", key)
request.Header.Set("Content-Type", "application/json")
for k, vs := range extraHeaders {
for _, v := range vs {
request.Header.Add(k, v)
}
}
response, err := provider.httpClient.Do(request)
if err != nil {

View File

@@ -8,6 +8,7 @@ var (
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
FeatureGetMetersFromZeus = featuretypes.MustNewName("get_meters_from_zeus")
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
FeatureUseMeterReporter = featuretypes.MustNewName("use_meter_reporter")
FeatureUseJSONBody = featuretypes.MustNewName("use_json_body")
)
@@ -53,6 +54,14 @@ func MustNewRegistry() featuretypes.Registry {
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureUseMeterReporter,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageExperimental,
Description: "Controls whether the enterprise meter reporter runs instead of the noop reporter",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureUseJSONBody,
Kind: featuretypes.KindBoolean,

View File

@@ -0,0 +1,12 @@
package metercollector
const (
// DimensionOrganizationID identifies the organization.
DimensionOrganizationID = "signoz.billing.organization.id"
// DimensionRetentionDays identifies the retention bucket a meter belongs to.
DimensionRetentionDays = "signoz.billing.retention.days"
// DimensionWorkspaceKeyID identifies the ingestion workspace key.
DimensionWorkspaceKeyID = "signoz.workspace.key.id"
)

View File

@@ -0,0 +1,6 @@
package metercollector
import "github.com/SigNoz/signoz/pkg/errors"
// ErrCodeCollectFailed is the shared error code for collector failures.
var ErrCodeCollectFailed = errors.MustNewCode("metercollector_collect_failed")

View File

@@ -0,0 +1,19 @@
// Package metercollector defines the contract for billing meter collectors.
package metercollector
import (
"context"
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MeterCollector owns one billing meter's metadata and collection query.
// Collect stamps DimensionOrganizationID and returns errors instead of panics.
type MeterCollector interface {
Name() metercollectortypes.Name
Unit() metercollectortypes.Unit
Aggregation() metercollectortypes.Aggregation
Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error)
}

View File

@@ -0,0 +1,53 @@
package meterreporter
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
var _ factory.Config = (*Config)(nil)
type Config struct {
// Interval is how often the reporter collects and ships meters.
Interval time.Duration `mapstructure:"interval"`
// Timeout bounds one collect-and-ship tick.
Timeout time.Duration `mapstructure:"timeout"`
// CatchupMaxDaysPerTick caps sealed-day catchup work per tick.
CatchupMaxDaysPerTick int `mapstructure:"catchup_max_days_per_tick"`
}
func newConfig() factory.Config {
return Config{
Interval: 6 * time.Hour,
Timeout: 5 * time.Minute,
CatchupMaxDaysPerTick: 180,
}
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("meterreporter"), newConfig)
}
func (c Config) Validate() error {
if c.Interval < 5*time.Minute {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::interval must be at least 5m")
}
if c.Timeout < 3*time.Minute {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be at least 3m")
}
if c.Timeout >= c.Interval {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be less than meterreporter::interval")
}
if c.CatchupMaxDaysPerTick < 1 || c.CatchupMaxDaysPerTick > 180 {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::catchup_max_days_per_tick must be between 1 and 180")
}
return nil
}

View File

@@ -0,0 +1,14 @@
package meterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
var (
ErrCodeInvalidInput = errors.MustNewCode("meterreporter_invalid_input")
)
type Reporter interface {
factory.ServiceWithHealthy
}

View File

@@ -0,0 +1,39 @@
package noopmeterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/meterreporter"
)
type provider struct {
healthyC chan struct{}
stopC chan struct{}
}
func NewFactory() factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
return factory.NewProviderFactory(factory.MustNewName("noop"), New)
}
func New(_ context.Context, _ factory.ProviderSettings, _ meterreporter.Config) (meterreporter.Reporter, error) {
return &provider{
healthyC: make(chan struct{}),
stopC: make(chan struct{}),
}, nil
}
func (p *provider) Start(_ context.Context) error {
close(p.healthyC)
<-p.stopC
return nil
}
func (p *provider) Stop(_ context.Context) error {
close(p.stopC)
return nil
}
func (p *provider) Healthy() <-chan struct{} {
return p.healthyC
}

View File

@@ -21,9 +21,9 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -1342,7 +1342,7 @@ func getLocalTableName(tableName string) string {
}
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -1377,7 +1377,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
if apiErr != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
@@ -1425,18 +1425,14 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
// we will change ttl for only the new parts and not the old ones
query += " SETTINGS materialize_ttl_after_modify=0"
ttl := types.TTLSetting{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
ttl := retentiontypes.TTLSetting{
ID: valuer.GenerateUUID(),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
Status: constants.StatusPending,
Status: retentiontypes.TTLSettingStatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
@@ -1460,9 +1456,9 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1480,9 +1476,9 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1495,9 +1491,9 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusSuccess).
Set("status = ?", retentiontypes.TTLSettingStatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1507,10 +1503,10 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
}
}(ttlPayload)
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
return &retentiontypes.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalTraces.StringValue(),
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -1540,7 +1536,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
if apiErr != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
@@ -1563,18 +1559,14 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
timestamp = "end"
}
ttl := types.TTLSetting{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
ttl := retentiontypes.TTLSetting{
ID: valuer.GenerateUUID(),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
Status: constants.StatusPending,
Status: retentiontypes.TTLSettingStatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
@@ -1610,9 +1602,9 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1631,9 +1623,9 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1646,9 +1638,9 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusSuccess).
Set("status = ?", retentiontypes.TTLSettingStatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -1657,7 +1649,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
}
}(distributedTableName)
}
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
return &retentiontypes.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool, error) {
@@ -1686,7 +1678,7 @@ func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool,
return true, nil
}
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) {
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *retentiontypes.CustomRetentionTTLParams) (*retentiontypes.CustomRetentionTTLResponse, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
@@ -1701,7 +1693,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
if !hasCustomRetention {
r.logger.Info("Custom retention not supported, falling back to standard TTL method", "orgID", orgID)
ttlParams := &model.TTLParams{
ttlParams := &retentiontypes.TTLParams{
Type: params.Type,
DelDuration: int64(params.DefaultTTLDays * 24 * 3600),
}
@@ -1722,7 +1714,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
return nil, errorsV2.Wrapf(apiErr.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to set standard TTL")
}
return &model.CustomRetentionTTLResponse{
return &retentiontypes.CustomRetentionTTLResponse{
Message: fmt.Sprintf("Custom retention not supported, applied standard TTL of %d days. %s", params.DefaultTTLDays, ttlResult.Message),
}, nil
}
@@ -1733,7 +1725,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
uuidWithHyphen := valuer.GenerateUUID()
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
if params.Type != constants.LogsTTL {
if params.Type != retentiontypes.LogsTTL {
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL only supported for logs")
}
@@ -1764,7 +1756,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
if apiErr != nil {
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing custom_retention_ttl_status check sql query")
}
if statusItem.Status == constants.StatusPending {
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL is already running")
}
}
@@ -1838,19 +1830,15 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
}
for tableName, queries := range ttlPayload {
customTTL := types.TTLSetting{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
customTTL := retentiontypes.TTLSetting{
ID: valuer.GenerateUUID(),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
TransactionID: uuid,
TableName: tableName,
TTL: params.DefaultTTLDays,
Condition: string(ttlConditionsJSON),
Status: constants.StatusPending,
Status: retentiontypes.TTLSettingStatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
@@ -1866,7 +1854,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
err := r.setColdStorage(ctx, tableName, params.ColdStorageVolume)
if err != nil {
r.logger.Error("error in setting cold storage", errorsV2.Attr(err))
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, retentiontypes.TTLSettingStatusFailed)
return nil, errorsV2.Wrapf(err.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting cold storage for table %s", tableName)
}
}
@@ -1875,21 +1863,21 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
r.logger.Debug("Executing custom retention TTL request: ", "request", query, "step", i+1)
if err := r.db.Exec(ctx, query); err != nil {
r.logger.Error("error while setting custom retention ttl", errorsV2.Attr(err))
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, retentiontypes.TTLSettingStatusFailed)
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting custom retention TTL for table %s, query: %s", tableName, query)
}
}
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, retentiontypes.TTLSettingStatusSuccess)
}
return &model.CustomRetentionTTLResponse{
return &retentiontypes.CustomRetentionTTLResponse{
Message: "custom retention TTL has been successfully set up",
}, nil
}
// New method to build multiIf expressions with support for multiple AND conditions
func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []model.CustomRetentionRule, defaultTTLDays int, isResourceTable bool) string {
func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []retentiontypes.CustomRetentionRule, defaultTTLDays int, isResourceTable bool) string {
var conditions []string
for i, rule := range ttlConditions {
@@ -1961,7 +1949,7 @@ func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []model.CustomRe
return result
}
func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error) {
func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID string) (*retentiontypes.GetCustomRetentionTTLResponse, error) {
// Check if V2 (custom retention) is supported
hasCustomRetention, err := r.hasCustomRetentionColumn(ctx)
if err != nil {
@@ -1970,14 +1958,14 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
hasCustomRetention = false
}
response := &model.GetCustomRetentionTTLResponse{}
response := &retentiontypes.GetCustomRetentionTTLResponse{}
if hasCustomRetention {
// V2 - Custom retention is supported
response.Version = "v2"
// Get the latest custom retention TTL setting
customTTL := new(types.TTLSetting)
customTTL := new(retentiontypes.TTLSetting)
err := r.sqlDB.BunDB().NewSelect().
Model(customTTL).
Where("org_id = ?", orgID).
@@ -1993,19 +1981,19 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
if err == sql.ErrNoRows {
// No V2 configuration found, return defaults
response.DefaultTTLDays = 15
response.TTLConditions = []model.CustomRetentionRule{}
response.Status = constants.StatusSuccess
response.DefaultTTLDays = retentiontypes.DefaultLogsRetentionDays
response.TTLConditions = []retentiontypes.CustomRetentionRule{}
response.Status = retentiontypes.TTLSettingStatusSuccess
response.ColdStorageTTLDays = -1
return response, nil
}
// Parse TTL conditions from Condition
var ttlConditions []model.CustomRetentionRule
var ttlConditions []retentiontypes.CustomRetentionRule
if customTTL.Condition != "" {
if err := json.Unmarshal([]byte(customTTL.Condition), &ttlConditions); err != nil {
r.logger.Error("Error parsing TTL conditions", errorsV2.Attr(err))
ttlConditions = []model.CustomRetentionRule{}
ttlConditions = []retentiontypes.CustomRetentionRule{}
}
}
@@ -2019,8 +2007,8 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
response.Version = "v1"
// Get V1 TTL configuration
ttlParams := &model.GetTTLParams{
Type: constants.LogsTTL,
ttlParams := &retentiontypes.GetTTLParams{
Type: retentiontypes.LogsTTL,
}
ttlResult, apiErr := r.GetTTL(ctx, orgID, ttlParams)
@@ -2040,14 +2028,14 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
}
// For V1, we don't have TTL conditions
response.TTLConditions = []model.CustomRetentionRule{}
response.TTLConditions = []retentiontypes.CustomRetentionRule{}
}
return response, nil
}
func (r *ClickHouseReader) checkCustomRetentionTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, error) {
ttl := new(types.TTLSetting)
func (r *ClickHouseReader) checkCustomRetentionTTLStatusItem(ctx context.Context, orgID string, tableName string) (*retentiontypes.TTLSetting, error) {
ttl := new(retentiontypes.TTLSetting)
err := r.sqlDB.BunDB().NewSelect().
Model(ttl).
Where("table_name = ?", tableName).
@@ -2068,7 +2056,7 @@ func (r *ClickHouseReader) updateCustomRetentionTTLStatus(ctx context.Context, o
statusItem, apiErr := r.checkCustomRetentionTTLStatusItem(ctx, orgID, tableName)
if apiErr == nil && statusItem != nil {
_, dbErr := r.sqlDB.BunDB().NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", status).
Where("id = ?", statusItem.ID.StringValue()).
@@ -2080,7 +2068,7 @@ func (r *ClickHouseReader) updateCustomRetentionTTLStatus(ctx context.Context, o
}
// Enhanced validation function with duplicate detection and efficient key validation
func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditions []model.CustomRetentionRule) error {
func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditions []retentiontypes.CustomRetentionRule) error {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "clickhouse-reader",
instrumentationtypes.CodeFunctionName: "validateTTLConditions",
@@ -2184,16 +2172,16 @@ func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditi
// SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db.
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
// Keep only latest 100 transactions/requests
r.deleteTtlTransactions(ctx, orgID, 100)
switch params.Type {
case constants.TraceTTL:
case retentiontypes.TraceTTL:
return r.setTTLTraces(ctx, orgID, params)
case constants.MetricsTTL:
case retentiontypes.MetricsTTL:
return r.setTTLMetrics(ctx, orgID, params)
case constants.LogsTTL:
case retentiontypes.LogsTTL:
return r.setTTLLogs(ctx, orgID, params)
default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", params.Type)}
@@ -2201,7 +2189,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *mod
}
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalMetrics.StringValue(),
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -2230,23 +2218,19 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
if apiErr != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
metricTTL := func(tableName string) {
ttl := types.TTLSetting{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
ttl := retentiontypes.TTLSetting{
ID: valuer.GenerateUUID(),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
TransactionID: uuid,
TableName: tableName,
TTL: int(params.DelDuration),
Status: constants.StatusPending,
Status: retentiontypes.TTLSettingStatusPending,
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
@@ -2282,9 +2266,9 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -2303,9 +2287,9 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -2318,9 +2302,9 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusSuccess).
Set("status = ?", retentiontypes.TTLSettingStatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
@@ -2331,7 +2315,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
for _, tableName := range tableNames {
go metricTTL(tableName)
}
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
return &retentiontypes.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID string, numberOfTransactionsStore int) {
@@ -2341,7 +2325,7 @@ func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID stri
BunDB().
NewSelect().
Column("transaction_id").
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Where("org_id = ?", orgID).
Group("transaction_id").
OrderExpr("MAX(created_at) DESC").
@@ -2356,7 +2340,7 @@ func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID stri
sqlDB.
BunDB().
NewDelete().
Model(new(types.TTLSetting)).
Model(new(retentiontypes.TTLSetting)).
Where("transaction_id NOT IN (?)", bun.In(limitTransactions)).
Exec(ctx)
if err != nil {
@@ -2365,9 +2349,9 @@ func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID stri
}
// checkTTLStatusItem checks if ttl_status table has an entry for the given table name
func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, *model.ApiError) {
func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string, tableName string) (*retentiontypes.TTLSetting, *model.ApiError) {
r.logger.Info("checkTTLStatusItem query", "tableName", tableName)
ttl := new(types.TTLSetting)
ttl := new(retentiontypes.TTLSetting)
err := r.
sqlDB.
BunDB().
@@ -2388,26 +2372,26 @@ func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string,
// getTTLQueryStatus fetches ttl_status table status from DB
func (r *ClickHouseReader) getTTLQueryStatus(ctx context.Context, orgID string, tableNameArray []string) (string, *model.ApiError) {
failFlag := false
status := constants.StatusSuccess
status := retentiontypes.TTLSettingStatusSuccess
for _, tableName := range tableNameArray {
statusItem, apiErr := r.checkTTLStatusItem(ctx, orgID, tableName)
emptyStatusStruct := new(types.TTLSetting)
emptyStatusStruct := new(retentiontypes.TTLSetting)
if statusItem == emptyStatusStruct {
return "", nil
}
if apiErr != nil {
return "", &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
status = constants.StatusPending
if statusItem.Status == retentiontypes.TTLSettingStatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
status = retentiontypes.TTLSettingStatusPending
return status, nil
}
if statusItem.Status == constants.StatusFailed {
if statusItem.Status == retentiontypes.TTLSettingStatusFailed {
failFlag = true
}
}
if failFlag {
status = constants.StatusFailed
status = retentiontypes.TTLSettingStatusFailed
}
return status, nil
@@ -2460,7 +2444,7 @@ func getLocalTableNameArray(tableNames []string) []string {
}
// GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces.
func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *retentiontypes.GetTTLParams) (*retentiontypes.GetTTLResponseItem, *model.ApiError) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "clickhouse-reader",
@@ -2495,8 +2479,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
return delTTL, moveTTL
}
getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
getMetricsTTL := func() (*retentiontypes.DBResponseTTL, *model.ApiError) {
var dbResp []retentiontypes.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleLocalTableName)
@@ -2513,8 +2497,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
}
getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
getTracesTTL := func() (*retentiontypes.DBResponseTTL, *model.ApiError) {
var dbResp []retentiontypes.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.traceLocalTableName, signozTraceDBName)
@@ -2531,8 +2515,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
}
getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
getLogsTTL := func() (*retentiontypes.DBResponseTTL, *model.ApiError) {
var dbResp []retentiontypes.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTableName, r.logsDB)
@@ -2550,7 +2534,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
switch ttlParams.Type {
case constants.TraceTTL:
case retentiontypes.TraceTTL:
tableNameArray := []string{
r.TraceDB + "." + r.traceTableName,
r.TraceDB + "." + r.traceResourceTableV3,
@@ -2578,9 +2562,9 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
return &retentiontypes.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
case constants.MetricsTTL:
case retentiontypes.MetricsTTL:
tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName}
tableNameArray = getLocalTableNameArray(tableNameArray)
status, apiErr := r.getTTLQueryStatus(ctx, orgID, tableNameArray)
@@ -2601,9 +2585,9 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
return &retentiontypes.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
case constants.LogsTTL:
case retentiontypes.LogsTTL:
tableNameArray := []string{r.logsDB + "." + r.logsTableName}
tableNameArray = getLocalTableNameArray(tableNameArray)
status, apiErr := r.getTTLQueryStatus(ctx, orgID, tableNameArray)
@@ -2624,7 +2608,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
return &retentiontypes.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v",

View File

@@ -34,6 +34,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
@@ -1677,7 +1678,7 @@ func (aH *APIHandler) setCustomRetentionTTL(w http.ResponseWriter, r *http.Reque
return
}
var params model.CustomRetentionTTLParams
var params retentiontypes.CustomRetentionTTLParams
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "Invalid data"))
return

View File

@@ -40,6 +40,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/query-service/utils"
querytemplate "github.com/SigNoz/signoz/pkg/query-service/utils/queryTemplate"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
chVariables "github.com/SigNoz/signoz/pkg/variables/clickhouse"
)
@@ -419,7 +420,7 @@ func parseTime(param string, r *http.Request) (*time.Time, error) {
}
func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
func parseTTLParams(r *http.Request) (*retentiontypes.TTLParams, error) {
// make sure either of the query params are present
typeTTL := r.URL.Query().Get("type")
@@ -432,7 +433,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
}
// Validate the type parameter
if typeTTL != baseconstants.TraceTTL && typeTTL != baseconstants.MetricsTTL && typeTTL != baseconstants.LogsTTL {
if typeTTL != retentiontypes.TraceTTL && typeTTL != retentiontypes.MetricsTTL && typeTTL != retentiontypes.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
}
@@ -455,7 +456,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
}
}
return &model.TTLParams{
return &retentiontypes.TTLParams{
Type: typeTTL,
DelDuration: int64(durationParsed.Seconds()),
ColdStorageVolume: coldStorage,
@@ -463,7 +464,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
}, nil
}
func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
func parseGetTTL(r *http.Request) (*retentiontypes.GetTTLParams, error) {
typeTTL := r.URL.Query().Get("type")
@@ -471,12 +472,12 @@ func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
return nil, fmt.Errorf("type param cannot be empty from the query")
} else {
// Validate the type parameter
if typeTTL != baseconstants.TraceTTL && typeTTL != baseconstants.MetricsTTL && typeTTL != baseconstants.LogsTTL {
if typeTTL != retentiontypes.TraceTTL && typeTTL != retentiontypes.MetricsTTL && typeTTL != retentiontypes.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
}
}
return &model.GetTTLParams{Type: typeTTL}, nil
return &retentiontypes.GetTTLParams{Type: typeTTL}, nil
}
func parseAggregateAttributeRequest(r *http.Request) (*v3.AggregateAttributeRequest, error) {

View File

@@ -19,10 +19,6 @@ const (
const MaxAllowedPointsInTimeSeries = 300
const TraceTTL = "traces"
const MetricsTTL = "metrics"
const LogsTTL = "logs"
const SpanSearchScopeRoot = "isroot"
const SpanSearchScopeEntryPoint = "isentrypoint"
const OrderBySpanCount = "span_count"

View File

@@ -7,6 +7,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/stats"
@@ -23,8 +24,8 @@ type Reader interface {
GetServicesList(ctx context.Context) (*[]string, error)
GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error)
GetTTL(ctx context.Context, orgID string, ttlParams *retentiontypes.GetTTLParams) (*retentiontypes.GetTTLResponseItem, *model.ApiError)
GetCustomRetentionTTL(ctx context.Context, orgID string) (*retentiontypes.GetCustomRetentionTTLResponse, error)
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
// clickhouse only.
@@ -46,8 +47,8 @@ type Reader interface {
GetFlamegraphSpansForTrace(ctx context.Context, orgID valuer.UUID, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, error)
// Setter Interfaces
SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error)
SetTTL(ctx context.Context, orgID string, ttlParams *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError)
SetTTLV2(ctx context.Context, orgID string, params *retentiontypes.CustomRetentionTTLParams) (*retentiontypes.CustomRetentionTTLResponse, error)
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)

View File

@@ -404,56 +404,6 @@ type TagKey struct {
Type TagDataType `json:"type"`
}
type TTLParams struct {
Type string // It can be one of {traces, metrics}.
ColdStorageVolume string // Name of the cold storage volume.
ToColdStorageDuration int64 // Seconds after which data will be moved to cold storage.
DelDuration int64 // Seconds after which data will be deleted.
}
type CustomRetentionTTLParams struct {
Type string `json:"type"`
DefaultTTLDays int `json:"defaultTTLDays"`
TTLConditions []CustomRetentionRule `json:"ttlConditions"`
ColdStorageVolume string `json:"coldStorageVolume,omitempty"`
ToColdStorageDurationDays int64 `json:"coldStorageDurationDays,omitempty"`
}
type CustomRetentionRule struct {
Filters []FilterCondition `json:"conditions"`
TTLDays int `json:"ttlDays"`
}
type FilterCondition struct {
Key string `json:"key"`
Values []string `json:"values"`
}
type GetCustomRetentionTTLResponse struct {
Version string `json:"version"`
Status string `json:"status"`
// V1 fields
// LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
// LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
// V2 fields
DefaultTTLDays int `json:"default_ttl_days,omitempty"`
TTLConditions []CustomRetentionRule `json:"ttl_conditions,omitempty"`
ColdStorageVolume string `json:"cold_storage_volume,omitempty"`
ColdStorageTTLDays int `json:"cold_storage_ttl_days,omitempty"`
}
type CustomRetentionTTLResponse struct {
Message string `json:"message"`
}
type GetTTLParams struct {
Type string
}
type ListErrorsParams struct {
StartStr string `json:"start"`
EndStr string `json:"end"`

View File

@@ -150,16 +150,6 @@ type RuleResponseItem struct {
Data string `json:"data" db:"data"`
}
type TTLStatusItem struct {
Id int `json:"id" db:"id"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
TableName string `json:"table_name" db:"table_name"`
TTL int `json:"ttl" db:"ttl"`
Status string `json:"status" db:"status"`
ColdStorageTtl int `json:"cold_storage_ttl" db:"cold_storage_ttl"`
}
type ChannelItem struct {
Id int `json:"id" db:"id"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
@@ -462,35 +452,11 @@ type SpanAggregatesDBResponseItem struct {
GroupBy string `ch:"groupBy"`
}
type SetTTLResponseItem struct {
Message string `json:"message"`
}
type DiskItem struct {
Name string `json:"name,omitempty" ch:"name"`
Type string `json:"type,omitempty" ch:"type"`
}
type DBResponseTTL struct {
EngineFull string `ch:"engine_full"`
}
type GetTTLResponseItem struct {
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"`
ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"`
ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"`
ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
Status string `json:"status"`
}
type DBResponseServiceName struct {
ServiceName string `ch:"serviceName"`
Count uint64 `ch:"count"`

View File

@@ -23,6 +23,7 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/identn"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
@@ -135,6 +136,9 @@ type Config struct {
// Auditor config
Auditor auditor.Config `mapstructure:"auditor"`
// MeterReporter config
MeterReporter meterreporter.Config `mapstructure:"meterreporter"`
// CloudIntegration config
CloudIntegration cloudintegration.Config `mapstructure:"cloudintegration"`
@@ -175,6 +179,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
identn.NewConfigFactory(),
serviceaccount.NewConfigFactory(),
auditor.NewConfigFactory(),
meterreporter.NewConfigFactory(),
cloudintegration.NewConfigFactory(),
tracedetail.NewConfigFactory(),
authz.NewConfigFactory(),

View File

@@ -28,6 +28,8 @@ import (
"github.com/SigNoz/signoz/pkg/identn/apikeyidentn"
"github.com/SigNoz/signoz/pkg/identn/impersonationidentn"
"github.com/SigNoz/signoz/pkg/identn/tokenizeridentn"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/meterreporter/noopmeterreporter"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
@@ -318,6 +320,12 @@ func NewAuditorProviderFactories() factory.NamedMap[factory.ProviderFactory[audi
)
}
func NewMeterReporterProviderFactories() factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
return factory.MustNewNamedMap(
noopmeterreporter.NewFactory(),
)
}
func NewFlaggerProviderFactories(registry featuretypes.Registry) factory.NamedMap[factory.ProviderFactory[flagger.FlaggerProvider, flagger.Config]] {
return factory.MustNewNamedMap(
configflagger.NewFactory(registry),

View File

@@ -22,6 +22,7 @@ import (
"github.com/SigNoz/signoz/pkg/identn"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -84,6 +85,7 @@ type SigNoz struct {
Flagger flagger.Flagger
Gateway gateway.Gateway
Auditor auditor.Auditor
MeterReporter meterreporter.Reporter
}
func New(
@@ -104,6 +106,7 @@ func New(
dashboardModuleCallback func(sqlstore.SQLStore, factory.ProviderSettings, analytics.Analytics, organization.Getter, queryparser.QueryParser, querier.Querier, licensing.Licensing) dashboard.Module,
gatewayProviderFactory func(licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config],
auditorProviderFactories func(licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]],
meterReporterProviderFactories func(context.Context, flagger.Flagger, licensing.Licensing, telemetrystore.TelemetryStore, sqlstore.SQLStore, organization.Getter, zeus.Zeus) (factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]], string),
querierHandlerCallback func(factory.ProviderSettings, querier.Querier, analytics.Analytics) querier.Handler,
cloudIntegrationCallback func(sqlstore.SQLStore, global.Global, zeus.Zeus, gateway.Gateway, licensing.Licensing, serviceaccount.Module, cloudintegration.Config) (cloudintegration.Module, error),
rulerProviderFactories func(cache.Cache, alertmanager.Alertmanager, sqlstore.SQLStore, telemetrystore.TelemetryStore, telemetrytypes.MetadataStore, prometheus.Prometheus, organization.Getter, rulestatehistory.Module, querier.Querier, queryparser.QueryParser) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]],
@@ -386,6 +389,13 @@ func New(
return nil, err
}
// Initialize meter reporter from the variant-specific provider factories
meterReporterFactories, meterReporterProvider := meterReporterProviderFactories(ctx, flagger, licensing, telemetrystore, sqlstore, orgGetter, zeus)
meterReporter, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.MeterReporter, meterReporterFactories, meterReporterProvider)
if err != nil {
return nil, err
}
// Initialize authns
store := sqlauthnstore.NewStore(sqlstore)
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)
@@ -501,6 +511,7 @@ func New(
factory.NewNamedService(factory.MustNewName("authz"), authz),
factory.NewNamedService(factory.MustNewName("user"), userService, factory.MustNewName("authz")),
factory.NewNamedService(factory.MustNewName("auditor"), auditor),
factory.NewNamedService(factory.MustNewName("meterreporter"), meterReporter, factory.MustNewName("licensing")),
factory.NewNamedService(factory.MustNewName("ruler"), rulerInstance),
)
if err != nil {
@@ -550,5 +561,6 @@ func New(
Flagger: flagger,
Gateway: gateway,
Auditor: auditor,
MeterReporter: meterReporter,
}, nil
}

View File

@@ -0,0 +1,13 @@
package metercollectortypes
import "github.com/SigNoz/signoz/pkg/valuer"
// Aggregation is a supported Zeus aggregation name.
type Aggregation struct {
valuer.String
}
var (
AggregationSum = Aggregation{valuer.NewString("sum")}
AggregationMax = Aggregation{valuer.NewString("max")}
)

View File

@@ -0,0 +1,40 @@
// Package metercollectortypes holds billing meter value types.
package metercollectortypes
import (
"regexp"
"github.com/SigNoz/signoz/pkg/errors"
)
var nameRegex = regexp.MustCompile(`^[a-z][a-z0-9_.]+$`)
// Name is a validated dotted meter name.
type Name struct {
s string
}
func NewName(s string) (Name, error) {
if !nameRegex.MatchString(s) {
return Name{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid meter name: %s", s)
}
return Name{s: s}, nil
}
func MustNewName(s string) Name {
name, err := NewName(s)
if err != nil {
panic(err)
}
return name
}
func (n Name) String() string {
return n.s
}
func (n Name) IsZero() bool {
return n.s == ""
}

View File

@@ -0,0 +1,13 @@
package metercollectortypes
import "github.com/SigNoz/signoz/pkg/valuer"
// Unit is a supported Zeus meter unit.
type Unit struct {
valuer.String
}
var (
UnitCount = Unit{valuer.NewString("count")}
UnitBytes = Unit{valuer.NewString("bytes")}
)

View File

@@ -0,0 +1,57 @@
package meterreportertypes
import "github.com/SigNoz/signoz/pkg/types/metercollectortypes"
// Meter is one meter value sent to Zeus.
type Meter struct {
// MeterName is the fully-qualified meter identifier.
MeterName string `json:"name"`
// Value is the aggregated scalar for this meter over the reporting window.
Value float64 `json:"value"`
// Unit is the metric unit for this meter.
Unit metercollectortypes.Unit `json:"unit"`
// Aggregation names the aggregation applied to produce Value.
Aggregation metercollectortypes.Aggregation `json:"aggregation"`
// StartUnixMilli is the inclusive window start in epoch milliseconds.
StartUnixMilli int64 `json:"start_unix_milli"`
// EndUnixMilli is the exclusive window end in epoch milliseconds.
EndUnixMilli int64 `json:"end_unix_milli"`
// IsCompleted is false for the current day's partial value.
IsCompleted bool `json:"is_completed"`
// Dimensions is the per-meter label set.
Dimensions map[string]string `json:"dimensions"`
}
// NewMeter builds a meter from typed metadata and a reporting window.
func NewMeter(
name metercollectortypes.Name,
value float64,
unit metercollectortypes.Unit,
aggregation metercollectortypes.Aggregation,
window Window,
dimensions map[string]string,
) Meter {
return Meter{
MeterName: name.String(),
Value: value,
Unit: unit,
Aggregation: aggregation,
StartUnixMilli: window.StartUnixMilli,
EndUnixMilli: window.EndUnixMilli,
IsCompleted: window.IsCompleted,
Dimensions: dimensions,
}
}
// PostableMeters is one day of meters for Zeus.PutMetersV3.
type PostableMeters struct {
// Meters is the set of meter values being shipped for one day.
Meters []Meter `json:"meters"`
}

View File

@@ -0,0 +1,21 @@
package meterreportertypes
import "time"
// Window is the [Start, End) range a reporter tick collects.
type Window struct {
StartUnixMilli int64
EndUnixMilli int64
IsCompleted bool
}
// Day returns the UTC day containing the window start.
func (w Window) Day() time.Time {
t := time.UnixMilli(w.StartUnixMilli).UTC()
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}
// IsValid rejects empty, zero, and inverted windows.
func (w Window) IsValid() bool {
return w.StartUnixMilli > 0 && w.EndUnixMilli > w.StartUnixMilli
}

View File

@@ -73,19 +73,6 @@ func NewTraitsFromOrganization(org *Organization) map[string]any {
}
}
type TTLSetting struct {
bun.BaseModel `bun:"table:ttl_setting"`
Identifiable
TimeAuditable
TransactionID string `bun:"transaction_id,type:text,notnull"`
TableName string `bun:"table_name,type:text,notnull"`
TTL int `bun:"ttl,notnull,default:0"`
ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
Status string `bun:"status,type:text,notnull"`
OrgID string `json:"-" bun:"org_id,notnull"`
Condition string `bun:"condition,type:text"`
}
type OrganizationStore interface {
Create(context.Context, *Organization) error
Get(context.Context, valuer.UUID) (*Organization, error)

View File

@@ -0,0 +1,132 @@
package retentiontypes
import (
"time"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
const (
DefaultLogsRetentionDays = 15
DefaultMetricsRetentionDays = 30
DefaultTracesRetentionDays = 15
)
const (
TraceTTL = "traces"
MetricsTTL = "metrics"
LogsTTL = "logs"
)
const (
TTLSettingStatusPending = "pending"
TTLSettingStatusFailed = "failed"
TTLSettingStatusSuccess = "success"
)
// Slice is a half-open time range using one TTL recipe.
type Slice struct {
StartMs int64
EndMs int64
Rules []CustomRetentionRule
DefaultDays int
}
type TTLSetting struct {
bun.BaseModel `bun:"table:ttl_setting"`
ID valuer.UUID `json:"id" bun:"id,pk,type:text" required:"true"`
CreatedAt time.Time `bun:"created_at" json:"createdAt"`
UpdatedAt time.Time `bun:"updated_at" json:"updatedAt"`
TransactionID string `bun:"transaction_id,type:text,notnull"`
TableName string `bun:"table_name,type:text,notnull"`
TTL int `bun:"ttl,notnull,default:0"`
ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
Status string `bun:"status,type:text,notnull"`
OrgID string `json:"-" bun:"org_id,notnull"`
Condition string `bun:"condition,type:text"`
}
type TTLParams struct {
Type string
ColdStorageVolume string
ToColdStorageDuration int64
DelDuration int64
}
type CustomRetentionTTLParams struct {
Type string `json:"type"`
DefaultTTLDays int `json:"defaultTTLDays"`
TTLConditions []CustomRetentionRule `json:"ttlConditions"`
ColdStorageVolume string `json:"coldStorageVolume,omitempty"`
ToColdStorageDurationDays int64 `json:"coldStorageDurationDays,omitempty"`
}
type GetTTLParams struct {
Type string
}
type GetCustomRetentionTTLResponse struct {
Version string `json:"version"`
Status string `json:"status"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
DefaultTTLDays int `json:"default_ttl_days,omitempty"`
TTLConditions []CustomRetentionRule `json:"ttl_conditions,omitempty"`
ColdStorageVolume string `json:"cold_storage_volume,omitempty"`
ColdStorageTTLDays int `json:"cold_storage_ttl_days,omitempty"`
}
type CustomRetentionTTLResponse struct {
Message string `json:"message"`
}
type TTLStatusItem struct {
Id int `json:"id" db:"id"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
TableName string `json:"table_name" db:"table_name"`
TTL int `json:"ttl" db:"ttl"`
Status string `json:"status" db:"status"`
ColdStorageTtl int `json:"cold_storage_ttl" db:"cold_storage_ttl"`
}
type SetTTLResponseItem struct {
Message string `json:"message"`
}
type DBResponseTTL struct {
EngineFull string `ch:"engine_full"`
}
type GetTTLResponseItem struct {
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"`
ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"`
ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"`
ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
Status string `json:"status"`
}
// CustomRetentionRule is one custom retention rule as stored in ttl_setting.condition.
// Rules are evaluated in declaration order; the first matching rule wins.
type CustomRetentionRule struct {
Filters []FilterCondition `json:"conditions"`
TTLDays int `json:"ttlDays"`
}
// FilterCondition is one label-key, allowed-values condition inside a retention rule.
type FilterCondition struct {
Key string `json:"key"`
Values []string `json:"values"`
}

View File

@@ -3,6 +3,7 @@ package zeustypes
import (
"encoding/json"
"net/url"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/tidwall/gjson"
@@ -37,6 +38,11 @@ type Host struct {
URL string `json:"url" required:"true"`
}
type MeterCheckpoint struct {
Name string
Checkpoint time.Time
}
func NewGettableHost(data []byte) *GettableHost {
parsed := gjson.ParseBytes(data)
dns := parsed.Get("cluster.region.dns").String()

View File

@@ -49,6 +49,14 @@ func (provider *provider) PutMetersV2(_ context.Context, _ string, _ []byte) err
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v2 is not supported")
}
func (provider *provider) PutMetersV3(_ context.Context, _ string, _ string, _ []byte) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v3 is not supported")
}
func (provider *provider) GetMeterCheckpoints(_ context.Context, _ string) ([]zeustypes.MeterCheckpoint, error) {
return nil, errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "fetching meter checkpoints is not supported")
}
func (provider *provider) PutProfile(_ context.Context, _ string, _ *zeustypes.PostableProfile) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting profile is not supported")
}

View File

@@ -35,6 +35,16 @@ type Zeus interface {
// Puts the meters for the given license key using Zeus.
PutMetersV2(context.Context, string, []byte) error
// PutMetersV3 ships one day's batch of meter readings to the v2/meters
// endpoint. idempotencyKey is propagated as X-Idempotency-Key so Zeus can
// UPSERT on retries. The batch is accepted or rejected as a whole.
PutMetersV3(ctx context.Context, licenseKey string, idempotencyKey string, body []byte) error
// GetMeterCheckpoints returns the latest sealed (is_completed=true) UTC day
// Zeus has stored for each billing meter name. Missing meter names are
// treated by the cron as bootstrap cases.
GetMeterCheckpoints(ctx context.Context, licenseKey string) ([]zeustypes.MeterCheckpoint, error)
// Put profile for the given license key.
PutProfile(context.Context, string, *zeustypes.PostableProfile) error