Compare commits

...

1 Commits

Author SHA1 Message Date
Karan Balani
bc7dbab2b2 feat: meter reporter for new billing infra 2026-04-20 18:42:23 +05:30
22 changed files with 1035 additions and 0 deletions

View File

@@ -23,6 +23,7 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
@@ -109,6 +110,9 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
func(_ licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
return signoz.NewAuditorProviderFactories()
},
func(_ licensing.Licensing, _ querier.Querier, _ sqlstore.SQLStore, _ organization.Getter, _ zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
return signoz.NewMeterReporterProviderFactories()
},
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
return querier.NewHandler(ps, q, a)
},

View File

@@ -17,6 +17,7 @@ import (
"github.com/SigNoz/signoz/ee/gateway/httpgateway"
enterpriselicensing "github.com/SigNoz/signoz/ee/licensing"
"github.com/SigNoz/signoz/ee/licensing/httplicensing"
"github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter"
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration/implcloudprovider"
"github.com/SigNoz/signoz/ee/modules/dashboard/impldashboard"
@@ -38,6 +39,7 @@ import (
"github.com/SigNoz/signoz/pkg/gateway"
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
pkgcloudintegration "github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
@@ -157,6 +159,13 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
}
return factories
},
func(licensing licensing.Licensing, querier querier.Querier, sqlStore sqlstore.SQLStore, orgGetter organization.Getter, zeus zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
factories := signoz.NewMeterReporterProviderFactories()
if err := factories.Add(signozmeterreporter.NewFactory(licensing, querier, sqlStore, orgGetter, zeus)); err != nil {
panic(err)
}
return factories
},
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
communityHandler := querier.NewHandler(ps, q, a)
return eequerier.NewHandler(ps, q, communityHandler)

View File

@@ -407,3 +407,23 @@ cloudintegration:
agent:
# The version of the cloud integration agent.
version: v0.0.8
##################### Meter Reporter #####################
meterreporter:
# Specifies the meter reporter provider to use.
# noop: does not report any meters (community default).
# signoz: periodically queries meters via the querier and ships readings to Zeus (enterprise).
provider: noop
# The interval between collection ticks. Minimum 30m.
interval: 6h
# The per-tick timeout that bounds collect-and-ship work.
timeout: 30s
retry:
# Whether to retry on transient failures.
enabled: true
# The initial wait time before the first retry.
initial_interval: 5s
# The upper bound on backoff interval.
max_interval: 30s
# The total maximum time spent retrying.
max_elapsed_time: 1m

View File

@@ -0,0 +1,128 @@
package signozmeterreporter
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// tick collects one round of readings across orgs × collectors and ships them to zeus
// per-org / per-collector errors are logged and counted but do not abort the tick - sibling orgs still report.
func (provider *Provider) tick(ctx context.Context) error {
now := time.Now().UTC()
// Go to 00:00 UTC of current day (in milliseconds)
bucketStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
// Period in which meter data will be queried: 00:00 UTC → now UTC
window := meterreporter.Window{
StartMs: uint64(bucketStart.UnixMilli()),
EndMs: uint64(now.UnixMilli()),
BucketStartMs: bucketStart.UnixMilli(),
}
// Collect all the orgs handled by this SigNoz instance
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "failed to list organizations")
}
readingsByLicenseKey := make(map[string][]meterreportertypes.Reading)
for _, org := range orgs {
license, err := provider.licensing.GetActive(ctx, org.ID)
if err != nil {
provider.settings.Logger().WarnContext(ctx, "skipping org, failed to fetch active license", errors.Attr(err), slog.String("org_id", org.ID.StringValue()))
continue
}
if license == nil || license.Key == "" {
provider.settings.Logger().WarnContext(ctx, "skipping org, nil/empty license for org", slog.String("org_id", org.ID.StringValue()))
continue
}
orgReadings := provider.collectOrgReadings(ctx, org.ID, window)
if len(orgReadings) == 0 {
continue
}
readingsByLicenseKey[license.Key] = append(readingsByLicenseKey[license.Key], orgReadings...)
}
if len(readingsByLicenseKey) == 0 {
return nil
}
date := bucketStart.Format("2006-01-02")
for licenseKey, readings := range readingsByLicenseKey {
if err := provider.shipReadings(ctx, licenseKey, date, readings); err != nil {
provider.metrics.postErrors.Add(ctx, 1)
provider.settings.Logger().ErrorContext(ctx, "failed to ship meter readings", errors.Attr(err), slog.Int("readings", len(readings)))
continue
}
provider.metrics.readingsEmitted.Add(ctx, int64(len(readings)))
}
return nil
}
// collectOrgReadings runs every registered Meter's Collector against orgID and
// returns their combined Readings with DimensionOrganizationID attached.
// Individual meter failures are logged and skipped — one bad meter does not
// block the rest of the batch.
func (provider *Provider) collectOrgReadings(ctx context.Context, orgID valuer.UUID, window meterreporter.Window) []meterreportertypes.Reading {
readings := make([]meterreportertypes.Reading, 0, len(provider.meters))
for _, meter := range provider.meters {
collectedReadings, err := meter.Collector.Collect(ctx, meter, orgID, window)
if err != nil {
provider.metrics.collectErrors.Add(ctx, 1)
provider.settings.Logger().WarnContext(ctx, "meter collection failed", errors.Attr(err), slog.String("meter", meter.Name.String()), slog.String("org_id", orgID.StringValue()))
continue
}
for i := range collectedReadings {
if collectedReadings[i].Dimensions == nil {
collectedReadings[i].Dimensions = make(map[string]string, 1)
}
collectedReadings[i].Dimensions[meterreporter.DimensionOrganizationID] = orgID.StringValue()
}
readings = append(readings, collectedReadings...)
}
// ! (balanikaran): TEMP for debugging
provider.settings.Logger().InfoContext(ctx, "final readings", slog.Any("readings", readings))
return readings
}
// shipReadings encodes the batch as PostableMeterReadings JSON and POSTs it to
// Zeus in a single request. The date-scoped idempotency key lets Zeus UPSERT
// on subsequent ticks within the same UTC day.
func (provider *Provider) shipReadings(ctx context.Context, licenseKey string, date string, readings []meterreportertypes.Reading) error {
idempotencyKey := fmt.Sprintf("meter-cron:%s", date)
// ! TODO: this needs to be fixed in the format we make the zeus API
payload := meterreportertypes.PostableMeterReadings{
IdempotencyKey: idempotencyKey,
Readings: readings,
}
body, err := json.Marshal(payload)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "marshal meter readings")
}
if err := provider.zeus.PutMeterReadings(ctx, licenseKey, idempotencyKey, body); err != nil {
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "zeus put meter readings")
}
return nil
}

View File

@@ -0,0 +1,149 @@
package signozmeterreporter
import (
"context"
"log/slog"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/zeus"
)
var _ factory.ServiceWithHealthy = (*Provider)(nil)
// Provider is the enterprise meter reporter. It ticks on a fixed interval,
// invokes every registered Collector against every licensed org, and ships
// the resulting readings to Zeus.
type Provider struct {
settings factory.ScopedProviderSettings
config meterreporter.Config
meters []meterreporter.Meter
licensing licensing.Licensing
orgGetter organization.Getter
zeus zeus.Zeus
healthyC chan struct{}
stopC chan struct{}
goroutinesWg sync.WaitGroup
metrics *reporterMetrics
}
// NewFactory returns a ProviderFactory for the signoz meter reporter.
func NewFactory(
licensing licensing.Licensing,
querier querier.Querier,
sqlstore sqlstore.SQLStore,
orgGetter organization.Getter,
zeus zeus.Zeus,
) factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
return factory.NewProviderFactory(
factory.MustNewName("signoz"),
func(ctx context.Context, providerSettings factory.ProviderSettings, config meterreporter.Config) (meterreporter.Reporter, error) {
return newProvider(ctx, providerSettings, config, licensing, querier, sqlstore, orgGetter, zeus)
},
)
}
func newProvider(
_ context.Context,
providerSettings factory.ProviderSettings,
config meterreporter.Config,
licensing licensing.Licensing,
querier querier.Querier,
sqlstore sqlstore.SQLStore,
orgGetter organization.Getter,
zeus zeus.Zeus,
) (*Provider, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter")
metrics, err := newReporterMetrics(settings.Meter())
if err != nil {
return nil, err
}
meters, err := meterreporter.DefaultMeters(querier, sqlstore)
if err != nil {
return nil, err
}
return &Provider{
settings: settings,
config: config,
meters: meters,
licensing: licensing,
orgGetter: orgGetter,
zeus: zeus,
healthyC: make(chan struct{}),
stopC: make(chan struct{}),
metrics: metrics,
}, nil
}
// Start runs an initial tick and then loops on the configured interval until
// Stop is called. Start blocks until the goroutine returns, matching the
// factory.Service contract used across the codebase.
func (provider *Provider) Start(ctx context.Context) error {
close(provider.healthyC)
provider.goroutinesWg.Add(1)
go func() {
defer provider.goroutinesWg.Done()
provider.runTick(ctx)
ticker := time.NewTicker(provider.config.Interval)
defer ticker.Stop()
for {
select {
case <-provider.stopC:
return
case <-ticker.C:
provider.runTick(ctx)
}
}
}()
provider.goroutinesWg.Wait()
return nil
}
// Stop requests the reporter to stop, waits for the in-flight tick (bounded by
// Config.Timeout) to complete, and returns.
func (provider *Provider) Stop(_ context.Context) error {
<-provider.healthyC
select {
case <-provider.stopC:
// already closed
default:
close(provider.stopC)
}
provider.goroutinesWg.Wait()
return nil
}
func (provider *Provider) Healthy() <-chan struct{} {
return provider.healthyC
}
// runTick executes one collect-and-ship cycle under Config.Timeout. Errors are
// logged and counted; they do not propagate because the reporter must keep
// ticking on subsequent intervals.
func (provider *Provider) runTick(parentCtx context.Context) {
provider.metrics.ticks.Add(parentCtx, 1)
ctx, cancel := context.WithTimeout(parentCtx, provider.config.Timeout)
defer cancel()
if err := provider.tick(ctx); err != nil {
provider.settings.Logger().ErrorContext(ctx, "meter reporter tick failed", errors.Attr(err), slog.Duration("timeout", provider.config.Timeout))
}
}

View File

@@ -0,0 +1,48 @@
package signozmeterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"go.opentelemetry.io/otel/metric"
)
type reporterMetrics struct {
ticks metric.Int64Counter
readingsEmitted metric.Int64Counter
collectErrors metric.Int64Counter
postErrors metric.Int64Counter
}
func newReporterMetrics(meter metric.Meter) (*reporterMetrics, error) {
var errs error
ticks, err := meter.Int64Counter("signoz.meterreporter.ticks", metric.WithDescription("Total number of meter reporter ticks that ran to completion or aborted."))
if err != nil {
errs = errors.Join(errs, err)
}
readingsEmitted, err := meter.Int64Counter("signoz.meterreporter.readings.emitted", metric.WithDescription("Total number of meter readings shipped to Zeus."))
if err != nil {
errs = errors.Join(errs, err)
}
collectErrors, err := meter.Int64Counter("signoz.meterreporter.collect.errors", metric.WithDescription("Total number of collect errors across organizations and meters."))
if err != nil {
errs = errors.Join(errs, err)
}
postErrors, err := meter.Int64Counter("signoz.meterreporter.post.errors", metric.WithDescription("Total number of Zeus POST failures."))
if err != nil {
errs = errors.Join(errs, err)
}
if errs != nil {
return nil, errs
}
return &reporterMetrics{
ticks: ticks,
readingsEmitted: readingsEmitted,
collectErrors: collectErrors,
postErrors: postErrors,
}, nil
}

View File

@@ -148,6 +148,24 @@ func (provider *Provider) PutMetersV2(ctx context.Context, key string, data []by
return err
}
func (provider *Provider) PutMeterReadings(ctx context.Context, key string, idempotencyKey string, data []byte) error {
headers := http.Header{}
if idempotencyKey != "" {
headers.Set("X-Idempotency-Key", idempotencyKey)
}
_, err := provider.doWithHeaders(
ctx,
provider.config.URL.JoinPath("/v2/meters"),
http.MethodPost,
key,
data,
headers,
)
return err
}
func (provider *Provider) PutProfile(ctx context.Context, key string, profile *zeustypes.PostableProfile) error {
body, err := json.Marshal(profile)
if err != nil {
@@ -183,12 +201,21 @@ func (provider *Provider) PutHost(ctx context.Context, key string, host *zeustyp
}
func (provider *Provider) do(ctx context.Context, url *url.URL, method string, key string, requestBody []byte) ([]byte, error) {
return provider.doWithHeaders(ctx, url, method, key, requestBody, nil)
}
func (provider *Provider) doWithHeaders(ctx context.Context, url *url.URL, method string, key string, requestBody []byte, extraHeaders http.Header) ([]byte, error) {
request, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
}
request.Header.Set("X-Signoz-Cloud-Api-Key", key)
request.Header.Set("Content-Type", "application/json")
for k, vs := range extraHeaders {
for _, v := range vs {
request.Header.Add(k, v)
}
}
response, err := provider.httpClient.Do(request)
if err != nil {

View File

@@ -0,0 +1,26 @@
package meterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Window is the reporting window a Collector produces readings for. Timestamps
// are unix milliseconds; BucketStartMs is the emitted reading's Timestamp and
// typically aligns to UTC day start.
type Window struct {
StartMs uint64
EndMs uint64
BucketStartMs int64 // ! See if this can be removed
}
// Collector produces readings for a single Meter. Implementations are
// stateless — the Meter carries all per-meter configuration — so a single
// Collector instance may be shared across every entry in the registry.
//
// Collect must be safe to call concurrently across orgs.
type Collector interface {
Collect(ctx context.Context, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error)
}

View File

@@ -0,0 +1,65 @@
package meterreporter
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
var _ factory.Config = (*Config)(nil)
type Config struct {
// Provider selects the reporter implementation (default "noop").
Provider string `mapstructure:"provider"`
// Interval is how often the reporter collects and ships meter readings.
Interval time.Duration `mapstructure:"interval"`
// Timeout bounds a single collect-and-ship cycle.
Timeout time.Duration `mapstructure:"timeout"`
// Retry configures exponential backoff for transient Zeus failures.
Retry RetryConfig `mapstructure:"retry"`
}
type RetryConfig struct {
Enabled bool `mapstructure:"enabled"`
InitialInterval time.Duration `mapstructure:"initial_interval"`
MaxInterval time.Duration `mapstructure:"max_interval"`
MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
}
func newConfig() factory.Config {
return Config{
Provider: "noop",
Interval: 6 * time.Hour,
Timeout: 30 * time.Second,
Retry: RetryConfig{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
},
}
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("meterreporter"), newConfig)
}
func (c Config) Validate() error {
if c.Interval < 30*time.Minute {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::interval must be at least 30m")
}
if c.Timeout <= 0 {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be greater than 0")
}
if c.Timeout >= c.Interval {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be less than meterreporter::interval")
}
return nil
}

View File

@@ -0,0 +1,34 @@
package meterreporter
import (
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
// Meter is one registered meter — a name + how to produce readings for it. A meter is the single
// unit of extension: to add a new meter, append one Meter to the default registry (see registry.go).
//
// The same metric Name may appear multiple times in the registry as long as each entry
// uses a different SpaceAggregation (for example min/max/p99 of the same source meter).
type Meter struct {
// Name is the meter's identifier.
Name meterreportertypes.Name
// Unit is reported verbatim as the signoz.billing.unit dimension.
Unit string
// RetentionDomain indicates which product TTL should be surfaced as the signoz.billing.retention.days dimension for this meter.
RetentionDomain RetentionDomain
// TimeAggregation reduces per-series samples across the query window.
TimeAggregation metrictypes.TimeAggregation
// SpaceAggregation reduces across series and is reported verbatim as the signoz.billing.aggregation dimension.
SpaceAggregation metrictypes.SpaceAggregation
// FilterExpression is an optional filter pushed into the query builder (e.g. "service.name = 'cart'").
FilterExpression string
// Collector knows how to turn this Meter into zero or more Readings per tick.
Collector Collector
}

View File

@@ -0,0 +1,26 @@
package meterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
)
var (
ErrCodeInvalidInput = errors.MustNewCode("meterreporter_invalid_input")
ErrCodeReportFailed = errors.MustNewCode("meterreporter_report_failed")
)
// Dimension keys automatically attached to every Reading.
const (
DimensionAggregation = "signoz.billing.aggregation"
DimensionUnit = "signoz.billing.unit"
DimensionOrganizationID = "signoz.billing.organization.id"
DimensionRetentionDays = "signoz.billing.retention.days"
)
// Reporter periodically collects meter values via the query service and ships
// them to Zeus. Implementations must satisfy factory.ServiceWithHealthy so the
// signoz registry can wait on startup and request graceful shutdown.
type Reporter interface {
factory.ServiceWithHealthy
}

View File

@@ -0,0 +1,39 @@
package noopmeterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/meterreporter"
)
type provider struct {
healthyC chan struct{}
stopC chan struct{}
}
func NewFactory() factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
return factory.NewProviderFactory(factory.MustNewName("noop"), New)
}
func New(_ context.Context, _ factory.ProviderSettings, _ meterreporter.Config) (meterreporter.Reporter, error) {
return &provider{
healthyC: make(chan struct{}),
stopC: make(chan struct{}),
}, nil
}
func (p *provider) Start(_ context.Context) error {
close(p.healthyC)
<-p.stopC
return nil
}
func (p *provider) Stop(_ context.Context) error {
close(p.stopC)
return nil
}
func (p *provider) Healthy() <-chan struct{} {
return p.healthyC
}

View File

@@ -0,0 +1,131 @@
package meterreporter
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var _ Collector = (*QueryCollector)(nil)
// QueryCollector produces a single scalar Reading per Collect call by issuing a RequestTypeScalar query
// against the querier over SourceMeter. It reads everything it needs from the Meter it is invoked with.
type QueryCollector struct {
querier querier.Querier
}
func NewQueryCollector(q querier.Querier) *QueryCollector {
return &QueryCollector{querier: q}
}
func (c *QueryCollector) Collect(ctx context.Context, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
req := buildQueryRequest(meter, window.StartMs, window.EndMs)
resp, err := c.querier.QueryRange(ctx, orgID, req)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query range for meter %q", meter.Name.String())
}
value, ok := extractScalarValue(resp)
if !ok {
return nil, nil
}
return []meterreportertypes.Reading{{
MeterName: meter.Name.String(),
Value: value,
Timestamp: window.BucketStartMs,
IsCompleted: false,
Dimensions: map[string]string{
DimensionAggregation: meter.SpaceAggregation.StringValue(),
DimensionUnit: meter.Unit,
},
}}, nil
}
// buildQueryRequest composes a single-query, single-aggregation scalar request over the meter source.
// The querier applies its own step interval defaults for SourceMeter.
func buildQueryRequest(meter Meter, startMs, endMs uint64) *querybuildertypesv5.QueryRangeRequest {
builderQuery := querybuildertypesv5.QueryBuilderQuery[querybuildertypesv5.MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
Source: telemetrytypes.SourceMeter,
Aggregations: []querybuildertypesv5.MetricAggregation{{
MetricName: meter.Name.String(),
TimeAggregation: meter.TimeAggregation,
SpaceAggregation: meter.SpaceAggregation,
}},
}
if meter.FilterExpression != "" {
builderQuery.Filter = &querybuildertypesv5.Filter{Expression: meter.FilterExpression}
}
return &querybuildertypesv5.QueryRangeRequest{
Start: startMs,
End: endMs,
RequestType: querybuildertypesv5.RequestTypeScalar,
CompositeQuery: querybuildertypesv5.CompositeQuery{
Queries: []querybuildertypesv5.QueryEnvelope{
{
Type: querybuildertypesv5.QueryTypeBuilder,
Spec: builderQuery,
},
},
},
NoCache: true,
}
}
// extractScalarValue pulls the single scalar value out of a ScalarData result.
// Returns (value, true) for a well-formed single-row/single-aggregation
// result, (0, false) otherwise (empty, multi-row, non-scalar).
func extractScalarValue(resp *querybuildertypesv5.QueryRangeResponse) (float64, bool) {
if resp == nil || len(resp.Data.Results) == 0 {
return 0, false
}
scalar, ok := resp.Data.Results[0].(*querybuildertypesv5.ScalarData)
if !ok {
if direct, ok := resp.Data.Results[0].(querybuildertypesv5.ScalarData); ok {
scalar = &direct
} else {
return 0, false
}
}
if len(scalar.Data) == 0 || len(scalar.Data[0]) == 0 {
return 0, false
}
for colIdx, col := range scalar.Columns {
if col == nil {
continue
}
if col.Type == querybuildertypesv5.ColumnTypeAggregation {
if colIdx >= len(scalar.Data[0]) {
return 0, false
}
switch v := scalar.Data[0][colIdx].(type) {
case float64:
return v, true
case float32:
return float64(v), true
case int:
return float64(v), true
case int64:
return float64(v), true
case uint64:
return float64(v), true
}
return 0, false
}
}
return 0, false
}

View File

@@ -0,0 +1,99 @@
package meterreporter
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
// Exported names for every meter the reporter knows about. Refer to these
// symbols (not string literals) everywhere - typos turn into compile errors
// instead of silently producing a new meter row at Zeus.
var (
MeterLogCount = meterreportertypes.MustNewName("signoz.meter.log.count")
MeterLogSize = meterreportertypes.MustNewName("signoz.meter.log.size")
)
func baseMeters(q querier.Querier, sqlstore sqlstore.SQLStore) []*Meter {
queryCollector := NewQueryCollector(q)
retentionAwareQueryCollector := NewRetentionDimensionsCollector(queryCollector, NewSQLRetentionResolver(sqlstore))
meters := []*Meter{
{
Name: MeterLogCount,
Unit: "count",
RetentionDomain: RetentionDomainLogs,
TimeAggregation: metrictypes.TimeAggregationSum,
SpaceAggregation: metrictypes.SpaceAggregationSum,
Collector: retentionAwareQueryCollector,
},
{
Name: MeterLogSize,
Unit: "bytes",
RetentionDomain: RetentionDomainLogs,
TimeAggregation: metrictypes.TimeAggregationSum,
SpaceAggregation: metrictypes.SpaceAggregationSum,
Collector: retentionAwareQueryCollector,
},
}
mustValidateMeters(meters...)
return meters
}
// DefaultMeters returns the hardcoded query-backed meters supported by the reporter.
func DefaultMeters(q querier.Querier, sqlstore sqlstore.SQLStore) ([]Meter, error) {
meters := baseMeters(q, sqlstore)
if err := validateMeters(meters...); err != nil {
return nil, err
}
resolved := make([]Meter, 0, len(meters))
for _, meter := range meters {
resolved = append(resolved, *meter)
}
return resolved, nil
}
// validateMeters checks that the runtime meter list is internally consistent.
// Every meter must:
// - have a non-zero Name,
// - have a non-empty Unit,
// - have a non-nil Collector,
// - use a unique (Name, SpaceAggregation) pair.
func validateMeters(meters ...*Meter) error {
seen := make(map[string]struct{}, len(meters))
for _, meter := range meters {
if meter == nil {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "nil meter in registry")
}
if meter.Name.IsZero() {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter with empty name in registry")
}
if meter.Unit == "" {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no unit", meter.Name.String())
}
if meter.Collector == nil {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no collector", meter.Name.String())
}
key := meter.Name.String() + "|" + meter.SpaceAggregation.StringValue()
if _, ok := seen[key]; ok {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "duplicate meter %q with aggregation %q", meter.Name.String(), meter.SpaceAggregation.StringValue())
}
seen[key] = struct{}{}
}
return nil
}
// mustValidateMeters panics when hardcoded meter declarations are invalid.
func mustValidateMeters(meters ...*Meter) {
if err := validateMeters(meters...); err != nil {
panic(err)
}
}

View File

@@ -0,0 +1,124 @@
package meterreporter
import (
"context"
"database/sql"
"strconv"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type RetentionDomain string
const (
RetentionDomainNone RetentionDomain = ""
RetentionDomainLogs RetentionDomain = "logs"
RetentionDomainMetrics RetentionDomain = "metrics"
RetentionDomainTraces RetentionDomain = "traces"
)
type retentionResolver interface {
ResolveDays(ctx context.Context, orgID valuer.UUID, domain RetentionDomain) (string, bool, error)
}
type retentionDimensionsCollector struct {
inner Collector
resolver retentionResolver
}
func NewRetentionDimensionsCollector(inner Collector, resolver retentionResolver) Collector {
if inner == nil || resolver == nil {
return inner
}
return &retentionDimensionsCollector{
inner: inner,
resolver: resolver,
}
}
func (c *retentionDimensionsCollector) Collect(ctx context.Context, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
readings, err := c.inner.Collect(ctx, meter, orgID, window)
if err != nil || len(readings) == 0 || meter.RetentionDomain == RetentionDomainNone {
return readings, err
}
retentionDays, ok, err := c.resolver.ResolveDays(ctx, orgID, meter.RetentionDomain)
if err != nil {
return nil, err
}
if !ok {
return readings, nil
}
for i := range readings {
if readings[i].Dimensions == nil {
readings[i].Dimensions = make(map[string]string, 1)
}
readings[i].Dimensions[DimensionRetentionDays] = retentionDays
}
return readings, nil
}
type sqlRetentionResolver struct {
sqlstore sqlstore.SQLStore
}
func NewSQLRetentionResolver(sqlstore sqlstore.SQLStore) retentionResolver {
if sqlstore == nil {
return nil
}
return &sqlRetentionResolver{sqlstore: sqlstore}
}
func (r *sqlRetentionResolver) ResolveDays(ctx context.Context, orgID valuer.UUID, domain RetentionDomain) (string, bool, error) {
tableName, ok := retentionTableName(domain)
if !ok {
return "", false, nil
}
ttl := new(types.TTLSetting)
err := r.sqlstore.
BunDB().
NewSelect().
Model(ttl).
Where("table_name = ?", tableName).
Where("org_id = ?", orgID.StringValue()).
OrderExpr("created_at DESC").
Limit(1).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return "", false, nil
}
return "", false, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "load retention for domain %q", domain)
}
if ttl.TTL <= 0 {
return "", false, nil
}
return strconv.Itoa(ttl.TTL / (24 * 3600)), true, nil
}
func retentionTableName(domain RetentionDomain) (string, bool) {
switch domain {
case RetentionDomainLogs:
return telemetrylogs.DBName + "." + telemetrylogs.LogsV2TableName, true
case RetentionDomainMetrics:
return telemetrymetrics.DBName + "." + telemetrymetrics.SamplesV4TableName, true
case RetentionDomainTraces:
return telemetrytraces.DBName + "." + telemetrytraces.SpanIndexV3TableName, true
default:
return "", false
}
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/identn"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
@@ -129,6 +130,9 @@ type Config struct {
// Auditor config
Auditor auditor.Config `mapstructure:"auditor"`
// MeterReporter config
MeterReporter meterreporter.Config `mapstructure:"meterreporter"`
// CloudIntegration config
CloudIntegration cloudintegration.Config `mapstructure:"cloudintegration"`
}
@@ -162,6 +166,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
identn.NewConfigFactory(),
serviceaccount.NewConfigFactory(),
auditor.NewConfigFactory(),
meterreporter.NewConfigFactory(),
cloudintegration.NewConfigFactory(),
}

View File

@@ -28,6 +28,8 @@ import (
"github.com/SigNoz/signoz/pkg/identn/apikeyidentn"
"github.com/SigNoz/signoz/pkg/identn/impersonationidentn"
"github.com/SigNoz/signoz/pkg/identn/tokenizeridentn"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/meterreporter/noopmeterreporter"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
@@ -315,6 +317,12 @@ func NewAuditorProviderFactories() factory.NamedMap[factory.ProviderFactory[audi
)
}
func NewMeterReporterProviderFactories() factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
return factory.MustNewNamedMap(
noopmeterreporter.NewFactory(),
)
}
func NewFlaggerProviderFactories(registry featuretypes.Registry) factory.NamedMap[factory.ProviderFactory[flagger.FlaggerProvider, flagger.Config]] {
return factory.MustNewNamedMap(
configflagger.NewFactory(registry),

View File

@@ -22,6 +22,7 @@ import (
"github.com/SigNoz/signoz/pkg/identn"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -84,6 +85,7 @@ type SigNoz struct {
Flagger flagger.Flagger
Gateway gateway.Gateway
Auditor auditor.Auditor
MeterReporter meterreporter.Reporter
}
func New(
@@ -104,6 +106,7 @@ func New(
dashboardModuleCallback func(sqlstore.SQLStore, factory.ProviderSettings, analytics.Analytics, organization.Getter, queryparser.QueryParser, querier.Querier, licensing.Licensing) dashboard.Module,
gatewayProviderFactory func(licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config],
auditorProviderFactories func(licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]],
meterReporterProviderFactories func(licensing.Licensing, querier.Querier, sqlstore.SQLStore, organization.Getter, zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]],
querierHandlerCallback func(factory.ProviderSettings, querier.Querier, analytics.Analytics) querier.Handler,
cloudIntegrationCallback func(sqlstore.SQLStore, global.Global, zeus.Zeus, gateway.Gateway, licensing.Licensing, serviceaccount.Module, cloudintegration.Config) (cloudintegration.Module, error),
rulerProviderFactories func(cache.Cache, alertmanager.Alertmanager, sqlstore.SQLStore, telemetrystore.TelemetryStore, telemetrytypes.MetadataStore, prometheus.Prometheus, organization.Getter, rulestatehistory.Module, querier.Querier, queryparser.QueryParser) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]],
@@ -377,6 +380,12 @@ func New(
return nil, err
}
// Initialize meter reporter from the variant-specific provider factories
meterReporter, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.MeterReporter, meterReporterProviderFactories(licensing, querier, sqlstore, orgGetter, zeus), config.MeterReporter.Provider)
if err != nil {
return nil, err
}
// Initialize authns
store := sqlauthnstore.NewStore(sqlstore)
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)
@@ -491,6 +500,7 @@ func New(
factory.NewNamedService(factory.MustNewName("authz"), authz),
factory.NewNamedService(factory.MustNewName("user"), userService, factory.MustNewName("authz")),
factory.NewNamedService(factory.MustNewName("auditor"), auditor),
factory.NewNamedService(factory.MustNewName("meterreporter"), meterReporter, factory.MustNewName("licensing")),
factory.NewNamedService(factory.MustNewName("ruler"), rulerInstance),
)
if err != nil {
@@ -540,5 +550,6 @@ func New(
Flagger: flagger,
Gateway: gateway,
Auditor: auditor,
MeterReporter: meterReporter,
}, nil
}

View File

@@ -0,0 +1,41 @@
package meterreportertypes
import (
"regexp"
"github.com/SigNoz/signoz/pkg/errors"
)
var nameRegex = regexp.MustCompile(`^[a-z][a-z0-9_.]+$`)
// Name is a concrete type for a meter name. Dotted namespace identifiers like
// "signoz.meter.log.count" are permitted; arbitrary strings are not, to avoid
// typos silently producing distinct meter rows at Zeus.
type Name struct {
s string
}
func NewName(s string) (Name, error) {
if !nameRegex.MatchString(s) {
return Name{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid meter name: %s", s)
}
return Name{s: s}, nil
}
func MustNewName(s string) Name {
name, err := NewName(s)
if err != nil {
panic(err)
}
return name
}
func (n Name) String() string {
return n.s
}
func (n Name) IsZero() bool {
return n.s == ""
}

View File

@@ -0,0 +1,32 @@
package meterreportertypes
// Reading is a single meter value sent to Zeus. Zeus UPSERTs on
// (license_key, dimension_hash, timestamp), so repeated readings within the
// same tick window safely overwrite prior values.
type Reading struct {
// MeterName is the fully-qualified meter identifier.
MeterName string `json:"meterName"`
// Value is the aggregated scalar for this (meter, aggregation) pair over the reporting window.
Value float64 `json:"value"`
// Timestamp is the window-start in epoch milliseconds (UTC day start).
Timestamp int64 `json:"timestamp"`
// IsCompleted is true only for sealed past buckets. In-progress buckets
// (e.g. the current UTC day) report IsCompleted=false so Zeus knows the value may still change.
IsCompleted bool `json:"isCompleted"`
// Dimensions is the per-reading label set.
Dimensions map[string]string `json:"dimensions"`
}
// PostableMeterReadings is the request body for Zeus.PutMeterReadings.
type PostableMeterReadings struct { // ! Needs fix once zeus contract is setup
// IdempotencyKey is echoed as the X-Idempotency-Key header and stored by
// Zeus so retries within the same tick window overwrite rather than duplicate.
IdempotencyKey string `json:"idempotencyKey"`
// Readings is the batch of meter values being shipped.
Readings []Reading `json:"readings"`
}

View File

@@ -49,6 +49,10 @@ func (provider *provider) PutMetersV2(_ context.Context, _ string, _ []byte) err
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v2 is not supported")
}
func (provider *provider) PutMeterReadings(_ context.Context, _ string, _ string, _ []byte) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meter readings is not supported")
}
func (provider *provider) PutProfile(_ context.Context, _ string, _ *zeustypes.PostableProfile) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting profile is not supported")
}

View File

@@ -35,6 +35,11 @@ type Zeus interface {
// Puts the meters for the given license key using Zeus.
PutMetersV2(context.Context, string, []byte) error
// PutMeterReadings ships TDD-shape meter readings to the v2/meters
// endpoint. idempotencyKey is propagated as X-Idempotency-Key so Zeus can
// UPSERT on retries.
PutMeterReadings(ctx context.Context, licenseKey string, idempotencyKey string, body []byte) error
// Put profile for the given license key.
PutProfile(context.Context, string, *zeustypes.PostableProfile) error