mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-21 19:30:29 +01:00
Compare commits
4 Commits
base-path-
...
feat/billi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5dec8e26ad | ||
|
|
e102cc07ce | ||
|
|
b898fb6e3b | ||
|
|
bc7dbab2b2 |
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
@@ -109,6 +110,9 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
func(_ licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
|
||||
return signoz.NewAuditorProviderFactories()
|
||||
},
|
||||
func(_ licensing.Licensing, _ telemetrystore.TelemetryStore, _ sqlstore.SQLStore, _ organization.Getter, _ zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
return signoz.NewMeterReporterProviderFactories()
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
return querier.NewHandler(ps, q, a)
|
||||
},
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/SigNoz/signoz/ee/gateway/httpgateway"
|
||||
enterpriselicensing "github.com/SigNoz/signoz/ee/licensing"
|
||||
"github.com/SigNoz/signoz/ee/licensing/httplicensing"
|
||||
"github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter"
|
||||
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration/implcloudprovider"
|
||||
"github.com/SigNoz/signoz/ee/modules/dashboard/impldashboard"
|
||||
@@ -38,6 +39,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/gateway"
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
pkgcloudintegration "github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
@@ -157,6 +159,13 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
}
|
||||
return factories
|
||||
},
|
||||
func(licensing licensing.Licensing, telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore, orgGetter organization.Getter, zeus zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
factories := signoz.NewMeterReporterProviderFactories()
|
||||
if err := factories.Add(signozmeterreporter.NewFactory(licensing, telemetryStore, sqlStore, orgGetter, zeus)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return factories
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
communityHandler := querier.NewHandler(ps, q, a)
|
||||
return eequerier.NewHandler(ps, q, communityHandler)
|
||||
|
||||
@@ -407,3 +407,23 @@ cloudintegration:
|
||||
agent:
|
||||
# The version of the cloud integration agent.
|
||||
version: v0.0.8
|
||||
|
||||
##################### Meter Reporter #####################
|
||||
meterreporter:
|
||||
# Specifies the meter reporter provider to use.
|
||||
# noop: does not report any meters (community default).
|
||||
# signoz: periodically queries meters via the querier and ships readings to Zeus (enterprise).
|
||||
provider: noop
|
||||
# The interval between collection ticks. Minimum 30m.
|
||||
interval: 6h
|
||||
# The per-tick timeout that bounds collect-and-ship work.
|
||||
timeout: 30s
|
||||
retry:
|
||||
# Whether to retry on transient failures.
|
||||
enabled: true
|
||||
# The initial wait time before the first retry.
|
||||
initial_interval: 5s
|
||||
# The upper bound on backoff interval.
|
||||
max_interval: 30s
|
||||
# The total maximum time spent retrying.
|
||||
max_elapsed_time: 1m
|
||||
|
||||
125
ee/meterreporter/signozmeterreporter/collect.go
Normal file
125
ee/meterreporter/signozmeterreporter/collect.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package signozmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// tick collects one round of readings for the instance's org and ships them to
|
||||
// zeus under its active license. Per-collector errors are logged and counted
|
||||
// but do not abort the tick.
|
||||
func (provider *Provider) tick(ctx context.Context) error {
|
||||
now := time.Now().UTC()
|
||||
|
||||
// Go to 00:00 UTC of current day (in milliseconds)
|
||||
bucketStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
||||
|
||||
// Period in which meter data will be queried: 00:00 UTC → now UTC
|
||||
window := meterreporter.Window{
|
||||
StartMs: bucketStart.UnixMilli(),
|
||||
EndMs: now.UnixMilli(),
|
||||
}
|
||||
|
||||
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "failed to list organizations")
|
||||
}
|
||||
if len(orgs) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(orgs) > 1 {
|
||||
// Billing is scoped to a single license per instance; the meter data in signoz_meter has no org marker,
|
||||
// so we can't split a multi-org instance correctly. Report against the first org and warn.
|
||||
provider.settings.Logger().WarnContext(ctx, "multiple orgs on a single instance; reporting only the first", slog.Int("org_count", len(orgs)))
|
||||
}
|
||||
org := orgs[0]
|
||||
|
||||
license, err := provider.licensing.GetActive(ctx, org.ID)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "failed to fetch active license for org %q", org.ID.StringValue())
|
||||
}
|
||||
if license == nil || license.Key == "" {
|
||||
provider.settings.Logger().WarnContext(ctx, "skipping tick, nil/empty license for org", slog.String("org_id", org.ID.StringValue()))
|
||||
return nil
|
||||
}
|
||||
|
||||
readings := provider.collectOrgReadings(ctx, org.ID, window)
|
||||
if len(readings) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
date := bucketStart.Format("2006-01-02")
|
||||
if err := provider.shipReadings(ctx, license.Key, date, readings); err != nil {
|
||||
provider.metrics.postErrors.Add(ctx, 1)
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to ship meter readings", errors.Attr(err), slog.Int("readings", len(readings)))
|
||||
return nil
|
||||
}
|
||||
provider.metrics.readingsEmitted.Add(ctx, int64(len(readings)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// collectOrgReadings runs every registered Meter's Collector against orgID and
|
||||
// returns their combined Readings. Individual meter failures are logged and
|
||||
// skipped — one bad meter does not block the rest of the batch.
|
||||
func (provider *Provider) collectOrgReadings(ctx context.Context, orgID valuer.UUID, window meterreporter.Window) []meterreportertypes.Reading {
|
||||
readings := make([]meterreportertypes.Reading, 0, len(provider.meters))
|
||||
|
||||
for _, meter := range provider.meters {
|
||||
collectedReadings, err := meter.Collect(ctx, provider.deps, meter, orgID, window)
|
||||
if err != nil {
|
||||
provider.metrics.collectErrors.Add(ctx, 1)
|
||||
provider.settings.Logger().WarnContext(ctx, "meter collection failed", errors.Attr(err), slog.String("meter", meter.Name.String()), slog.String("org_id", orgID.StringValue()))
|
||||
continue
|
||||
}
|
||||
|
||||
readings = append(readings, collectedReadings...)
|
||||
}
|
||||
|
||||
return readings
|
||||
}
|
||||
|
||||
// shipReadings encodes the batch as PostableMeterReadings JSON and, in the
|
||||
// fully wired flow, POSTs it to Zeus under a date-scoped idempotency key so
|
||||
// subsequent ticks within the same UTC day UPSERT.
|
||||
//
|
||||
// ! TEMPORARY: the Zeus PutMeterReadings endpoint is not live yet. Until it
|
||||
// lands, we log the payload at INFO so staging can verify collection end-to-end
|
||||
// without a server counterpart. Restore the Zeus call (and drop the log) once
|
||||
// the API ships.
|
||||
func (provider *Provider) shipReadings(ctx context.Context, licenseKey string, date string, readings []meterreportertypes.Reading) error {
|
||||
idempotencyKey := fmt.Sprintf("meter-cron:%s", date)
|
||||
|
||||
// ! TODO: this needs to be fixed in the format we make the zeus API
|
||||
payload := meterreportertypes.PostableMeterReadings{
|
||||
IdempotencyKey: idempotencyKey,
|
||||
Readings: readings,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "marshal meter readings")
|
||||
}
|
||||
|
||||
// ! TEMPORARY: skip the Zeus call until the API is available. Logging the
|
||||
// serialized payload instead so we can eyeball readings in staging logs.
|
||||
// When Zeus is ready, replace the log below with:
|
||||
// if err := provider.zeus.PutMeterReadings(ctx, licenseKey, idempotencyKey, body); err != nil { ... }
|
||||
provider.settings.Logger().InfoContext(ctx, "meter readings (Zeus API not yet live — dry-run log)",
|
||||
slog.String("license_key", licenseKey),
|
||||
slog.String("idempotency_key", idempotencyKey),
|
||||
slog.Int("readings", len(readings)),
|
||||
slog.String("payload", string(body)),
|
||||
)
|
||||
_ = provider.zeus // keep the field referenced so the dep wiring does not bitrot
|
||||
|
||||
return nil
|
||||
}
|
||||
154
ee/meterreporter/signozmeterreporter/provider.go
Normal file
154
ee/meterreporter/signozmeterreporter/provider.go
Normal file
@@ -0,0 +1,154 @@
|
||||
package signozmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
)
|
||||
|
||||
var _ factory.ServiceWithHealthy = (*Provider)(nil)
|
||||
|
||||
// Provider is the enterprise meter reporter. It ticks on a fixed interval,
|
||||
// invokes every registered Collector against every licensed org, and ships
|
||||
// the resulting readings to Zeus.
|
||||
type Provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
config meterreporter.Config
|
||||
meters []meterreporter.Meter
|
||||
deps meterreporter.CollectorDeps
|
||||
|
||||
licensing licensing.Licensing
|
||||
orgGetter organization.Getter
|
||||
zeus zeus.Zeus
|
||||
|
||||
healthyC chan struct{}
|
||||
stopC chan struct{}
|
||||
goroutinesWg sync.WaitGroup
|
||||
metrics *reporterMetrics
|
||||
}
|
||||
|
||||
// NewFactory returns a ProviderFactory for the signoz meter reporter.
|
||||
func NewFactory(
|
||||
licensing licensing.Licensing,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgGetter organization.Getter,
|
||||
zeus zeus.Zeus,
|
||||
) factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
|
||||
return factory.NewProviderFactory(
|
||||
factory.MustNewName("signoz"),
|
||||
func(ctx context.Context, providerSettings factory.ProviderSettings, config meterreporter.Config) (meterreporter.Reporter, error) {
|
||||
return newProvider(ctx, providerSettings, config, licensing, telemetryStore, sqlstore, orgGetter, zeus)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func newProvider(
|
||||
_ context.Context,
|
||||
providerSettings factory.ProviderSettings,
|
||||
config meterreporter.Config,
|
||||
licensing licensing.Licensing,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgGetter organization.Getter,
|
||||
zeus zeus.Zeus,
|
||||
) (*Provider, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter")
|
||||
|
||||
metrics, err := newReporterMetrics(settings.Meter())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meters, err := meterreporter.DefaultMeters()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Provider{
|
||||
settings: settings,
|
||||
config: config,
|
||||
meters: meters,
|
||||
deps: meterreporter.CollectorDeps{
|
||||
TelemetryStore: telemetryStore,
|
||||
SQLStore: sqlstore,
|
||||
},
|
||||
licensing: licensing,
|
||||
orgGetter: orgGetter,
|
||||
zeus: zeus,
|
||||
healthyC: make(chan struct{}),
|
||||
stopC: make(chan struct{}),
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start runs an initial tick and then loops on the configured interval until
|
||||
// Stop is called. Start blocks until the goroutine returns, matching the
|
||||
// factory.Service contract used across the codebase.
|
||||
func (provider *Provider) Start(ctx context.Context) error {
|
||||
close(provider.healthyC)
|
||||
|
||||
provider.goroutinesWg.Add(1)
|
||||
go func() {
|
||||
defer provider.goroutinesWg.Done()
|
||||
|
||||
provider.runTick(ctx)
|
||||
|
||||
ticker := time.NewTicker(provider.config.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-provider.stopC:
|
||||
return
|
||||
case <-ticker.C:
|
||||
provider.runTick(ctx)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
provider.goroutinesWg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop requests the reporter to stop, waits for the in-flight tick (bounded by
|
||||
// Config.Timeout) to complete, and returns.
|
||||
func (provider *Provider) Stop(_ context.Context) error {
|
||||
<-provider.healthyC
|
||||
select {
|
||||
case <-provider.stopC:
|
||||
// already closed
|
||||
default:
|
||||
close(provider.stopC)
|
||||
}
|
||||
provider.goroutinesWg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) Healthy() <-chan struct{} {
|
||||
return provider.healthyC
|
||||
}
|
||||
|
||||
// runTick executes one collect-and-ship cycle under Config.Timeout. Errors are
|
||||
// logged and counted; they do not propagate because the reporter must keep
|
||||
// ticking on subsequent intervals.
|
||||
func (provider *Provider) runTick(parentCtx context.Context) {
|
||||
provider.metrics.ticks.Add(parentCtx, 1)
|
||||
|
||||
ctx, cancel := context.WithTimeout(parentCtx, provider.config.Timeout)
|
||||
defer cancel()
|
||||
|
||||
if err := provider.tick(ctx); err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "meter reporter tick failed", errors.Attr(err), slog.Duration("timeout", provider.config.Timeout))
|
||||
}
|
||||
}
|
||||
48
ee/meterreporter/signozmeterreporter/telemetry.go
Normal file
48
ee/meterreporter/signozmeterreporter/telemetry.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package signozmeterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
type reporterMetrics struct {
|
||||
ticks metric.Int64Counter
|
||||
readingsEmitted metric.Int64Counter
|
||||
collectErrors metric.Int64Counter
|
||||
postErrors metric.Int64Counter
|
||||
}
|
||||
|
||||
func newReporterMetrics(meter metric.Meter) (*reporterMetrics, error) {
|
||||
var errs error
|
||||
|
||||
ticks, err := meter.Int64Counter("signoz.meterreporter.ticks", metric.WithDescription("Total number of meter reporter ticks that ran to completion or aborted."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
readingsEmitted, err := meter.Int64Counter("signoz.meterreporter.readings.emitted", metric.WithDescription("Total number of meter readings shipped to Zeus."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
collectErrors, err := meter.Int64Counter("signoz.meterreporter.collect.errors", metric.WithDescription("Total number of collect errors across organizations and meters."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
postErrors, err := meter.Int64Counter("signoz.meterreporter.post.errors", metric.WithDescription("Total number of Zeus POST failures."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
if errs != nil {
|
||||
return nil, errs
|
||||
}
|
||||
|
||||
return &reporterMetrics{
|
||||
ticks: ticks,
|
||||
readingsEmitted: readingsEmitted,
|
||||
collectErrors: collectErrors,
|
||||
postErrors: postErrors,
|
||||
}, nil
|
||||
}
|
||||
@@ -148,6 +148,24 @@ func (provider *Provider) PutMetersV2(ctx context.Context, key string, data []by
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) PutMeterReadings(ctx context.Context, key string, idempotencyKey string, data []byte) error {
|
||||
headers := http.Header{}
|
||||
if idempotencyKey != "" {
|
||||
headers.Set("X-Idempotency-Key", idempotencyKey)
|
||||
}
|
||||
|
||||
_, err := provider.doWithHeaders(
|
||||
ctx,
|
||||
provider.config.URL.JoinPath("/v2/meters"),
|
||||
http.MethodPost,
|
||||
key,
|
||||
data,
|
||||
headers,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) PutProfile(ctx context.Context, key string, profile *zeustypes.PostableProfile) error {
|
||||
body, err := json.Marshal(profile)
|
||||
if err != nil {
|
||||
@@ -183,12 +201,21 @@ func (provider *Provider) PutHost(ctx context.Context, key string, host *zeustyp
|
||||
}
|
||||
|
||||
func (provider *Provider) do(ctx context.Context, url *url.URL, method string, key string, requestBody []byte) ([]byte, error) {
|
||||
return provider.doWithHeaders(ctx, url, method, key, requestBody, nil)
|
||||
}
|
||||
|
||||
func (provider *Provider) doWithHeaders(ctx context.Context, url *url.URL, method string, key string, requestBody []byte, extraHeaders http.Header) ([]byte, error) {
|
||||
request, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewBuffer(requestBody))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
request.Header.Set("X-Signoz-Cloud-Api-Key", key)
|
||||
request.Header.Set("Content-Type", "application/json")
|
||||
for k, vs := range extraHeaders {
|
||||
for _, v := range vs {
|
||||
request.Header.Add(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
response, err := provider.httpClient.Do(request)
|
||||
if err != nil {
|
||||
|
||||
28
pkg/meterreporter/collector.go
Normal file
28
pkg/meterreporter/collector.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Window is the reporting window a Collector produces readings for. Timestamps
|
||||
// are unix milliseconds; StartMs is used as the emitted reading's Timestamp
|
||||
// and typically aligns to UTC day start.
|
||||
type Window struct {
|
||||
StartMs int64
|
||||
EndMs int64
|
||||
}
|
||||
|
||||
// CollectorDeps contains the dependencies a meter collector may need to
|
||||
// resolve readings. Individual collectors can choose the subset they use.
|
||||
type CollectorDeps struct {
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
SQLStore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
// CollectorFunc resolves readings for a single Meter.
|
||||
type CollectorFunc func(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error)
|
||||
103
pkg/meterreporter/collector_log.go
Normal file
103
pkg/meterreporter/collector_log.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
// CollectLogCountMeter emits a single Reading for signoz.meter.log.count.
|
||||
// Each log-meter collector owns its own query end-to-end — duplication is
|
||||
// preferred over shared helpers because these paths are billing-critical.
|
||||
func CollectLogCountMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterLogCount.String()),
|
||||
sb.GTE("unix_milli", window.StartMs),
|
||||
sb.LT("unix_milli", window.EndMs),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterLogCount.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainLogs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterLogCount.String(),
|
||||
Value: value,
|
||||
Timestamp: window.StartMs,
|
||||
IsCompleted: false,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// CollectLogSizeMeter emits a single Reading for signoz.meter.log.size.
|
||||
// Each log-meter collector owns its own query end-to-end — duplication is
|
||||
// preferred over shared helpers because these paths are billing-critical.
|
||||
func CollectLogSizeMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterLogSize.String()),
|
||||
sb.GTE("unix_milli", window.StartMs),
|
||||
sb.LT("unix_milli", window.EndMs),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterLogSize.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainLogs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterLogSize.String(),
|
||||
Value: value,
|
||||
Timestamp: window.StartMs,
|
||||
IsCompleted: false,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
105
pkg/meterreporter/collector_metric.go
Normal file
105
pkg/meterreporter/collector_metric.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
// CollectMetricDatapointCountMeter emits a single Reading for
|
||||
// signoz.meter.metric.datapoint.count. Each metric-meter collector owns its
|
||||
// own query end-to-end — duplication is preferred over shared helpers because
|
||||
// these paths are billing-critical.
|
||||
func CollectMetricDatapointCountMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterMetricDatapointCount.String()),
|
||||
sb.GTE("unix_milli", window.StartMs),
|
||||
sb.LT("unix_milli", window.EndMs),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterMetricDatapointCount.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterMetricDatapointCount.String(),
|
||||
Value: value,
|
||||
Timestamp: window.StartMs,
|
||||
IsCompleted: false,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// CollectMetricDatapointSizeMeter emits a single Reading for
|
||||
// signoz.meter.metric.datapoint.size. Each metric-meter collector owns its
|
||||
// own query end-to-end — duplication is preferred over shared helpers because
|
||||
// these paths are billing-critical.
|
||||
func CollectMetricDatapointSizeMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterMetricDatapointSize.String()),
|
||||
sb.GTE("unix_milli", window.StartMs),
|
||||
sb.LT("unix_milli", window.EndMs),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterMetricDatapointSize.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterMetricDatapointSize.String(),
|
||||
Value: value,
|
||||
Timestamp: window.StartMs,
|
||||
IsCompleted: false,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
103
pkg/meterreporter/collector_trace.go
Normal file
103
pkg/meterreporter/collector_trace.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
// CollectSpanCountMeter emits a single Reading for signoz.meter.span.count.
|
||||
// Each trace-meter collector owns its own query end-to-end — duplication is
|
||||
// preferred over shared helpers because these paths are billing-critical.
|
||||
func CollectSpanCountMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterSpanCount.String()),
|
||||
sb.GTE("unix_milli", window.StartMs),
|
||||
sb.LT("unix_milli", window.EndMs),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterSpanCount.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainTraces)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterSpanCount.String(),
|
||||
Value: value,
|
||||
Timestamp: window.StartMs,
|
||||
IsCompleted: false,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// CollectSpanSizeMeter emits a single Reading for signoz.meter.span.size.
|
||||
// Each trace-meter collector owns its own query end-to-end — duplication is
|
||||
// preferred over shared helpers because these paths are billing-critical.
|
||||
func CollectSpanSizeMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterSpanSize.String()),
|
||||
sb.GTE("unix_milli", window.StartMs),
|
||||
sb.LT("unix_milli", window.EndMs),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterSpanSize.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainTraces)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterSpanSize.String(),
|
||||
Value: value,
|
||||
Timestamp: window.StartMs,
|
||||
IsCompleted: false,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
65
pkg/meterreporter/config.go
Normal file
65
pkg/meterreporter/config.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
var _ factory.Config = (*Config)(nil)
|
||||
|
||||
type Config struct {
|
||||
// Provider selects the reporter implementation (default "noop").
|
||||
Provider string `mapstructure:"provider"`
|
||||
|
||||
// Interval is how often the reporter collects and ships meter readings.
|
||||
Interval time.Duration `mapstructure:"interval"`
|
||||
|
||||
// Timeout bounds a single collect-and-ship cycle.
|
||||
Timeout time.Duration `mapstructure:"timeout"`
|
||||
|
||||
// Retry configures exponential backoff for transient Zeus failures.
|
||||
Retry RetryConfig `mapstructure:"retry"`
|
||||
}
|
||||
|
||||
type RetryConfig struct {
|
||||
Enabled bool `mapstructure:"enabled"`
|
||||
InitialInterval time.Duration `mapstructure:"initial_interval"`
|
||||
MaxInterval time.Duration `mapstructure:"max_interval"`
|
||||
MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
|
||||
}
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Provider: "noop",
|
||||
Interval: 6 * time.Hour,
|
||||
Timeout: 30 * time.Second,
|
||||
Retry: RetryConfig{
|
||||
Enabled: true,
|
||||
InitialInterval: 5 * time.Second,
|
||||
MaxInterval: 30 * time.Second,
|
||||
MaxElapsedTime: time.Minute,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("meterreporter"), newConfig)
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
if c.Interval < 5*time.Minute {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::interval must be at least 5m")
|
||||
}
|
||||
|
||||
if c.Timeout <= 0 {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be greater than 0")
|
||||
}
|
||||
|
||||
if c.Timeout >= c.Interval {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be less than meterreporter::interval")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
23
pkg/meterreporter/meter.go
Normal file
23
pkg/meterreporter/meter.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
)
|
||||
|
||||
// Meter is one registered meter - a name, billing metadata, and the function that knows how to produce readings for it.
|
||||
//
|
||||
// The same metric Name may appear multiple times in the registry as long as each entry
|
||||
// uses a different Aggregation (for example min/max/p99 of the same source meter).
|
||||
type Meter struct {
|
||||
// Name is the meter's identifier.
|
||||
Name meterreportertypes.Name
|
||||
|
||||
// Unit is available to the collector for the signoz.billing.unit dimension.
|
||||
Unit string
|
||||
|
||||
// Aggregation is available to the collector for the signoz.billing.aggregation dimension.
|
||||
Aggregation string
|
||||
|
||||
// Collect knows how to turn this Meter into zero or more Readings per tick.
|
||||
Collect CollectorFunc
|
||||
}
|
||||
26
pkg/meterreporter/meterreporter.go
Normal file
26
pkg/meterreporter/meterreporter.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeInvalidInput = errors.MustNewCode("meterreporter_invalid_input")
|
||||
ErrCodeReportFailed = errors.MustNewCode("meterreporter_report_failed")
|
||||
)
|
||||
|
||||
// Dimension keys automatically attached to every Reading.
|
||||
const (
|
||||
DimensionAggregation = "signoz.billing.aggregation"
|
||||
DimensionUnit = "signoz.billing.unit"
|
||||
DimensionOrganizationID = "signoz.billing.organization.id"
|
||||
DimensionRetentionDays = "signoz.billing.retention.days"
|
||||
)
|
||||
|
||||
// Reporter periodically collects meter values via the query service and ships
|
||||
// them to Zeus. Implementations must satisfy factory.ServiceWithHealthy so the
|
||||
// signoz registry can wait on startup and request graceful shutdown.
|
||||
type Reporter interface {
|
||||
factory.ServiceWithHealthy
|
||||
}
|
||||
39
pkg/meterreporter/noopmeterreporter/provider.go
Normal file
39
pkg/meterreporter/noopmeterreporter/provider.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package noopmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
healthyC chan struct{}
|
||||
stopC chan struct{}
|
||||
}
|
||||
|
||||
func NewFactory() factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("noop"), New)
|
||||
}
|
||||
|
||||
func New(_ context.Context, _ factory.ProviderSettings, _ meterreporter.Config) (meterreporter.Reporter, error) {
|
||||
return &provider{
|
||||
healthyC: make(chan struct{}),
|
||||
stopC: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *provider) Start(_ context.Context) error {
|
||||
close(p.healthyC)
|
||||
<-p.stopC
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *provider) Stop(_ context.Context) error {
|
||||
close(p.stopC)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *provider) Healthy() <-chan struct{} {
|
||||
return p.healthyC
|
||||
}
|
||||
123
pkg/meterreporter/registry.go
Normal file
123
pkg/meterreporter/registry.go
Normal file
@@ -0,0 +1,123 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
)
|
||||
|
||||
// Exported names for every meter the reporter knows about. Refer to these
|
||||
// symbols (not string literals) everywhere - typos turn into compile errors
|
||||
// instead of silently producing a new meter row at Zeus.
|
||||
var (
|
||||
MeterLogCount = meterreportertypes.MustNewName("signoz.meter.log.count")
|
||||
MeterLogSize = meterreportertypes.MustNewName("signoz.meter.log.size")
|
||||
MeterMetricDatapointCount = meterreportertypes.MustNewName("signoz.meter.metric.datapoint.count")
|
||||
MeterMetricDatapointSize = meterreportertypes.MustNewName("signoz.meter.metric.datapoint.size")
|
||||
MeterSpanCount = meterreportertypes.MustNewName("signoz.meter.span.count")
|
||||
MeterSpanSize = meterreportertypes.MustNewName("signoz.meter.span.size")
|
||||
)
|
||||
|
||||
const AggregationSum = "sum"
|
||||
|
||||
func baseMeters() []*Meter {
|
||||
meters := []*Meter{
|
||||
{
|
||||
Name: MeterLogCount,
|
||||
Unit: "count",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectLogCountMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterLogSize,
|
||||
Unit: "bytes",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectLogSizeMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterMetricDatapointCount,
|
||||
Unit: "count",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectMetricDatapointCountMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterMetricDatapointSize,
|
||||
Unit: "bytes",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectMetricDatapointSizeMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterSpanCount,
|
||||
Unit: "count",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectSpanCountMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterSpanSize,
|
||||
Unit: "bytes",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectSpanSizeMeter,
|
||||
},
|
||||
}
|
||||
|
||||
mustValidateMeters(meters...)
|
||||
return meters
|
||||
}
|
||||
|
||||
// DefaultMeters returns the hardcoded query-backed meters supported by the reporter.
|
||||
func DefaultMeters() ([]Meter, error) {
|
||||
meters := baseMeters()
|
||||
if err := validateMeters(meters...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resolved := make([]Meter, 0, len(meters))
|
||||
for _, meter := range meters {
|
||||
resolved = append(resolved, *meter)
|
||||
}
|
||||
|
||||
return resolved, nil
|
||||
}
|
||||
|
||||
// validateMeters checks that the runtime meter list is internally consistent.
|
||||
// Every meter must:
|
||||
// - have a non-zero Name,
|
||||
// - have a non-empty Unit,
|
||||
// - have a non-empty Aggregation,
|
||||
// - have a non-nil Collect function,
|
||||
// - use a unique (Name, Aggregation) pair.
|
||||
func validateMeters(meters ...*Meter) error {
|
||||
seen := make(map[string]struct{}, len(meters))
|
||||
|
||||
for _, meter := range meters {
|
||||
if meter == nil {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "nil meter in registry")
|
||||
}
|
||||
if meter.Name.IsZero() {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter with empty name in registry")
|
||||
}
|
||||
if meter.Unit == "" {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no unit", meter.Name.String())
|
||||
}
|
||||
if meter.Aggregation == "" {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no aggregation", meter.Name.String())
|
||||
}
|
||||
if meter.Collect == nil {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no collector function", meter.Name.String())
|
||||
}
|
||||
|
||||
key := meter.Name.String() + "|" + meter.Aggregation
|
||||
if _, ok := seen[key]; ok {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "duplicate meter %q with aggregation %q", meter.Name.String(), meter.Aggregation)
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// mustValidateMeters panics when hardcoded meter declarations are invalid.
|
||||
func mustValidateMeters(meters ...*Meter) {
|
||||
if err := validateMeters(meters...); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
107
pkg/meterreporter/retention.go
Normal file
107
pkg/meterreporter/retention.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strconv"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type RetentionDomain string
|
||||
|
||||
const (
|
||||
RetentionDomainLogs RetentionDomain = "logs"
|
||||
RetentionDomainMetrics RetentionDomain = "metrics"
|
||||
RetentionDomainTraces RetentionDomain = "traces"
|
||||
)
|
||||
|
||||
// defaultRetentionDaysByDomain is the per-domain fallback used when no
|
||||
// ttl_setting row exists for the org. Values mirror the TTL set by the
|
||||
// canonical ClickHouse schema for each domain's main table:
|
||||
//
|
||||
// - logs: signoz_logs.logs_v2 → 15 days
|
||||
// - metrics: signoz_metrics.samples_v4 → 2 592 000 s = 30 days
|
||||
// - traces: signoz_traces.signoz_index_v3 → 1 296 000 s = 15 days
|
||||
//
|
||||
// If a migration ever changes the DDL default for a domain, update the
|
||||
// corresponding entry here so billing readings match reality.
|
||||
var defaultRetentionDaysByDomain = map[RetentionDomain]int{
|
||||
RetentionDomainLogs: types.DefaultRetentionDays,
|
||||
RetentionDomainMetrics: 30,
|
||||
RetentionDomainTraces: 15,
|
||||
}
|
||||
|
||||
// resolveRetentionDays returns the configured retention for orgID in the given
|
||||
// domain as a string suitable for the DimensionRetentionDays dimension.
|
||||
//
|
||||
// It queries the ttl_setting table using the local (non-distributed) table
|
||||
// name, which is what the V2 retention writer uses. The TTL column is stored
|
||||
// in days by the V2 path. When no row exists or the stored TTL is non-positive,
|
||||
// defaultRetentionDaysByDomain provides the per-domain ClickHouse default so
|
||||
// the reading always carries an accurate retention dimension.
|
||||
func resolveRetentionDays(ctx context.Context, sqlstore sqlstore.SQLStore, orgID valuer.UUID, domain RetentionDomain) (string, bool, error) {
|
||||
if sqlstore == nil {
|
||||
return "", false, nil
|
||||
}
|
||||
tableName, ok := retentionTableName(domain)
|
||||
if !ok {
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
ttl := new(types.TTLSetting)
|
||||
err := sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(ttl).
|
||||
Where("table_name = ?", tableName).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
OrderExpr("created_at DESC").
|
||||
Limit(1).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return domainFallbackRetention(domain)
|
||||
}
|
||||
return "", false, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "load retention for domain %q", domain)
|
||||
}
|
||||
|
||||
if ttl.TTL <= 0 {
|
||||
return domainFallbackRetention(domain)
|
||||
}
|
||||
|
||||
// TTL is stored in days by the V2 retention path (SetCustomRetentionV2).
|
||||
return strconv.Itoa(ttl.TTL), true, nil
|
||||
}
|
||||
|
||||
// domainFallbackRetention returns the per-domain default retention used when
|
||||
// no ttl_setting row exists for an org.
|
||||
func domainFallbackRetention(domain RetentionDomain) (string, bool, error) {
|
||||
days, ok := defaultRetentionDaysByDomain[domain]
|
||||
if !ok {
|
||||
return "", false, errors.Newf(errors.TypeInternal, ErrCodeReportFailed, "no default retention defined for domain %q", domain)
|
||||
}
|
||||
return strconv.Itoa(days), true, nil
|
||||
}
|
||||
|
||||
// retentionTableName returns the local ClickHouse table name used as the key
|
||||
// in ttl_setting rows for each domain. Must match what SetCustomRetentionV2
|
||||
// writes (the local, not distributed, table name).
|
||||
func retentionTableName(domain RetentionDomain) (string, bool) {
|
||||
switch domain {
|
||||
case RetentionDomainLogs:
|
||||
return telemetrylogs.DBName + "." + telemetrylogs.LogsV2LocalTableName, true
|
||||
case RetentionDomainMetrics:
|
||||
return telemetrymetrics.DBName + "." + telemetrymetrics.SamplesV4LocalTableName, true
|
||||
case RetentionDomainTraces:
|
||||
return telemetrytraces.DBName + "." + telemetrytraces.SpanIndexV3LocalTableName, true
|
||||
default:
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
@@ -2008,7 +2008,7 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
// No V2 configuration found, return defaults
|
||||
response.DefaultTTLDays = 15
|
||||
response.DefaultTTLDays = types.DefaultRetentionDays
|
||||
response.TTLConditions = []model.CustomRetentionRule{}
|
||||
response.Status = constants.StatusSuccess
|
||||
response.ColdStorageTTLDays = -1
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/identn"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
|
||||
@@ -129,6 +130,9 @@ type Config struct {
|
||||
// Auditor config
|
||||
Auditor auditor.Config `mapstructure:"auditor"`
|
||||
|
||||
// MeterReporter config
|
||||
MeterReporter meterreporter.Config `mapstructure:"meterreporter"`
|
||||
|
||||
// CloudIntegration config
|
||||
CloudIntegration cloudintegration.Config `mapstructure:"cloudintegration"`
|
||||
}
|
||||
@@ -162,6 +166,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
|
||||
identn.NewConfigFactory(),
|
||||
serviceaccount.NewConfigFactory(),
|
||||
auditor.NewConfigFactory(),
|
||||
meterreporter.NewConfigFactory(),
|
||||
cloudintegration.NewConfigFactory(),
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/identn/apikeyidentn"
|
||||
"github.com/SigNoz/signoz/pkg/identn/impersonationidentn"
|
||||
"github.com/SigNoz/signoz/pkg/identn/tokenizeridentn"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter/noopmeterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
@@ -315,6 +317,12 @@ func NewAuditorProviderFactories() factory.NamedMap[factory.ProviderFactory[audi
|
||||
)
|
||||
}
|
||||
|
||||
func NewMeterReporterProviderFactories() factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
noopmeterreporter.NewFactory(),
|
||||
)
|
||||
}
|
||||
|
||||
func NewFlaggerProviderFactories(registry featuretypes.Registry) factory.NamedMap[factory.ProviderFactory[flagger.FlaggerProvider, flagger.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
configflagger.NewFactory(registry),
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/identn"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
@@ -84,6 +85,7 @@ type SigNoz struct {
|
||||
Flagger flagger.Flagger
|
||||
Gateway gateway.Gateway
|
||||
Auditor auditor.Auditor
|
||||
MeterReporter meterreporter.Reporter
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -104,6 +106,7 @@ func New(
|
||||
dashboardModuleCallback func(sqlstore.SQLStore, factory.ProviderSettings, analytics.Analytics, organization.Getter, queryparser.QueryParser, querier.Querier, licensing.Licensing) dashboard.Module,
|
||||
gatewayProviderFactory func(licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config],
|
||||
auditorProviderFactories func(licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]],
|
||||
meterReporterProviderFactories func(licensing.Licensing, telemetrystore.TelemetryStore, sqlstore.SQLStore, organization.Getter, zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]],
|
||||
querierHandlerCallback func(factory.ProviderSettings, querier.Querier, analytics.Analytics) querier.Handler,
|
||||
cloudIntegrationCallback func(sqlstore.SQLStore, global.Global, zeus.Zeus, gateway.Gateway, licensing.Licensing, serviceaccount.Module, cloudintegration.Config) (cloudintegration.Module, error),
|
||||
rulerProviderFactories func(cache.Cache, alertmanager.Alertmanager, sqlstore.SQLStore, telemetrystore.TelemetryStore, telemetrytypes.MetadataStore, prometheus.Prometheus, organization.Getter, rulestatehistory.Module, querier.Querier, queryparser.QueryParser) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]],
|
||||
@@ -377,6 +380,12 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize meter reporter from the variant-specific provider factories
|
||||
meterReporter, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.MeterReporter, meterReporterProviderFactories(licensing, telemetrystore, sqlstore, orgGetter, zeus), config.MeterReporter.Provider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize authns
|
||||
store := sqlauthnstore.NewStore(sqlstore)
|
||||
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)
|
||||
@@ -491,6 +500,7 @@ func New(
|
||||
factory.NewNamedService(factory.MustNewName("authz"), authz),
|
||||
factory.NewNamedService(factory.MustNewName("user"), userService, factory.MustNewName("authz")),
|
||||
factory.NewNamedService(factory.MustNewName("auditor"), auditor),
|
||||
factory.NewNamedService(factory.MustNewName("meterreporter"), meterReporter, factory.MustNewName("licensing")),
|
||||
factory.NewNamedService(factory.MustNewName("ruler"), rulerInstance),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -540,5 +550,6 @@ func New(
|
||||
Flagger: flagger,
|
||||
Gateway: gateway,
|
||||
Auditor: auditor,
|
||||
MeterReporter: meterReporter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
41
pkg/types/meterreportertypes/name.go
Normal file
41
pkg/types/meterreportertypes/name.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package meterreportertypes
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
var nameRegex = regexp.MustCompile(`^[a-z][a-z0-9_.]+$`)
|
||||
|
||||
// Name is a concrete type for a meter name. Dotted namespace identifiers like
|
||||
// "signoz.meter.log.count" are permitted; arbitrary strings are not, to avoid
|
||||
// typos silently producing distinct meter rows at Zeus.
|
||||
type Name struct {
|
||||
s string
|
||||
}
|
||||
|
||||
func NewName(s string) (Name, error) {
|
||||
if !nameRegex.MatchString(s) {
|
||||
return Name{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid meter name: %s", s)
|
||||
}
|
||||
|
||||
return Name{s: s}, nil
|
||||
}
|
||||
|
||||
func MustNewName(s string) Name {
|
||||
name, err := NewName(s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return name
|
||||
}
|
||||
|
||||
func (n Name) String() string {
|
||||
return n.s
|
||||
}
|
||||
|
||||
func (n Name) IsZero() bool {
|
||||
return n.s == ""
|
||||
}
|
||||
32
pkg/types/meterreportertypes/types.go
Normal file
32
pkg/types/meterreportertypes/types.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package meterreportertypes
|
||||
|
||||
// Reading is a single meter value sent to Zeus. Zeus UPSERTs on
|
||||
// (license_key, dimension_hash, timestamp), so repeated readings within the
|
||||
// same tick window safely overwrite prior values.
|
||||
type Reading struct {
|
||||
// MeterName is the fully-qualified meter identifier.
|
||||
MeterName string `json:"meterName"`
|
||||
|
||||
// Value is the aggregated scalar for this (meter, aggregation) pair over the reporting window.
|
||||
Value float64 `json:"value"`
|
||||
|
||||
// Timestamp is the window-start in epoch milliseconds (UTC day start).
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
|
||||
// IsCompleted is true only for sealed past buckets. In-progress buckets
|
||||
// (e.g. the current UTC day) report IsCompleted=false so Zeus knows the value may still change.
|
||||
IsCompleted bool `json:"isCompleted"`
|
||||
|
||||
// Dimensions is the per-reading label set.
|
||||
Dimensions map[string]string `json:"dimensions"`
|
||||
}
|
||||
|
||||
// PostableMeterReadings is the request body for Zeus.PutMeterReadings.
|
||||
type PostableMeterReadings struct { // ! Needs fix once zeus contract is setup
|
||||
// IdempotencyKey is echoed as the X-Idempotency-Key header and stored by
|
||||
// Zeus so retries within the same tick window overwrite rather than duplicate.
|
||||
IdempotencyKey string `json:"idempotencyKey"`
|
||||
|
||||
// Readings is the batch of meter values being shipped.
|
||||
Readings []Reading `json:"readings"`
|
||||
}
|
||||
@@ -73,6 +73,11 @@ func NewTraitsFromOrganization(org *Organization) map[string]any {
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultRetentionDays is the retention shown in the UI when no explicit
|
||||
// policy has been configured for an org. Both the API reader and the meter
|
||||
// reporter use this value so a change here propagates everywhere.
|
||||
const DefaultRetentionDays = 15
|
||||
|
||||
type TTLSetting struct {
|
||||
bun.BaseModel `bun:"table:ttl_setting"`
|
||||
Identifiable
|
||||
|
||||
@@ -49,6 +49,10 @@ func (provider *provider) PutMetersV2(_ context.Context, _ string, _ []byte) err
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v2 is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) PutMeterReadings(_ context.Context, _ string, _ string, _ []byte) error {
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meter readings is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) PutProfile(_ context.Context, _ string, _ *zeustypes.PostableProfile) error {
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting profile is not supported")
|
||||
}
|
||||
|
||||
@@ -35,6 +35,11 @@ type Zeus interface {
|
||||
// Puts the meters for the given license key using Zeus.
|
||||
PutMetersV2(context.Context, string, []byte) error
|
||||
|
||||
// PutMeterReadings ships TDD-shape meter readings to the v2/meters
|
||||
// endpoint. idempotencyKey is propagated as X-Idempotency-Key so Zeus can
|
||||
// UPSERT on retries.
|
||||
PutMeterReadings(ctx context.Context, licenseKey string, idempotencyKey string, body []byte) error
|
||||
|
||||
// Put profile for the given license key.
|
||||
PutProfile(context.Context, string, *zeustypes.PostableProfile) error
|
||||
|
||||
|
||||
Reference in New Issue
Block a user