mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-20 18:50:29 +01:00
Compare commits
1 Commits
chore/json
...
feat/billi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bc7dbab2b2 |
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
@@ -109,6 +110,9 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
func(_ licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
|
||||
return signoz.NewAuditorProviderFactories()
|
||||
},
|
||||
func(_ licensing.Licensing, _ querier.Querier, _ sqlstore.SQLStore, _ organization.Getter, _ zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
return signoz.NewMeterReporterProviderFactories()
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
return querier.NewHandler(ps, q, a)
|
||||
},
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/SigNoz/signoz/ee/gateway/httpgateway"
|
||||
enterpriselicensing "github.com/SigNoz/signoz/ee/licensing"
|
||||
"github.com/SigNoz/signoz/ee/licensing/httplicensing"
|
||||
"github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter"
|
||||
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration/implcloudprovider"
|
||||
"github.com/SigNoz/signoz/ee/modules/dashboard/impldashboard"
|
||||
@@ -38,6 +39,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/gateway"
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
pkgcloudintegration "github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
@@ -157,6 +159,13 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
}
|
||||
return factories
|
||||
},
|
||||
func(licensing licensing.Licensing, querier querier.Querier, sqlStore sqlstore.SQLStore, orgGetter organization.Getter, zeus zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
factories := signoz.NewMeterReporterProviderFactories()
|
||||
if err := factories.Add(signozmeterreporter.NewFactory(licensing, querier, sqlStore, orgGetter, zeus)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return factories
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
communityHandler := querier.NewHandler(ps, q, a)
|
||||
return eequerier.NewHandler(ps, q, communityHandler)
|
||||
|
||||
@@ -407,3 +407,23 @@ cloudintegration:
|
||||
agent:
|
||||
# The version of the cloud integration agent.
|
||||
version: v0.0.8
|
||||
|
||||
##################### Meter Reporter #####################
|
||||
meterreporter:
|
||||
# Specifies the meter reporter provider to use.
|
||||
# noop: does not report any meters (community default).
|
||||
# signoz: periodically queries meters via the querier and ships readings to Zeus (enterprise).
|
||||
provider: noop
|
||||
# The interval between collection ticks. Minimum 30m.
|
||||
interval: 6h
|
||||
# The per-tick timeout that bounds collect-and-ship work.
|
||||
timeout: 30s
|
||||
retry:
|
||||
# Whether to retry on transient failures.
|
||||
enabled: true
|
||||
# The initial wait time before the first retry.
|
||||
initial_interval: 5s
|
||||
# The upper bound on backoff interval.
|
||||
max_interval: 30s
|
||||
# The total maximum time spent retrying.
|
||||
max_elapsed_time: 1m
|
||||
|
||||
128
ee/meterreporter/signozmeterreporter/collect.go
Normal file
128
ee/meterreporter/signozmeterreporter/collect.go
Normal file
@@ -0,0 +1,128 @@
|
||||
package signozmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// tick collects one round of readings across orgs × collectors and ships them to zeus
|
||||
// per-org / per-collector errors are logged and counted but do not abort the tick - sibling orgs still report.
|
||||
func (provider *Provider) tick(ctx context.Context) error {
|
||||
now := time.Now().UTC()
|
||||
|
||||
// Go to 00:00 UTC of current day (in milliseconds)
|
||||
bucketStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
||||
|
||||
// Period in which meter data will be queried: 00:00 UTC → now UTC
|
||||
window := meterreporter.Window{
|
||||
StartMs: uint64(bucketStart.UnixMilli()),
|
||||
EndMs: uint64(now.UnixMilli()),
|
||||
BucketStartMs: bucketStart.UnixMilli(),
|
||||
}
|
||||
|
||||
// Collect all the orgs handled by this SigNoz instance
|
||||
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "failed to list organizations")
|
||||
}
|
||||
|
||||
readingsByLicenseKey := make(map[string][]meterreportertypes.Reading)
|
||||
|
||||
for _, org := range orgs {
|
||||
license, err := provider.licensing.GetActive(ctx, org.ID)
|
||||
if err != nil {
|
||||
provider.settings.Logger().WarnContext(ctx, "skipping org, failed to fetch active license", errors.Attr(err), slog.String("org_id", org.ID.StringValue()))
|
||||
continue
|
||||
}
|
||||
if license == nil || license.Key == "" {
|
||||
provider.settings.Logger().WarnContext(ctx, "skipping org, nil/empty license for org", slog.String("org_id", org.ID.StringValue()))
|
||||
continue
|
||||
}
|
||||
|
||||
orgReadings := provider.collectOrgReadings(ctx, org.ID, window)
|
||||
if len(orgReadings) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
readingsByLicenseKey[license.Key] = append(readingsByLicenseKey[license.Key], orgReadings...)
|
||||
}
|
||||
|
||||
if len(readingsByLicenseKey) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
date := bucketStart.Format("2006-01-02")
|
||||
for licenseKey, readings := range readingsByLicenseKey {
|
||||
if err := provider.shipReadings(ctx, licenseKey, date, readings); err != nil {
|
||||
provider.metrics.postErrors.Add(ctx, 1)
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to ship meter readings", errors.Attr(err), slog.Int("readings", len(readings)))
|
||||
continue
|
||||
}
|
||||
provider.metrics.readingsEmitted.Add(ctx, int64(len(readings)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// collectOrgReadings runs every registered Meter's Collector against orgID and
|
||||
// returns their combined Readings with DimensionOrganizationID attached.
|
||||
// Individual meter failures are logged and skipped — one bad meter does not
|
||||
// block the rest of the batch.
|
||||
func (provider *Provider) collectOrgReadings(ctx context.Context, orgID valuer.UUID, window meterreporter.Window) []meterreportertypes.Reading {
|
||||
readings := make([]meterreportertypes.Reading, 0, len(provider.meters))
|
||||
|
||||
for _, meter := range provider.meters {
|
||||
collectedReadings, err := meter.Collector.Collect(ctx, meter, orgID, window)
|
||||
if err != nil {
|
||||
provider.metrics.collectErrors.Add(ctx, 1)
|
||||
provider.settings.Logger().WarnContext(ctx, "meter collection failed", errors.Attr(err), slog.String("meter", meter.Name.String()), slog.String("org_id", orgID.StringValue()))
|
||||
continue
|
||||
}
|
||||
|
||||
for i := range collectedReadings {
|
||||
if collectedReadings[i].Dimensions == nil {
|
||||
collectedReadings[i].Dimensions = make(map[string]string, 1)
|
||||
}
|
||||
collectedReadings[i].Dimensions[meterreporter.DimensionOrganizationID] = orgID.StringValue()
|
||||
}
|
||||
|
||||
readings = append(readings, collectedReadings...)
|
||||
}
|
||||
|
||||
// ! (balanikaran): TEMP for debugging
|
||||
provider.settings.Logger().InfoContext(ctx, "final readings", slog.Any("readings", readings))
|
||||
|
||||
return readings
|
||||
}
|
||||
|
||||
// shipReadings encodes the batch as PostableMeterReadings JSON and POSTs it to
|
||||
// Zeus in a single request. The date-scoped idempotency key lets Zeus UPSERT
|
||||
// on subsequent ticks within the same UTC day.
|
||||
func (provider *Provider) shipReadings(ctx context.Context, licenseKey string, date string, readings []meterreportertypes.Reading) error {
|
||||
idempotencyKey := fmt.Sprintf("meter-cron:%s", date)
|
||||
|
||||
// ! TODO: this needs to be fixed in the format we make the zeus API
|
||||
payload := meterreportertypes.PostableMeterReadings{
|
||||
IdempotencyKey: idempotencyKey,
|
||||
Readings: readings,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "marshal meter readings")
|
||||
}
|
||||
|
||||
if err := provider.zeus.PutMeterReadings(ctx, licenseKey, idempotencyKey, body); err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "zeus put meter readings")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
149
ee/meterreporter/signozmeterreporter/provider.go
Normal file
149
ee/meterreporter/signozmeterreporter/provider.go
Normal file
@@ -0,0 +1,149 @@
|
||||
package signozmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
)
|
||||
|
||||
var _ factory.ServiceWithHealthy = (*Provider)(nil)
|
||||
|
||||
// Provider is the enterprise meter reporter. It ticks on a fixed interval,
|
||||
// invokes every registered Collector against every licensed org, and ships
|
||||
// the resulting readings to Zeus.
|
||||
type Provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
config meterreporter.Config
|
||||
meters []meterreporter.Meter
|
||||
|
||||
licensing licensing.Licensing
|
||||
orgGetter organization.Getter
|
||||
zeus zeus.Zeus
|
||||
|
||||
healthyC chan struct{}
|
||||
stopC chan struct{}
|
||||
goroutinesWg sync.WaitGroup
|
||||
metrics *reporterMetrics
|
||||
}
|
||||
|
||||
// NewFactory returns a ProviderFactory for the signoz meter reporter.
|
||||
func NewFactory(
|
||||
licensing licensing.Licensing,
|
||||
querier querier.Querier,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgGetter organization.Getter,
|
||||
zeus zeus.Zeus,
|
||||
) factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
|
||||
return factory.NewProviderFactory(
|
||||
factory.MustNewName("signoz"),
|
||||
func(ctx context.Context, providerSettings factory.ProviderSettings, config meterreporter.Config) (meterreporter.Reporter, error) {
|
||||
return newProvider(ctx, providerSettings, config, licensing, querier, sqlstore, orgGetter, zeus)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func newProvider(
|
||||
_ context.Context,
|
||||
providerSettings factory.ProviderSettings,
|
||||
config meterreporter.Config,
|
||||
licensing licensing.Licensing,
|
||||
querier querier.Querier,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgGetter organization.Getter,
|
||||
zeus zeus.Zeus,
|
||||
) (*Provider, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter")
|
||||
|
||||
metrics, err := newReporterMetrics(settings.Meter())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meters, err := meterreporter.DefaultMeters(querier, sqlstore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Provider{
|
||||
settings: settings,
|
||||
config: config,
|
||||
meters: meters,
|
||||
licensing: licensing,
|
||||
orgGetter: orgGetter,
|
||||
zeus: zeus,
|
||||
healthyC: make(chan struct{}),
|
||||
stopC: make(chan struct{}),
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start runs an initial tick and then loops on the configured interval until
|
||||
// Stop is called. Start blocks until the goroutine returns, matching the
|
||||
// factory.Service contract used across the codebase.
|
||||
func (provider *Provider) Start(ctx context.Context) error {
|
||||
close(provider.healthyC)
|
||||
|
||||
provider.goroutinesWg.Add(1)
|
||||
go func() {
|
||||
defer provider.goroutinesWg.Done()
|
||||
|
||||
provider.runTick(ctx)
|
||||
|
||||
ticker := time.NewTicker(provider.config.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-provider.stopC:
|
||||
return
|
||||
case <-ticker.C:
|
||||
provider.runTick(ctx)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
provider.goroutinesWg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop requests the reporter to stop, waits for the in-flight tick (bounded by
|
||||
// Config.Timeout) to complete, and returns.
|
||||
func (provider *Provider) Stop(_ context.Context) error {
|
||||
<-provider.healthyC
|
||||
select {
|
||||
case <-provider.stopC:
|
||||
// already closed
|
||||
default:
|
||||
close(provider.stopC)
|
||||
}
|
||||
provider.goroutinesWg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) Healthy() <-chan struct{} {
|
||||
return provider.healthyC
|
||||
}
|
||||
|
||||
// runTick executes one collect-and-ship cycle under Config.Timeout. Errors are
|
||||
// logged and counted; they do not propagate because the reporter must keep
|
||||
// ticking on subsequent intervals.
|
||||
func (provider *Provider) runTick(parentCtx context.Context) {
|
||||
provider.metrics.ticks.Add(parentCtx, 1)
|
||||
|
||||
ctx, cancel := context.WithTimeout(parentCtx, provider.config.Timeout)
|
||||
defer cancel()
|
||||
|
||||
if err := provider.tick(ctx); err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "meter reporter tick failed", errors.Attr(err), slog.Duration("timeout", provider.config.Timeout))
|
||||
}
|
||||
}
|
||||
48
ee/meterreporter/signozmeterreporter/telemetry.go
Normal file
48
ee/meterreporter/signozmeterreporter/telemetry.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package signozmeterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
type reporterMetrics struct {
|
||||
ticks metric.Int64Counter
|
||||
readingsEmitted metric.Int64Counter
|
||||
collectErrors metric.Int64Counter
|
||||
postErrors metric.Int64Counter
|
||||
}
|
||||
|
||||
func newReporterMetrics(meter metric.Meter) (*reporterMetrics, error) {
|
||||
var errs error
|
||||
|
||||
ticks, err := meter.Int64Counter("signoz.meterreporter.ticks", metric.WithDescription("Total number of meter reporter ticks that ran to completion or aborted."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
readingsEmitted, err := meter.Int64Counter("signoz.meterreporter.readings.emitted", metric.WithDescription("Total number of meter readings shipped to Zeus."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
collectErrors, err := meter.Int64Counter("signoz.meterreporter.collect.errors", metric.WithDescription("Total number of collect errors across organizations and meters."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
postErrors, err := meter.Int64Counter("signoz.meterreporter.post.errors", metric.WithDescription("Total number of Zeus POST failures."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
if errs != nil {
|
||||
return nil, errs
|
||||
}
|
||||
|
||||
return &reporterMetrics{
|
||||
ticks: ticks,
|
||||
readingsEmitted: readingsEmitted,
|
||||
collectErrors: collectErrors,
|
||||
postErrors: postErrors,
|
||||
}, nil
|
||||
}
|
||||
@@ -148,6 +148,24 @@ func (provider *Provider) PutMetersV2(ctx context.Context, key string, data []by
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) PutMeterReadings(ctx context.Context, key string, idempotencyKey string, data []byte) error {
|
||||
headers := http.Header{}
|
||||
if idempotencyKey != "" {
|
||||
headers.Set("X-Idempotency-Key", idempotencyKey)
|
||||
}
|
||||
|
||||
_, err := provider.doWithHeaders(
|
||||
ctx,
|
||||
provider.config.URL.JoinPath("/v2/meters"),
|
||||
http.MethodPost,
|
||||
key,
|
||||
data,
|
||||
headers,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) PutProfile(ctx context.Context, key string, profile *zeustypes.PostableProfile) error {
|
||||
body, err := json.Marshal(profile)
|
||||
if err != nil {
|
||||
@@ -183,12 +201,21 @@ func (provider *Provider) PutHost(ctx context.Context, key string, host *zeustyp
|
||||
}
|
||||
|
||||
func (provider *Provider) do(ctx context.Context, url *url.URL, method string, key string, requestBody []byte) ([]byte, error) {
|
||||
return provider.doWithHeaders(ctx, url, method, key, requestBody, nil)
|
||||
}
|
||||
|
||||
func (provider *Provider) doWithHeaders(ctx context.Context, url *url.URL, method string, key string, requestBody []byte, extraHeaders http.Header) ([]byte, error) {
|
||||
request, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewBuffer(requestBody))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
request.Header.Set("X-Signoz-Cloud-Api-Key", key)
|
||||
request.Header.Set("Content-Type", "application/json")
|
||||
for k, vs := range extraHeaders {
|
||||
for _, v := range vs {
|
||||
request.Header.Add(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
response, err := provider.httpClient.Do(request)
|
||||
if err != nil {
|
||||
|
||||
26
pkg/meterreporter/collector.go
Normal file
26
pkg/meterreporter/collector.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Window is the reporting window a Collector produces readings for. Timestamps
|
||||
// are unix milliseconds; BucketStartMs is the emitted reading's Timestamp and
|
||||
// typically aligns to UTC day start.
|
||||
type Window struct {
|
||||
StartMs uint64
|
||||
EndMs uint64
|
||||
BucketStartMs int64 // ! See if this can be removed
|
||||
}
|
||||
|
||||
// Collector produces readings for a single Meter. Implementations are
|
||||
// stateless — the Meter carries all per-meter configuration — so a single
|
||||
// Collector instance may be shared across every entry in the registry.
|
||||
//
|
||||
// Collect must be safe to call concurrently across orgs.
|
||||
type Collector interface {
|
||||
Collect(ctx context.Context, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error)
|
||||
}
|
||||
65
pkg/meterreporter/config.go
Normal file
65
pkg/meterreporter/config.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
var _ factory.Config = (*Config)(nil)
|
||||
|
||||
type Config struct {
|
||||
// Provider selects the reporter implementation (default "noop").
|
||||
Provider string `mapstructure:"provider"`
|
||||
|
||||
// Interval is how often the reporter collects and ships meter readings.
|
||||
Interval time.Duration `mapstructure:"interval"`
|
||||
|
||||
// Timeout bounds a single collect-and-ship cycle.
|
||||
Timeout time.Duration `mapstructure:"timeout"`
|
||||
|
||||
// Retry configures exponential backoff for transient Zeus failures.
|
||||
Retry RetryConfig `mapstructure:"retry"`
|
||||
}
|
||||
|
||||
type RetryConfig struct {
|
||||
Enabled bool `mapstructure:"enabled"`
|
||||
InitialInterval time.Duration `mapstructure:"initial_interval"`
|
||||
MaxInterval time.Duration `mapstructure:"max_interval"`
|
||||
MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
|
||||
}
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Provider: "noop",
|
||||
Interval: 6 * time.Hour,
|
||||
Timeout: 30 * time.Second,
|
||||
Retry: RetryConfig{
|
||||
Enabled: true,
|
||||
InitialInterval: 5 * time.Second,
|
||||
MaxInterval: 30 * time.Second,
|
||||
MaxElapsedTime: time.Minute,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("meterreporter"), newConfig)
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
if c.Interval < 30*time.Minute {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::interval must be at least 30m")
|
||||
}
|
||||
|
||||
if c.Timeout <= 0 {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be greater than 0")
|
||||
}
|
||||
|
||||
if c.Timeout >= c.Interval {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be less than meterreporter::interval")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
34
pkg/meterreporter/meter.go
Normal file
34
pkg/meterreporter/meter.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
)
|
||||
|
||||
// Meter is one registered meter — a name + how to produce readings for it. A meter is the single
|
||||
// unit of extension: to add a new meter, append one Meter to the default registry (see registry.go).
|
||||
//
|
||||
// The same metric Name may appear multiple times in the registry as long as each entry
|
||||
// uses a different SpaceAggregation (for example min/max/p99 of the same source meter).
|
||||
type Meter struct {
|
||||
// Name is the meter's identifier.
|
||||
Name meterreportertypes.Name
|
||||
|
||||
// Unit is reported verbatim as the signoz.billing.unit dimension.
|
||||
Unit string
|
||||
|
||||
// RetentionDomain indicates which product TTL should be surfaced as the signoz.billing.retention.days dimension for this meter.
|
||||
RetentionDomain RetentionDomain
|
||||
|
||||
// TimeAggregation reduces per-series samples across the query window.
|
||||
TimeAggregation metrictypes.TimeAggregation
|
||||
|
||||
// SpaceAggregation reduces across series and is reported verbatim as the signoz.billing.aggregation dimension.
|
||||
SpaceAggregation metrictypes.SpaceAggregation
|
||||
|
||||
// FilterExpression is an optional filter pushed into the query builder (e.g. "service.name = 'cart'").
|
||||
FilterExpression string
|
||||
|
||||
// Collector knows how to turn this Meter into zero or more Readings per tick.
|
||||
Collector Collector
|
||||
}
|
||||
26
pkg/meterreporter/meterreporter.go
Normal file
26
pkg/meterreporter/meterreporter.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeInvalidInput = errors.MustNewCode("meterreporter_invalid_input")
|
||||
ErrCodeReportFailed = errors.MustNewCode("meterreporter_report_failed")
|
||||
)
|
||||
|
||||
// Dimension keys automatically attached to every Reading.
|
||||
const (
|
||||
DimensionAggregation = "signoz.billing.aggregation"
|
||||
DimensionUnit = "signoz.billing.unit"
|
||||
DimensionOrganizationID = "signoz.billing.organization.id"
|
||||
DimensionRetentionDays = "signoz.billing.retention.days"
|
||||
)
|
||||
|
||||
// Reporter periodically collects meter values via the query service and ships
|
||||
// them to Zeus. Implementations must satisfy factory.ServiceWithHealthy so the
|
||||
// signoz registry can wait on startup and request graceful shutdown.
|
||||
type Reporter interface {
|
||||
factory.ServiceWithHealthy
|
||||
}
|
||||
39
pkg/meterreporter/noopmeterreporter/provider.go
Normal file
39
pkg/meterreporter/noopmeterreporter/provider.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package noopmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
healthyC chan struct{}
|
||||
stopC chan struct{}
|
||||
}
|
||||
|
||||
func NewFactory() factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("noop"), New)
|
||||
}
|
||||
|
||||
func New(_ context.Context, _ factory.ProviderSettings, _ meterreporter.Config) (meterreporter.Reporter, error) {
|
||||
return &provider{
|
||||
healthyC: make(chan struct{}),
|
||||
stopC: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *provider) Start(_ context.Context) error {
|
||||
close(p.healthyC)
|
||||
<-p.stopC
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *provider) Stop(_ context.Context) error {
|
||||
close(p.stopC)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *provider) Healthy() <-chan struct{} {
|
||||
return p.healthyC
|
||||
}
|
||||
131
pkg/meterreporter/querycollector.go
Normal file
131
pkg/meterreporter/querycollector.go
Normal file
@@ -0,0 +1,131 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
var _ Collector = (*QueryCollector)(nil)
|
||||
|
||||
// QueryCollector produces a single scalar Reading per Collect call by issuing a RequestTypeScalar query
|
||||
// against the querier over SourceMeter. It reads everything it needs from the Meter it is invoked with.
|
||||
type QueryCollector struct {
|
||||
querier querier.Querier
|
||||
}
|
||||
|
||||
func NewQueryCollector(q querier.Querier) *QueryCollector {
|
||||
return &QueryCollector{querier: q}
|
||||
}
|
||||
|
||||
func (c *QueryCollector) Collect(ctx context.Context, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
req := buildQueryRequest(meter, window.StartMs, window.EndMs)
|
||||
|
||||
resp, err := c.querier.QueryRange(ctx, orgID, req)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query range for meter %q", meter.Name.String())
|
||||
}
|
||||
|
||||
value, ok := extractScalarValue(resp)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: meter.Name.String(),
|
||||
Value: value,
|
||||
Timestamp: window.BucketStartMs,
|
||||
IsCompleted: false,
|
||||
Dimensions: map[string]string{
|
||||
DimensionAggregation: meter.SpaceAggregation.StringValue(),
|
||||
DimensionUnit: meter.Unit,
|
||||
},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// buildQueryRequest composes a single-query, single-aggregation scalar request over the meter source.
|
||||
// The querier applies its own step interval defaults for SourceMeter.
|
||||
func buildQueryRequest(meter Meter, startMs, endMs uint64) *querybuildertypesv5.QueryRangeRequest {
|
||||
builderQuery := querybuildertypesv5.QueryBuilderQuery[querybuildertypesv5.MetricAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Source: telemetrytypes.SourceMeter,
|
||||
Aggregations: []querybuildertypesv5.MetricAggregation{{
|
||||
MetricName: meter.Name.String(),
|
||||
TimeAggregation: meter.TimeAggregation,
|
||||
SpaceAggregation: meter.SpaceAggregation,
|
||||
}},
|
||||
}
|
||||
|
||||
if meter.FilterExpression != "" {
|
||||
builderQuery.Filter = &querybuildertypesv5.Filter{Expression: meter.FilterExpression}
|
||||
}
|
||||
|
||||
return &querybuildertypesv5.QueryRangeRequest{
|
||||
Start: startMs,
|
||||
End: endMs,
|
||||
RequestType: querybuildertypesv5.RequestTypeScalar,
|
||||
CompositeQuery: querybuildertypesv5.CompositeQuery{
|
||||
Queries: []querybuildertypesv5.QueryEnvelope{
|
||||
{
|
||||
Type: querybuildertypesv5.QueryTypeBuilder,
|
||||
Spec: builderQuery,
|
||||
},
|
||||
},
|
||||
},
|
||||
NoCache: true,
|
||||
}
|
||||
}
|
||||
|
||||
// extractScalarValue pulls the single scalar value out of a ScalarData result.
|
||||
// Returns (value, true) for a well-formed single-row/single-aggregation
|
||||
// result, (0, false) otherwise (empty, multi-row, non-scalar).
|
||||
func extractScalarValue(resp *querybuildertypesv5.QueryRangeResponse) (float64, bool) {
|
||||
if resp == nil || len(resp.Data.Results) == 0 {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
scalar, ok := resp.Data.Results[0].(*querybuildertypesv5.ScalarData)
|
||||
if !ok {
|
||||
if direct, ok := resp.Data.Results[0].(querybuildertypesv5.ScalarData); ok {
|
||||
scalar = &direct
|
||||
} else {
|
||||
return 0, false
|
||||
}
|
||||
}
|
||||
|
||||
if len(scalar.Data) == 0 || len(scalar.Data[0]) == 0 {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
for colIdx, col := range scalar.Columns {
|
||||
if col == nil {
|
||||
continue
|
||||
}
|
||||
if col.Type == querybuildertypesv5.ColumnTypeAggregation {
|
||||
if colIdx >= len(scalar.Data[0]) {
|
||||
return 0, false
|
||||
}
|
||||
switch v := scalar.Data[0][colIdx].(type) {
|
||||
case float64:
|
||||
return v, true
|
||||
case float32:
|
||||
return float64(v), true
|
||||
case int:
|
||||
return float64(v), true
|
||||
case int64:
|
||||
return float64(v), true
|
||||
case uint64:
|
||||
return float64(v), true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
}
|
||||
|
||||
return 0, false
|
||||
}
|
||||
99
pkg/meterreporter/registry.go
Normal file
99
pkg/meterreporter/registry.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
)
|
||||
|
||||
// Exported names for every meter the reporter knows about. Refer to these
|
||||
// symbols (not string literals) everywhere - typos turn into compile errors
|
||||
// instead of silently producing a new meter row at Zeus.
|
||||
var (
|
||||
MeterLogCount = meterreportertypes.MustNewName("signoz.meter.log.count")
|
||||
MeterLogSize = meterreportertypes.MustNewName("signoz.meter.log.size")
|
||||
)
|
||||
|
||||
func baseMeters(q querier.Querier, sqlstore sqlstore.SQLStore) []*Meter {
|
||||
queryCollector := NewQueryCollector(q)
|
||||
retentionAwareQueryCollector := NewRetentionDimensionsCollector(queryCollector, NewSQLRetentionResolver(sqlstore))
|
||||
|
||||
meters := []*Meter{
|
||||
{
|
||||
Name: MeterLogCount,
|
||||
Unit: "count",
|
||||
RetentionDomain: RetentionDomainLogs,
|
||||
TimeAggregation: metrictypes.TimeAggregationSum,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
Collector: retentionAwareQueryCollector,
|
||||
},
|
||||
{
|
||||
Name: MeterLogSize,
|
||||
Unit: "bytes",
|
||||
RetentionDomain: RetentionDomainLogs,
|
||||
TimeAggregation: metrictypes.TimeAggregationSum,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
Collector: retentionAwareQueryCollector,
|
||||
},
|
||||
}
|
||||
|
||||
mustValidateMeters(meters...)
|
||||
return meters
|
||||
}
|
||||
|
||||
// DefaultMeters returns the hardcoded query-backed meters supported by the reporter.
|
||||
func DefaultMeters(q querier.Querier, sqlstore sqlstore.SQLStore) ([]Meter, error) {
|
||||
meters := baseMeters(q, sqlstore)
|
||||
if err := validateMeters(meters...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resolved := make([]Meter, 0, len(meters))
|
||||
for _, meter := range meters {
|
||||
resolved = append(resolved, *meter)
|
||||
}
|
||||
|
||||
return resolved, nil
|
||||
}
|
||||
|
||||
// validateMeters checks that the runtime meter list is internally consistent.
|
||||
// Every meter must:
|
||||
// - have a non-zero Name,
|
||||
// - have a non-empty Unit,
|
||||
// - have a non-nil Collector,
|
||||
// - use a unique (Name, SpaceAggregation) pair.
|
||||
func validateMeters(meters ...*Meter) error {
|
||||
seen := make(map[string]struct{}, len(meters))
|
||||
|
||||
for _, meter := range meters {
|
||||
if meter == nil {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "nil meter in registry")
|
||||
}
|
||||
if meter.Name.IsZero() {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter with empty name in registry")
|
||||
}
|
||||
if meter.Unit == "" {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no unit", meter.Name.String())
|
||||
}
|
||||
if meter.Collector == nil {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no collector", meter.Name.String())
|
||||
}
|
||||
|
||||
key := meter.Name.String() + "|" + meter.SpaceAggregation.StringValue()
|
||||
if _, ok := seen[key]; ok {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "duplicate meter %q with aggregation %q", meter.Name.String(), meter.SpaceAggregation.StringValue())
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// mustValidateMeters panics when hardcoded meter declarations are invalid.
|
||||
func mustValidateMeters(meters ...*Meter) {
|
||||
if err := validateMeters(meters...); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
124
pkg/meterreporter/retention.go
Normal file
124
pkg/meterreporter/retention.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strconv"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type RetentionDomain string
|
||||
|
||||
const (
|
||||
RetentionDomainNone RetentionDomain = ""
|
||||
RetentionDomainLogs RetentionDomain = "logs"
|
||||
RetentionDomainMetrics RetentionDomain = "metrics"
|
||||
RetentionDomainTraces RetentionDomain = "traces"
|
||||
)
|
||||
|
||||
type retentionResolver interface {
|
||||
ResolveDays(ctx context.Context, orgID valuer.UUID, domain RetentionDomain) (string, bool, error)
|
||||
}
|
||||
|
||||
type retentionDimensionsCollector struct {
|
||||
inner Collector
|
||||
resolver retentionResolver
|
||||
}
|
||||
|
||||
func NewRetentionDimensionsCollector(inner Collector, resolver retentionResolver) Collector {
|
||||
if inner == nil || resolver == nil {
|
||||
return inner
|
||||
}
|
||||
|
||||
return &retentionDimensionsCollector{
|
||||
inner: inner,
|
||||
resolver: resolver,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *retentionDimensionsCollector) Collect(ctx context.Context, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
readings, err := c.inner.Collect(ctx, meter, orgID, window)
|
||||
if err != nil || len(readings) == 0 || meter.RetentionDomain == RetentionDomainNone {
|
||||
return readings, err
|
||||
}
|
||||
|
||||
retentionDays, ok, err := c.resolver.ResolveDays(ctx, orgID, meter.RetentionDomain)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return readings, nil
|
||||
}
|
||||
|
||||
for i := range readings {
|
||||
if readings[i].Dimensions == nil {
|
||||
readings[i].Dimensions = make(map[string]string, 1)
|
||||
}
|
||||
readings[i].Dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return readings, nil
|
||||
}
|
||||
|
||||
type sqlRetentionResolver struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewSQLRetentionResolver(sqlstore sqlstore.SQLStore) retentionResolver {
|
||||
if sqlstore == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &sqlRetentionResolver{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
func (r *sqlRetentionResolver) ResolveDays(ctx context.Context, orgID valuer.UUID, domain RetentionDomain) (string, bool, error) {
|
||||
tableName, ok := retentionTableName(domain)
|
||||
if !ok {
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
ttl := new(types.TTLSetting)
|
||||
err := r.sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(ttl).
|
||||
Where("table_name = ?", tableName).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
OrderExpr("created_at DESC").
|
||||
Limit(1).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "", false, nil
|
||||
}
|
||||
return "", false, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "load retention for domain %q", domain)
|
||||
}
|
||||
|
||||
if ttl.TTL <= 0 {
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
return strconv.Itoa(ttl.TTL / (24 * 3600)), true, nil
|
||||
}
|
||||
|
||||
func retentionTableName(domain RetentionDomain) (string, bool) {
|
||||
switch domain {
|
||||
case RetentionDomainLogs:
|
||||
return telemetrylogs.DBName + "." + telemetrylogs.LogsV2TableName, true
|
||||
case RetentionDomainMetrics:
|
||||
return telemetrymetrics.DBName + "." + telemetrymetrics.SamplesV4TableName, true
|
||||
case RetentionDomainTraces:
|
||||
return telemetrytraces.DBName + "." + telemetrytraces.SpanIndexV3TableName, true
|
||||
default:
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/identn"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
|
||||
@@ -129,6 +130,9 @@ type Config struct {
|
||||
// Auditor config
|
||||
Auditor auditor.Config `mapstructure:"auditor"`
|
||||
|
||||
// MeterReporter config
|
||||
MeterReporter meterreporter.Config `mapstructure:"meterreporter"`
|
||||
|
||||
// CloudIntegration config
|
||||
CloudIntegration cloudintegration.Config `mapstructure:"cloudintegration"`
|
||||
}
|
||||
@@ -162,6 +166,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
|
||||
identn.NewConfigFactory(),
|
||||
serviceaccount.NewConfigFactory(),
|
||||
auditor.NewConfigFactory(),
|
||||
meterreporter.NewConfigFactory(),
|
||||
cloudintegration.NewConfigFactory(),
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/identn/apikeyidentn"
|
||||
"github.com/SigNoz/signoz/pkg/identn/impersonationidentn"
|
||||
"github.com/SigNoz/signoz/pkg/identn/tokenizeridentn"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter/noopmeterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
@@ -315,6 +317,12 @@ func NewAuditorProviderFactories() factory.NamedMap[factory.ProviderFactory[audi
|
||||
)
|
||||
}
|
||||
|
||||
func NewMeterReporterProviderFactories() factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
noopmeterreporter.NewFactory(),
|
||||
)
|
||||
}
|
||||
|
||||
func NewFlaggerProviderFactories(registry featuretypes.Registry) factory.NamedMap[factory.ProviderFactory[flagger.FlaggerProvider, flagger.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
configflagger.NewFactory(registry),
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/identn"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
@@ -84,6 +85,7 @@ type SigNoz struct {
|
||||
Flagger flagger.Flagger
|
||||
Gateway gateway.Gateway
|
||||
Auditor auditor.Auditor
|
||||
MeterReporter meterreporter.Reporter
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -104,6 +106,7 @@ func New(
|
||||
dashboardModuleCallback func(sqlstore.SQLStore, factory.ProviderSettings, analytics.Analytics, organization.Getter, queryparser.QueryParser, querier.Querier, licensing.Licensing) dashboard.Module,
|
||||
gatewayProviderFactory func(licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config],
|
||||
auditorProviderFactories func(licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]],
|
||||
meterReporterProviderFactories func(licensing.Licensing, querier.Querier, sqlstore.SQLStore, organization.Getter, zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]],
|
||||
querierHandlerCallback func(factory.ProviderSettings, querier.Querier, analytics.Analytics) querier.Handler,
|
||||
cloudIntegrationCallback func(sqlstore.SQLStore, global.Global, zeus.Zeus, gateway.Gateway, licensing.Licensing, serviceaccount.Module, cloudintegration.Config) (cloudintegration.Module, error),
|
||||
rulerProviderFactories func(cache.Cache, alertmanager.Alertmanager, sqlstore.SQLStore, telemetrystore.TelemetryStore, telemetrytypes.MetadataStore, prometheus.Prometheus, organization.Getter, rulestatehistory.Module, querier.Querier, queryparser.QueryParser) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]],
|
||||
@@ -377,6 +380,12 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize meter reporter from the variant-specific provider factories
|
||||
meterReporter, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.MeterReporter, meterReporterProviderFactories(licensing, querier, sqlstore, orgGetter, zeus), config.MeterReporter.Provider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize authns
|
||||
store := sqlauthnstore.NewStore(sqlstore)
|
||||
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)
|
||||
@@ -491,6 +500,7 @@ func New(
|
||||
factory.NewNamedService(factory.MustNewName("authz"), authz),
|
||||
factory.NewNamedService(factory.MustNewName("user"), userService, factory.MustNewName("authz")),
|
||||
factory.NewNamedService(factory.MustNewName("auditor"), auditor),
|
||||
factory.NewNamedService(factory.MustNewName("meterreporter"), meterReporter, factory.MustNewName("licensing")),
|
||||
factory.NewNamedService(factory.MustNewName("ruler"), rulerInstance),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -540,5 +550,6 @@ func New(
|
||||
Flagger: flagger,
|
||||
Gateway: gateway,
|
||||
Auditor: auditor,
|
||||
MeterReporter: meterReporter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
41
pkg/types/meterreportertypes/name.go
Normal file
41
pkg/types/meterreportertypes/name.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package meterreportertypes
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
var nameRegex = regexp.MustCompile(`^[a-z][a-z0-9_.]+$`)
|
||||
|
||||
// Name is a concrete type for a meter name. Dotted namespace identifiers like
|
||||
// "signoz.meter.log.count" are permitted; arbitrary strings are not, to avoid
|
||||
// typos silently producing distinct meter rows at Zeus.
|
||||
type Name struct {
|
||||
s string
|
||||
}
|
||||
|
||||
func NewName(s string) (Name, error) {
|
||||
if !nameRegex.MatchString(s) {
|
||||
return Name{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid meter name: %s", s)
|
||||
}
|
||||
|
||||
return Name{s: s}, nil
|
||||
}
|
||||
|
||||
func MustNewName(s string) Name {
|
||||
name, err := NewName(s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return name
|
||||
}
|
||||
|
||||
func (n Name) String() string {
|
||||
return n.s
|
||||
}
|
||||
|
||||
func (n Name) IsZero() bool {
|
||||
return n.s == ""
|
||||
}
|
||||
32
pkg/types/meterreportertypes/types.go
Normal file
32
pkg/types/meterreportertypes/types.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package meterreportertypes
|
||||
|
||||
// Reading is a single meter value sent to Zeus. Zeus UPSERTs on
|
||||
// (license_key, dimension_hash, timestamp), so repeated readings within the
|
||||
// same tick window safely overwrite prior values.
|
||||
type Reading struct {
|
||||
// MeterName is the fully-qualified meter identifier.
|
||||
MeterName string `json:"meterName"`
|
||||
|
||||
// Value is the aggregated scalar for this (meter, aggregation) pair over the reporting window.
|
||||
Value float64 `json:"value"`
|
||||
|
||||
// Timestamp is the window-start in epoch milliseconds (UTC day start).
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
|
||||
// IsCompleted is true only for sealed past buckets. In-progress buckets
|
||||
// (e.g. the current UTC day) report IsCompleted=false so Zeus knows the value may still change.
|
||||
IsCompleted bool `json:"isCompleted"`
|
||||
|
||||
// Dimensions is the per-reading label set.
|
||||
Dimensions map[string]string `json:"dimensions"`
|
||||
}
|
||||
|
||||
// PostableMeterReadings is the request body for Zeus.PutMeterReadings.
|
||||
type PostableMeterReadings struct { // ! Needs fix once zeus contract is setup
|
||||
// IdempotencyKey is echoed as the X-Idempotency-Key header and stored by
|
||||
// Zeus so retries within the same tick window overwrite rather than duplicate.
|
||||
IdempotencyKey string `json:"idempotencyKey"`
|
||||
|
||||
// Readings is the batch of meter values being shipped.
|
||||
Readings []Reading `json:"readings"`
|
||||
}
|
||||
@@ -49,6 +49,10 @@ func (provider *provider) PutMetersV2(_ context.Context, _ string, _ []byte) err
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v2 is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) PutMeterReadings(_ context.Context, _ string, _ string, _ []byte) error {
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meter readings is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) PutProfile(_ context.Context, _ string, _ *zeustypes.PostableProfile) error {
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting profile is not supported")
|
||||
}
|
||||
|
||||
@@ -35,6 +35,11 @@ type Zeus interface {
|
||||
// Puts the meters for the given license key using Zeus.
|
||||
PutMetersV2(context.Context, string, []byte) error
|
||||
|
||||
// PutMeterReadings ships TDD-shape meter readings to the v2/meters
|
||||
// endpoint. idempotencyKey is propagated as X-Idempotency-Key so Zeus can
|
||||
// UPSERT on retries.
|
||||
PutMeterReadings(ctx context.Context, licenseKey string, idempotencyKey string, body []byte) error
|
||||
|
||||
// Put profile for the given license key.
|
||||
PutProfile(context.Context, string, *zeustypes.PostableProfile) error
|
||||
|
||||
|
||||
Reference in New Issue
Block a user