Compare commits

...

4 Commits

Author SHA1 Message Date
Karan Balani
5dec8e26ad chore: update interval validation to allow min 5 mins interval for testing 2026-04-21 21:46:33 +05:30
Karan Balani
e102cc07ce feat(meterreporter): add traces meters 2026-04-21 21:28:13 +05:30
Karan Balani
b898fb6e3b feat(meterreporter): simplify code, add metric meters, dry-run zeus call 2026-04-21 21:22:40 +05:30
Karan Balani
bc7dbab2b2 feat: meter reporter for new billing infra 2026-04-20 18:42:23 +05:30
26 changed files with 1221 additions and 1 deletions

View File

@@ -23,6 +23,7 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
@@ -109,6 +110,9 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
func(_ licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
return signoz.NewAuditorProviderFactories()
},
func(_ licensing.Licensing, _ telemetrystore.TelemetryStore, _ sqlstore.SQLStore, _ organization.Getter, _ zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
return signoz.NewMeterReporterProviderFactories()
},
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
return querier.NewHandler(ps, q, a)
},

View File

@@ -17,6 +17,7 @@ import (
"github.com/SigNoz/signoz/ee/gateway/httpgateway"
enterpriselicensing "github.com/SigNoz/signoz/ee/licensing"
"github.com/SigNoz/signoz/ee/licensing/httplicensing"
"github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter"
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration/implcloudprovider"
"github.com/SigNoz/signoz/ee/modules/dashboard/impldashboard"
@@ -38,6 +39,7 @@ import (
"github.com/SigNoz/signoz/pkg/gateway"
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
pkgcloudintegration "github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
@@ -157,6 +159,13 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
}
return factories
},
func(licensing licensing.Licensing, telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore, orgGetter organization.Getter, zeus zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
factories := signoz.NewMeterReporterProviderFactories()
if err := factories.Add(signozmeterreporter.NewFactory(licensing, telemetryStore, sqlStore, orgGetter, zeus)); err != nil {
panic(err)
}
return factories
},
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
communityHandler := querier.NewHandler(ps, q, a)
return eequerier.NewHandler(ps, q, communityHandler)

View File

@@ -407,3 +407,23 @@ cloudintegration:
agent:
# The version of the cloud integration agent.
version: v0.0.8
##################### Meter Reporter #####################
meterreporter:
# Specifies the meter reporter provider to use.
# noop: does not report any meters (community default).
# signoz: periodically queries meters via the querier and ships readings to Zeus (enterprise).
provider: noop
# The interval between collection ticks. Minimum 30m.
interval: 6h
# The per-tick timeout that bounds collect-and-ship work.
timeout: 30s
retry:
# Whether to retry on transient failures.
enabled: true
# The initial wait time before the first retry.
initial_interval: 5s
# The upper bound on backoff interval.
max_interval: 30s
# The total maximum time spent retrying.
max_elapsed_time: 1m

View File

@@ -0,0 +1,125 @@
package signozmeterreporter
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// tick collects one round of readings for the instance's org and ships them to
// zeus under its active license. Per-collector errors are logged and counted
// but do not abort the tick.
func (provider *Provider) tick(ctx context.Context) error {
now := time.Now().UTC()
// Go to 00:00 UTC of current day (in milliseconds)
bucketStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
// Period in which meter data will be queried: 00:00 UTC → now UTC
window := meterreporter.Window{
StartMs: bucketStart.UnixMilli(),
EndMs: now.UnixMilli(),
}
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "failed to list organizations")
}
if len(orgs) == 0 {
return nil
}
if len(orgs) > 1 {
// Billing is scoped to a single license per instance; the meter data in signoz_meter has no org marker,
// so we can't split a multi-org instance correctly. Report against the first org and warn.
provider.settings.Logger().WarnContext(ctx, "multiple orgs on a single instance; reporting only the first", slog.Int("org_count", len(orgs)))
}
org := orgs[0]
license, err := provider.licensing.GetActive(ctx, org.ID)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "failed to fetch active license for org %q", org.ID.StringValue())
}
if license == nil || license.Key == "" {
provider.settings.Logger().WarnContext(ctx, "skipping tick, nil/empty license for org", slog.String("org_id", org.ID.StringValue()))
return nil
}
readings := provider.collectOrgReadings(ctx, org.ID, window)
if len(readings) == 0 {
return nil
}
date := bucketStart.Format("2006-01-02")
if err := provider.shipReadings(ctx, license.Key, date, readings); err != nil {
provider.metrics.postErrors.Add(ctx, 1)
provider.settings.Logger().ErrorContext(ctx, "failed to ship meter readings", errors.Attr(err), slog.Int("readings", len(readings)))
return nil
}
provider.metrics.readingsEmitted.Add(ctx, int64(len(readings)))
return nil
}
// collectOrgReadings runs every registered Meter's Collector against orgID and
// returns their combined Readings. Individual meter failures are logged and
// skipped — one bad meter does not block the rest of the batch.
func (provider *Provider) collectOrgReadings(ctx context.Context, orgID valuer.UUID, window meterreporter.Window) []meterreportertypes.Reading {
readings := make([]meterreportertypes.Reading, 0, len(provider.meters))
for _, meter := range provider.meters {
collectedReadings, err := meter.Collect(ctx, provider.deps, meter, orgID, window)
if err != nil {
provider.metrics.collectErrors.Add(ctx, 1)
provider.settings.Logger().WarnContext(ctx, "meter collection failed", errors.Attr(err), slog.String("meter", meter.Name.String()), slog.String("org_id", orgID.StringValue()))
continue
}
readings = append(readings, collectedReadings...)
}
return readings
}
// shipReadings encodes the batch as PostableMeterReadings JSON and, in the
// fully wired flow, POSTs it to Zeus under a date-scoped idempotency key so
// subsequent ticks within the same UTC day UPSERT.
//
// ! TEMPORARY: the Zeus PutMeterReadings endpoint is not live yet. Until it
// lands, we log the payload at INFO so staging can verify collection end-to-end
// without a server counterpart. Restore the Zeus call (and drop the log) once
// the API ships.
func (provider *Provider) shipReadings(ctx context.Context, licenseKey string, date string, readings []meterreportertypes.Reading) error {
idempotencyKey := fmt.Sprintf("meter-cron:%s", date)
// ! TODO: this needs to be fixed in the format we make the zeus API
payload := meterreportertypes.PostableMeterReadings{
IdempotencyKey: idempotencyKey,
Readings: readings,
}
body, err := json.Marshal(payload)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "marshal meter readings")
}
// ! TEMPORARY: skip the Zeus call until the API is available. Logging the
// serialized payload instead so we can eyeball readings in staging logs.
// When Zeus is ready, replace the log below with:
// if err := provider.zeus.PutMeterReadings(ctx, licenseKey, idempotencyKey, body); err != nil { ... }
provider.settings.Logger().InfoContext(ctx, "meter readings (Zeus API not yet live — dry-run log)",
slog.String("license_key", licenseKey),
slog.String("idempotency_key", idempotencyKey),
slog.Int("readings", len(readings)),
slog.String("payload", string(body)),
)
_ = provider.zeus // keep the field referenced so the dep wiring does not bitrot
return nil
}

View File

@@ -0,0 +1,154 @@
package signozmeterreporter
import (
"context"
"log/slog"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/zeus"
)
var _ factory.ServiceWithHealthy = (*Provider)(nil)
// Provider is the enterprise meter reporter. It ticks on a fixed interval,
// invokes every registered Collector against every licensed org, and ships
// the resulting readings to Zeus.
type Provider struct {
settings factory.ScopedProviderSettings
config meterreporter.Config
meters []meterreporter.Meter
deps meterreporter.CollectorDeps
licensing licensing.Licensing
orgGetter organization.Getter
zeus zeus.Zeus
healthyC chan struct{}
stopC chan struct{}
goroutinesWg sync.WaitGroup
metrics *reporterMetrics
}
// NewFactory returns a ProviderFactory for the signoz meter reporter.
func NewFactory(
licensing licensing.Licensing,
telemetryStore telemetrystore.TelemetryStore,
sqlstore sqlstore.SQLStore,
orgGetter organization.Getter,
zeus zeus.Zeus,
) factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
return factory.NewProviderFactory(
factory.MustNewName("signoz"),
func(ctx context.Context, providerSettings factory.ProviderSettings, config meterreporter.Config) (meterreporter.Reporter, error) {
return newProvider(ctx, providerSettings, config, licensing, telemetryStore, sqlstore, orgGetter, zeus)
},
)
}
func newProvider(
_ context.Context,
providerSettings factory.ProviderSettings,
config meterreporter.Config,
licensing licensing.Licensing,
telemetryStore telemetrystore.TelemetryStore,
sqlstore sqlstore.SQLStore,
orgGetter organization.Getter,
zeus zeus.Zeus,
) (*Provider, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter")
metrics, err := newReporterMetrics(settings.Meter())
if err != nil {
return nil, err
}
meters, err := meterreporter.DefaultMeters()
if err != nil {
return nil, err
}
return &Provider{
settings: settings,
config: config,
meters: meters,
deps: meterreporter.CollectorDeps{
TelemetryStore: telemetryStore,
SQLStore: sqlstore,
},
licensing: licensing,
orgGetter: orgGetter,
zeus: zeus,
healthyC: make(chan struct{}),
stopC: make(chan struct{}),
metrics: metrics,
}, nil
}
// Start runs an initial tick and then loops on the configured interval until
// Stop is called. Start blocks until the goroutine returns, matching the
// factory.Service contract used across the codebase.
func (provider *Provider) Start(ctx context.Context) error {
close(provider.healthyC)
provider.goroutinesWg.Add(1)
go func() {
defer provider.goroutinesWg.Done()
provider.runTick(ctx)
ticker := time.NewTicker(provider.config.Interval)
defer ticker.Stop()
for {
select {
case <-provider.stopC:
return
case <-ticker.C:
provider.runTick(ctx)
}
}
}()
provider.goroutinesWg.Wait()
return nil
}
// Stop requests the reporter to stop, waits for the in-flight tick (bounded by
// Config.Timeout) to complete, and returns.
func (provider *Provider) Stop(_ context.Context) error {
<-provider.healthyC
select {
case <-provider.stopC:
// already closed
default:
close(provider.stopC)
}
provider.goroutinesWg.Wait()
return nil
}
func (provider *Provider) Healthy() <-chan struct{} {
return provider.healthyC
}
// runTick executes one collect-and-ship cycle under Config.Timeout. Errors are
// logged and counted; they do not propagate because the reporter must keep
// ticking on subsequent intervals.
func (provider *Provider) runTick(parentCtx context.Context) {
provider.metrics.ticks.Add(parentCtx, 1)
ctx, cancel := context.WithTimeout(parentCtx, provider.config.Timeout)
defer cancel()
if err := provider.tick(ctx); err != nil {
provider.settings.Logger().ErrorContext(ctx, "meter reporter tick failed", errors.Attr(err), slog.Duration("timeout", provider.config.Timeout))
}
}

View File

@@ -0,0 +1,48 @@
package signozmeterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"go.opentelemetry.io/otel/metric"
)
type reporterMetrics struct {
ticks metric.Int64Counter
readingsEmitted metric.Int64Counter
collectErrors metric.Int64Counter
postErrors metric.Int64Counter
}
func newReporterMetrics(meter metric.Meter) (*reporterMetrics, error) {
var errs error
ticks, err := meter.Int64Counter("signoz.meterreporter.ticks", metric.WithDescription("Total number of meter reporter ticks that ran to completion or aborted."))
if err != nil {
errs = errors.Join(errs, err)
}
readingsEmitted, err := meter.Int64Counter("signoz.meterreporter.readings.emitted", metric.WithDescription("Total number of meter readings shipped to Zeus."))
if err != nil {
errs = errors.Join(errs, err)
}
collectErrors, err := meter.Int64Counter("signoz.meterreporter.collect.errors", metric.WithDescription("Total number of collect errors across organizations and meters."))
if err != nil {
errs = errors.Join(errs, err)
}
postErrors, err := meter.Int64Counter("signoz.meterreporter.post.errors", metric.WithDescription("Total number of Zeus POST failures."))
if err != nil {
errs = errors.Join(errs, err)
}
if errs != nil {
return nil, errs
}
return &reporterMetrics{
ticks: ticks,
readingsEmitted: readingsEmitted,
collectErrors: collectErrors,
postErrors: postErrors,
}, nil
}

View File

@@ -148,6 +148,24 @@ func (provider *Provider) PutMetersV2(ctx context.Context, key string, data []by
return err
}
func (provider *Provider) PutMeterReadings(ctx context.Context, key string, idempotencyKey string, data []byte) error {
headers := http.Header{}
if idempotencyKey != "" {
headers.Set("X-Idempotency-Key", idempotencyKey)
}
_, err := provider.doWithHeaders(
ctx,
provider.config.URL.JoinPath("/v2/meters"),
http.MethodPost,
key,
data,
headers,
)
return err
}
func (provider *Provider) PutProfile(ctx context.Context, key string, profile *zeustypes.PostableProfile) error {
body, err := json.Marshal(profile)
if err != nil {
@@ -183,12 +201,21 @@ func (provider *Provider) PutHost(ctx context.Context, key string, host *zeustyp
}
func (provider *Provider) do(ctx context.Context, url *url.URL, method string, key string, requestBody []byte) ([]byte, error) {
return provider.doWithHeaders(ctx, url, method, key, requestBody, nil)
}
func (provider *Provider) doWithHeaders(ctx context.Context, url *url.URL, method string, key string, requestBody []byte, extraHeaders http.Header) ([]byte, error) {
request, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
}
request.Header.Set("X-Signoz-Cloud-Api-Key", key)
request.Header.Set("Content-Type", "application/json")
for k, vs := range extraHeaders {
for _, v := range vs {
request.Header.Add(k, v)
}
}
response, err := provider.httpClient.Do(request)
if err != nil {

View File

@@ -0,0 +1,28 @@
package meterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Window is the reporting window a Collector produces readings for. Timestamps
// are unix milliseconds; StartMs is used as the emitted reading's Timestamp
// and typically aligns to UTC day start.
type Window struct {
StartMs int64
EndMs int64
}
// CollectorDeps contains the dependencies a meter collector may need to
// resolve readings. Individual collectors can choose the subset they use.
type CollectorDeps struct {
TelemetryStore telemetrystore.TelemetryStore
SQLStore sqlstore.SQLStore
}
// CollectorFunc resolves readings for a single Meter.
type CollectorFunc func(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error)

View File

@@ -0,0 +1,103 @@
package meterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
)
// CollectLogCountMeter emits a single Reading for signoz.meter.log.count.
// Each log-meter collector owns its own query end-to-end — duplication is
// preferred over shared helpers because these paths are billing-critical.
func CollectLogCountMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
if deps.TelemetryStore == nil {
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ifNull(sum(value), 0) AS value")
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", MeterLogCount.String()),
sb.GTE("unix_milli", window.StartMs),
sb.LT("unix_milli", window.EndMs),
)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var value float64
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterLogCount.String())
}
dimensions := map[string]string{
DimensionAggregation: meter.Aggregation,
DimensionUnit: meter.Unit,
DimensionOrganizationID: orgID.StringValue(),
}
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainLogs)
if err != nil {
return nil, err
}
if ok {
dimensions[DimensionRetentionDays] = retentionDays
}
return []meterreportertypes.Reading{{
MeterName: MeterLogCount.String(),
Value: value,
Timestamp: window.StartMs,
IsCompleted: false,
Dimensions: dimensions,
}}, nil
}
// CollectLogSizeMeter emits a single Reading for signoz.meter.log.size.
// Each log-meter collector owns its own query end-to-end — duplication is
// preferred over shared helpers because these paths are billing-critical.
func CollectLogSizeMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
if deps.TelemetryStore == nil {
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ifNull(sum(value), 0) AS value")
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", MeterLogSize.String()),
sb.GTE("unix_milli", window.StartMs),
sb.LT("unix_milli", window.EndMs),
)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var value float64
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterLogSize.String())
}
dimensions := map[string]string{
DimensionAggregation: meter.Aggregation,
DimensionUnit: meter.Unit,
DimensionOrganizationID: orgID.StringValue(),
}
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainLogs)
if err != nil {
return nil, err
}
if ok {
dimensions[DimensionRetentionDays] = retentionDays
}
return []meterreportertypes.Reading{{
MeterName: MeterLogSize.String(),
Value: value,
Timestamp: window.StartMs,
IsCompleted: false,
Dimensions: dimensions,
}}, nil
}

View File

@@ -0,0 +1,105 @@
package meterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
)
// CollectMetricDatapointCountMeter emits a single Reading for
// signoz.meter.metric.datapoint.count. Each metric-meter collector owns its
// own query end-to-end — duplication is preferred over shared helpers because
// these paths are billing-critical.
func CollectMetricDatapointCountMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
if deps.TelemetryStore == nil {
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ifNull(sum(value), 0) AS value")
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", MeterMetricDatapointCount.String()),
sb.GTE("unix_milli", window.StartMs),
sb.LT("unix_milli", window.EndMs),
)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var value float64
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterMetricDatapointCount.String())
}
dimensions := map[string]string{
DimensionAggregation: meter.Aggregation,
DimensionUnit: meter.Unit,
DimensionOrganizationID: orgID.StringValue(),
}
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainMetrics)
if err != nil {
return nil, err
}
if ok {
dimensions[DimensionRetentionDays] = retentionDays
}
return []meterreportertypes.Reading{{
MeterName: MeterMetricDatapointCount.String(),
Value: value,
Timestamp: window.StartMs,
IsCompleted: false,
Dimensions: dimensions,
}}, nil
}
// CollectMetricDatapointSizeMeter emits a single Reading for
// signoz.meter.metric.datapoint.size. Each metric-meter collector owns its
// own query end-to-end — duplication is preferred over shared helpers because
// these paths are billing-critical.
func CollectMetricDatapointSizeMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
if deps.TelemetryStore == nil {
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ifNull(sum(value), 0) AS value")
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", MeterMetricDatapointSize.String()),
sb.GTE("unix_milli", window.StartMs),
sb.LT("unix_milli", window.EndMs),
)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var value float64
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterMetricDatapointSize.String())
}
dimensions := map[string]string{
DimensionAggregation: meter.Aggregation,
DimensionUnit: meter.Unit,
DimensionOrganizationID: orgID.StringValue(),
}
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainMetrics)
if err != nil {
return nil, err
}
if ok {
dimensions[DimensionRetentionDays] = retentionDays
}
return []meterreportertypes.Reading{{
MeterName: MeterMetricDatapointSize.String(),
Value: value,
Timestamp: window.StartMs,
IsCompleted: false,
Dimensions: dimensions,
}}, nil
}

View File

@@ -0,0 +1,103 @@
package meterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
)
// CollectSpanCountMeter emits a single Reading for signoz.meter.span.count.
// Each trace-meter collector owns its own query end-to-end — duplication is
// preferred over shared helpers because these paths are billing-critical.
func CollectSpanCountMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
if deps.TelemetryStore == nil {
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ifNull(sum(value), 0) AS value")
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", MeterSpanCount.String()),
sb.GTE("unix_milli", window.StartMs),
sb.LT("unix_milli", window.EndMs),
)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var value float64
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterSpanCount.String())
}
dimensions := map[string]string{
DimensionAggregation: meter.Aggregation,
DimensionUnit: meter.Unit,
DimensionOrganizationID: orgID.StringValue(),
}
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainTraces)
if err != nil {
return nil, err
}
if ok {
dimensions[DimensionRetentionDays] = retentionDays
}
return []meterreportertypes.Reading{{
MeterName: MeterSpanCount.String(),
Value: value,
Timestamp: window.StartMs,
IsCompleted: false,
Dimensions: dimensions,
}}, nil
}
// CollectSpanSizeMeter emits a single Reading for signoz.meter.span.size.
// Each trace-meter collector owns its own query end-to-end — duplication is
// preferred over shared helpers because these paths are billing-critical.
func CollectSpanSizeMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
if deps.TelemetryStore == nil {
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ifNull(sum(value), 0) AS value")
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
sb.Where(
sb.Equal("metric_name", MeterSpanSize.String()),
sb.GTE("unix_milli", window.StartMs),
sb.LT("unix_milli", window.EndMs),
)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var value float64
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterSpanSize.String())
}
dimensions := map[string]string{
DimensionAggregation: meter.Aggregation,
DimensionUnit: meter.Unit,
DimensionOrganizationID: orgID.StringValue(),
}
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainTraces)
if err != nil {
return nil, err
}
if ok {
dimensions[DimensionRetentionDays] = retentionDays
}
return []meterreportertypes.Reading{{
MeterName: MeterSpanSize.String(),
Value: value,
Timestamp: window.StartMs,
IsCompleted: false,
Dimensions: dimensions,
}}, nil
}

View File

@@ -0,0 +1,65 @@
package meterreporter
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
var _ factory.Config = (*Config)(nil)
type Config struct {
// Provider selects the reporter implementation (default "noop").
Provider string `mapstructure:"provider"`
// Interval is how often the reporter collects and ships meter readings.
Interval time.Duration `mapstructure:"interval"`
// Timeout bounds a single collect-and-ship cycle.
Timeout time.Duration `mapstructure:"timeout"`
// Retry configures exponential backoff for transient Zeus failures.
Retry RetryConfig `mapstructure:"retry"`
}
type RetryConfig struct {
Enabled bool `mapstructure:"enabled"`
InitialInterval time.Duration `mapstructure:"initial_interval"`
MaxInterval time.Duration `mapstructure:"max_interval"`
MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
}
func newConfig() factory.Config {
return Config{
Provider: "noop",
Interval: 6 * time.Hour,
Timeout: 30 * time.Second,
Retry: RetryConfig{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
},
}
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("meterreporter"), newConfig)
}
func (c Config) Validate() error {
if c.Interval < 5*time.Minute {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::interval must be at least 5m")
}
if c.Timeout <= 0 {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be greater than 0")
}
if c.Timeout >= c.Interval {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be less than meterreporter::interval")
}
return nil
}

View File

@@ -0,0 +1,23 @@
package meterreporter
import (
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
)
// Meter is one registered meter - a name, billing metadata, and the function that knows how to produce readings for it.
//
// The same metric Name may appear multiple times in the registry as long as each entry
// uses a different Aggregation (for example min/max/p99 of the same source meter).
type Meter struct {
// Name is the meter's identifier.
Name meterreportertypes.Name
// Unit is available to the collector for the signoz.billing.unit dimension.
Unit string
// Aggregation is available to the collector for the signoz.billing.aggregation dimension.
Aggregation string
// Collect knows how to turn this Meter into zero or more Readings per tick.
Collect CollectorFunc
}

View File

@@ -0,0 +1,26 @@
package meterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
var (
ErrCodeInvalidInput = errors.MustNewCode("meterreporter_invalid_input")
ErrCodeReportFailed = errors.MustNewCode("meterreporter_report_failed")
)
// Dimension keys automatically attached to every Reading.
const (
DimensionAggregation = "signoz.billing.aggregation"
DimensionUnit = "signoz.billing.unit"
DimensionOrganizationID = "signoz.billing.organization.id"
DimensionRetentionDays = "signoz.billing.retention.days"
)
// Reporter periodically collects meter values via the query service and ships
// them to Zeus. Implementations must satisfy factory.ServiceWithHealthy so the
// signoz registry can wait on startup and request graceful shutdown.
type Reporter interface {
factory.ServiceWithHealthy
}

View File

@@ -0,0 +1,39 @@
package noopmeterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/meterreporter"
)
type provider struct {
healthyC chan struct{}
stopC chan struct{}
}
func NewFactory() factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
return factory.NewProviderFactory(factory.MustNewName("noop"), New)
}
func New(_ context.Context, _ factory.ProviderSettings, _ meterreporter.Config) (meterreporter.Reporter, error) {
return &provider{
healthyC: make(chan struct{}),
stopC: make(chan struct{}),
}, nil
}
func (p *provider) Start(_ context.Context) error {
close(p.healthyC)
<-p.stopC
return nil
}
func (p *provider) Stop(_ context.Context) error {
close(p.stopC)
return nil
}
func (p *provider) Healthy() <-chan struct{} {
return p.healthyC
}

View File

@@ -0,0 +1,123 @@
package meterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
)
// Exported names for every meter the reporter knows about. Refer to these
// symbols (not string literals) everywhere - typos turn into compile errors
// instead of silently producing a new meter row at Zeus.
var (
MeterLogCount = meterreportertypes.MustNewName("signoz.meter.log.count")
MeterLogSize = meterreportertypes.MustNewName("signoz.meter.log.size")
MeterMetricDatapointCount = meterreportertypes.MustNewName("signoz.meter.metric.datapoint.count")
MeterMetricDatapointSize = meterreportertypes.MustNewName("signoz.meter.metric.datapoint.size")
MeterSpanCount = meterreportertypes.MustNewName("signoz.meter.span.count")
MeterSpanSize = meterreportertypes.MustNewName("signoz.meter.span.size")
)
const AggregationSum = "sum"
func baseMeters() []*Meter {
meters := []*Meter{
{
Name: MeterLogCount,
Unit: "count",
Aggregation: AggregationSum,
Collect: CollectLogCountMeter,
},
{
Name: MeterLogSize,
Unit: "bytes",
Aggregation: AggregationSum,
Collect: CollectLogSizeMeter,
},
{
Name: MeterMetricDatapointCount,
Unit: "count",
Aggregation: AggregationSum,
Collect: CollectMetricDatapointCountMeter,
},
{
Name: MeterMetricDatapointSize,
Unit: "bytes",
Aggregation: AggregationSum,
Collect: CollectMetricDatapointSizeMeter,
},
{
Name: MeterSpanCount,
Unit: "count",
Aggregation: AggregationSum,
Collect: CollectSpanCountMeter,
},
{
Name: MeterSpanSize,
Unit: "bytes",
Aggregation: AggregationSum,
Collect: CollectSpanSizeMeter,
},
}
mustValidateMeters(meters...)
return meters
}
// DefaultMeters returns the hardcoded query-backed meters supported by the reporter.
func DefaultMeters() ([]Meter, error) {
meters := baseMeters()
if err := validateMeters(meters...); err != nil {
return nil, err
}
resolved := make([]Meter, 0, len(meters))
for _, meter := range meters {
resolved = append(resolved, *meter)
}
return resolved, nil
}
// validateMeters checks that the runtime meter list is internally consistent.
// Every meter must:
// - have a non-zero Name,
// - have a non-empty Unit,
// - have a non-empty Aggregation,
// - have a non-nil Collect function,
// - use a unique (Name, Aggregation) pair.
func validateMeters(meters ...*Meter) error {
seen := make(map[string]struct{}, len(meters))
for _, meter := range meters {
if meter == nil {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "nil meter in registry")
}
if meter.Name.IsZero() {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter with empty name in registry")
}
if meter.Unit == "" {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no unit", meter.Name.String())
}
if meter.Aggregation == "" {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no aggregation", meter.Name.String())
}
if meter.Collect == nil {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no collector function", meter.Name.String())
}
key := meter.Name.String() + "|" + meter.Aggregation
if _, ok := seen[key]; ok {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "duplicate meter %q with aggregation %q", meter.Name.String(), meter.Aggregation)
}
seen[key] = struct{}{}
}
return nil
}
// mustValidateMeters panics when hardcoded meter declarations are invalid.
func mustValidateMeters(meters ...*Meter) {
if err := validateMeters(meters...); err != nil {
panic(err)
}
}

View File

@@ -0,0 +1,107 @@
package meterreporter
import (
"context"
"database/sql"
"strconv"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
)
type RetentionDomain string
const (
RetentionDomainLogs RetentionDomain = "logs"
RetentionDomainMetrics RetentionDomain = "metrics"
RetentionDomainTraces RetentionDomain = "traces"
)
// defaultRetentionDaysByDomain is the per-domain fallback used when no
// ttl_setting row exists for the org. Values mirror the TTL set by the
// canonical ClickHouse schema for each domain's main table:
//
// - logs: signoz_logs.logs_v2 → 15 days
// - metrics: signoz_metrics.samples_v4 → 2 592 000 s = 30 days
// - traces: signoz_traces.signoz_index_v3 → 1 296 000 s = 15 days
//
// If a migration ever changes the DDL default for a domain, update the
// corresponding entry here so billing readings match reality.
var defaultRetentionDaysByDomain = map[RetentionDomain]int{
RetentionDomainLogs: types.DefaultRetentionDays,
RetentionDomainMetrics: 30,
RetentionDomainTraces: 15,
}
// resolveRetentionDays returns the configured retention for orgID in the given
// domain as a string suitable for the DimensionRetentionDays dimension.
//
// It queries the ttl_setting table using the local (non-distributed) table
// name, which is what the V2 retention writer uses. The TTL column is stored
// in days by the V2 path. When no row exists or the stored TTL is non-positive,
// defaultRetentionDaysByDomain provides the per-domain ClickHouse default so
// the reading always carries an accurate retention dimension.
func resolveRetentionDays(ctx context.Context, sqlstore sqlstore.SQLStore, orgID valuer.UUID, domain RetentionDomain) (string, bool, error) {
if sqlstore == nil {
return "", false, nil
}
tableName, ok := retentionTableName(domain)
if !ok {
return "", false, nil
}
ttl := new(types.TTLSetting)
err := sqlstore.
BunDB().
NewSelect().
Model(ttl).
Where("table_name = ?", tableName).
Where("org_id = ?", orgID.StringValue()).
OrderExpr("created_at DESC").
Limit(1).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return domainFallbackRetention(domain)
}
return "", false, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "load retention for domain %q", domain)
}
if ttl.TTL <= 0 {
return domainFallbackRetention(domain)
}
// TTL is stored in days by the V2 retention path (SetCustomRetentionV2).
return strconv.Itoa(ttl.TTL), true, nil
}
// domainFallbackRetention returns the per-domain default retention used when
// no ttl_setting row exists for an org.
func domainFallbackRetention(domain RetentionDomain) (string, bool, error) {
days, ok := defaultRetentionDaysByDomain[domain]
if !ok {
return "", false, errors.Newf(errors.TypeInternal, ErrCodeReportFailed, "no default retention defined for domain %q", domain)
}
return strconv.Itoa(days), true, nil
}
// retentionTableName returns the local ClickHouse table name used as the key
// in ttl_setting rows for each domain. Must match what SetCustomRetentionV2
// writes (the local, not distributed, table name).
func retentionTableName(domain RetentionDomain) (string, bool) {
switch domain {
case RetentionDomainLogs:
return telemetrylogs.DBName + "." + telemetrylogs.LogsV2LocalTableName, true
case RetentionDomainMetrics:
return telemetrymetrics.DBName + "." + telemetrymetrics.SamplesV4LocalTableName, true
case RetentionDomainTraces:
return telemetrytraces.DBName + "." + telemetrytraces.SpanIndexV3LocalTableName, true
default:
return "", false
}
}

View File

@@ -2008,7 +2008,7 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
if err == sql.ErrNoRows {
// No V2 configuration found, return defaults
response.DefaultTTLDays = 15
response.DefaultTTLDays = types.DefaultRetentionDays
response.TTLConditions = []model.CustomRetentionRule{}
response.Status = constants.StatusSuccess
response.ColdStorageTTLDays = -1

View File

@@ -22,6 +22,7 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/identn"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
@@ -129,6 +130,9 @@ type Config struct {
// Auditor config
Auditor auditor.Config `mapstructure:"auditor"`
// MeterReporter config
MeterReporter meterreporter.Config `mapstructure:"meterreporter"`
// CloudIntegration config
CloudIntegration cloudintegration.Config `mapstructure:"cloudintegration"`
}
@@ -162,6 +166,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
identn.NewConfigFactory(),
serviceaccount.NewConfigFactory(),
auditor.NewConfigFactory(),
meterreporter.NewConfigFactory(),
cloudintegration.NewConfigFactory(),
}

View File

@@ -28,6 +28,8 @@ import (
"github.com/SigNoz/signoz/pkg/identn/apikeyidentn"
"github.com/SigNoz/signoz/pkg/identn/impersonationidentn"
"github.com/SigNoz/signoz/pkg/identn/tokenizeridentn"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/meterreporter/noopmeterreporter"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
@@ -315,6 +317,12 @@ func NewAuditorProviderFactories() factory.NamedMap[factory.ProviderFactory[audi
)
}
func NewMeterReporterProviderFactories() factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
return factory.MustNewNamedMap(
noopmeterreporter.NewFactory(),
)
}
func NewFlaggerProviderFactories(registry featuretypes.Registry) factory.NamedMap[factory.ProviderFactory[flagger.FlaggerProvider, flagger.Config]] {
return factory.MustNewNamedMap(
configflagger.NewFactory(registry),

View File

@@ -22,6 +22,7 @@ import (
"github.com/SigNoz/signoz/pkg/identn"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -84,6 +85,7 @@ type SigNoz struct {
Flagger flagger.Flagger
Gateway gateway.Gateway
Auditor auditor.Auditor
MeterReporter meterreporter.Reporter
}
func New(
@@ -104,6 +106,7 @@ func New(
dashboardModuleCallback func(sqlstore.SQLStore, factory.ProviderSettings, analytics.Analytics, organization.Getter, queryparser.QueryParser, querier.Querier, licensing.Licensing) dashboard.Module,
gatewayProviderFactory func(licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config],
auditorProviderFactories func(licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]],
meterReporterProviderFactories func(licensing.Licensing, telemetrystore.TelemetryStore, sqlstore.SQLStore, organization.Getter, zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]],
querierHandlerCallback func(factory.ProviderSettings, querier.Querier, analytics.Analytics) querier.Handler,
cloudIntegrationCallback func(sqlstore.SQLStore, global.Global, zeus.Zeus, gateway.Gateway, licensing.Licensing, serviceaccount.Module, cloudintegration.Config) (cloudintegration.Module, error),
rulerProviderFactories func(cache.Cache, alertmanager.Alertmanager, sqlstore.SQLStore, telemetrystore.TelemetryStore, telemetrytypes.MetadataStore, prometheus.Prometheus, organization.Getter, rulestatehistory.Module, querier.Querier, queryparser.QueryParser) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]],
@@ -377,6 +380,12 @@ func New(
return nil, err
}
// Initialize meter reporter from the variant-specific provider factories
meterReporter, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.MeterReporter, meterReporterProviderFactories(licensing, telemetrystore, sqlstore, orgGetter, zeus), config.MeterReporter.Provider)
if err != nil {
return nil, err
}
// Initialize authns
store := sqlauthnstore.NewStore(sqlstore)
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)
@@ -491,6 +500,7 @@ func New(
factory.NewNamedService(factory.MustNewName("authz"), authz),
factory.NewNamedService(factory.MustNewName("user"), userService, factory.MustNewName("authz")),
factory.NewNamedService(factory.MustNewName("auditor"), auditor),
factory.NewNamedService(factory.MustNewName("meterreporter"), meterReporter, factory.MustNewName("licensing")),
factory.NewNamedService(factory.MustNewName("ruler"), rulerInstance),
)
if err != nil {
@@ -540,5 +550,6 @@ func New(
Flagger: flagger,
Gateway: gateway,
Auditor: auditor,
MeterReporter: meterReporter,
}, nil
}

View File

@@ -0,0 +1,41 @@
package meterreportertypes
import (
"regexp"
"github.com/SigNoz/signoz/pkg/errors"
)
var nameRegex = regexp.MustCompile(`^[a-z][a-z0-9_.]+$`)
// Name is a concrete type for a meter name. Dotted namespace identifiers like
// "signoz.meter.log.count" are permitted; arbitrary strings are not, to avoid
// typos silently producing distinct meter rows at Zeus.
type Name struct {
s string
}
func NewName(s string) (Name, error) {
if !nameRegex.MatchString(s) {
return Name{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid meter name: %s", s)
}
return Name{s: s}, nil
}
func MustNewName(s string) Name {
name, err := NewName(s)
if err != nil {
panic(err)
}
return name
}
func (n Name) String() string {
return n.s
}
func (n Name) IsZero() bool {
return n.s == ""
}

View File

@@ -0,0 +1,32 @@
package meterreportertypes
// Reading is a single meter value sent to Zeus. Zeus UPSERTs on
// (license_key, dimension_hash, timestamp), so repeated readings within the
// same tick window safely overwrite prior values.
type Reading struct {
// MeterName is the fully-qualified meter identifier.
MeterName string `json:"meterName"`
// Value is the aggregated scalar for this (meter, aggregation) pair over the reporting window.
Value float64 `json:"value"`
// Timestamp is the window-start in epoch milliseconds (UTC day start).
Timestamp int64 `json:"timestamp"`
// IsCompleted is true only for sealed past buckets. In-progress buckets
// (e.g. the current UTC day) report IsCompleted=false so Zeus knows the value may still change.
IsCompleted bool `json:"isCompleted"`
// Dimensions is the per-reading label set.
Dimensions map[string]string `json:"dimensions"`
}
// PostableMeterReadings is the request body for Zeus.PutMeterReadings.
type PostableMeterReadings struct { // ! Needs fix once zeus contract is setup
// IdempotencyKey is echoed as the X-Idempotency-Key header and stored by
// Zeus so retries within the same tick window overwrite rather than duplicate.
IdempotencyKey string `json:"idempotencyKey"`
// Readings is the batch of meter values being shipped.
Readings []Reading `json:"readings"`
}

View File

@@ -73,6 +73,11 @@ func NewTraitsFromOrganization(org *Organization) map[string]any {
}
}
// DefaultRetentionDays is the retention shown in the UI when no explicit
// policy has been configured for an org. Both the API reader and the meter
// reporter use this value so a change here propagates everywhere.
const DefaultRetentionDays = 15
type TTLSetting struct {
bun.BaseModel `bun:"table:ttl_setting"`
Identifiable

View File

@@ -49,6 +49,10 @@ func (provider *provider) PutMetersV2(_ context.Context, _ string, _ []byte) err
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v2 is not supported")
}
func (provider *provider) PutMeterReadings(_ context.Context, _ string, _ string, _ []byte) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meter readings is not supported")
}
func (provider *provider) PutProfile(_ context.Context, _ string, _ *zeustypes.PostableProfile) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting profile is not supported")
}

View File

@@ -35,6 +35,11 @@ type Zeus interface {
// Puts the meters for the given license key using Zeus.
PutMetersV2(context.Context, string, []byte) error
// PutMeterReadings ships TDD-shape meter readings to the v2/meters
// endpoint. idempotencyKey is propagated as X-Idempotency-Key so Zeus can
// UPSERT on retries.
PutMeterReadings(ctx context.Context, licenseKey string, idempotencyKey string, body []byte) error
// Put profile for the given license key.
PutProfile(context.Context, string, *zeustypes.PostableProfile) error