mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-23 04:10:29 +01:00
Compare commits
6 Commits
refactor/r
...
feat/billi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fad83e6918 | ||
|
|
d378951f8d | ||
|
|
5dec8e26ad | ||
|
|
e102cc07ce | ||
|
|
b898fb6e3b | ||
|
|
bc7dbab2b2 |
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
@@ -109,6 +110,9 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
func(_ licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
|
||||
return signoz.NewAuditorProviderFactories()
|
||||
},
|
||||
func(_ licensing.Licensing, _ telemetrystore.TelemetryStore, _ sqlstore.SQLStore, _ organization.Getter, _ zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
return signoz.NewMeterReporterProviderFactories()
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
return querier.NewHandler(ps, q, a)
|
||||
},
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/SigNoz/signoz/ee/gateway/httpgateway"
|
||||
enterpriselicensing "github.com/SigNoz/signoz/ee/licensing"
|
||||
"github.com/SigNoz/signoz/ee/licensing/httplicensing"
|
||||
"github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter"
|
||||
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration/implcloudprovider"
|
||||
"github.com/SigNoz/signoz/ee/modules/dashboard/impldashboard"
|
||||
@@ -38,6 +39,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/gateway"
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
pkgcloudintegration "github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
@@ -157,6 +159,13 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
}
|
||||
return factories
|
||||
},
|
||||
func(licensing licensing.Licensing, telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore, orgGetter organization.Getter, zeus zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
factories := signoz.NewMeterReporterProviderFactories()
|
||||
if err := factories.Add(signozmeterreporter.NewFactory(licensing, telemetryStore, sqlStore, orgGetter, zeus)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return factories
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
communityHandler := querier.NewHandler(ps, q, a)
|
||||
return eequerier.NewHandler(ps, q, communityHandler)
|
||||
|
||||
@@ -407,3 +407,23 @@ cloudintegration:
|
||||
agent:
|
||||
# The version of the cloud integration agent.
|
||||
version: v0.0.8
|
||||
|
||||
##################### Meter Reporter #####################
|
||||
meterreporter:
|
||||
# Specifies the meter reporter provider to use.
|
||||
# noop: does not report any meters (community default).
|
||||
# signoz: periodically queries meters via the querier and ships readings to Zeus (enterprise).
|
||||
provider: noop
|
||||
# The interval between collection ticks. Minimum 30m.
|
||||
interval: 6h
|
||||
# The per-tick timeout that bounds collect-and-ship work.
|
||||
timeout: 30s
|
||||
retry:
|
||||
# Whether to retry on transient failures.
|
||||
enabled: true
|
||||
# The initial wait time before the first retry.
|
||||
initial_interval: 5s
|
||||
# The upper bound on backoff interval.
|
||||
max_interval: 30s
|
||||
# The total maximum time spent retrying.
|
||||
max_elapsed_time: 1m
|
||||
|
||||
231
ee/meterreporter/signozmeterreporter/collect.go
Normal file
231
ee/meterreporter/signozmeterreporter/collect.go
Normal file
@@ -0,0 +1,231 @@
|
||||
package signozmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
const (
|
||||
phaseSealed = "sealed"
|
||||
phaseToday = "today"
|
||||
|
||||
attrPhase = "phase"
|
||||
attrResult = "result"
|
||||
|
||||
resultSuccess = "success"
|
||||
resultFailure = "failure"
|
||||
)
|
||||
|
||||
// tick runs one collect-and-ship cycle for the instance's active org. The
|
||||
// tick first checkpoints against Zeus via LatestSealed; if that fails the
|
||||
// whole tick is skipped so the two concerns can't run against an inconsistent
|
||||
// view of Zeus state. The underlying http client already retries transient
|
||||
// failures 3× with a 2s constant backoff (see pkg/http/client), so no extra
|
||||
// retry loop is needed here. On success the tick runs two concerns:
|
||||
//
|
||||
// (A) sealed-range processor — forward-fills is_completed=true days from
|
||||
// the Zeus-reported catchup start up to yesterday, capped at
|
||||
// Config.CatchupMaxDaysPerTick. On any per-day ship failure the loop
|
||||
// breaks; next tick's LatestSealed returns the same catchup start so
|
||||
// the failed day is retried cleanly with no local state to reconcile.
|
||||
//
|
||||
// (B) today partial — re-emits the intra-day [00:00 UTC, now) window every
|
||||
// tick as is_completed=false; the day-scoped X-Idempotency-Key makes
|
||||
// successive writes UPSERT at Zeus.
|
||||
//
|
||||
// Per-collector and ship failures are logged and counted — they do not abort
|
||||
// the tick or propagate, because the reporter must keep ticking on the next interval.
|
||||
func (provider *Provider) tick(ctx context.Context) error {
|
||||
now := time.Now().UTC()
|
||||
// Align to 00:00 UTC of the current day. All window boundaries are derived
|
||||
// from this single snapshot so a tick can't straddle midnight inconsistently.
|
||||
todayStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
||||
yesterday := todayStart.AddDate(0, 0, -1)
|
||||
|
||||
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "failed to list organizations")
|
||||
}
|
||||
if len(orgs) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(orgs) > 1 {
|
||||
// Billing is scoped to a single license per instance, and the meter data
|
||||
// in signoz_meter carries no org marker — we can't attribute samples to
|
||||
// one org versus another. Report against the first org and warn so the
|
||||
// mis-configuration is visible in logs.
|
||||
provider.settings.Logger().WarnContext(ctx, "multiple orgs on a single instance; reporting only the first", slog.Int("org_count", len(orgs)))
|
||||
}
|
||||
org := orgs[0]
|
||||
|
||||
license, err := provider.licensing.GetActive(ctx, org.ID)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "failed to fetch active license for org %q", org.ID.StringValue())
|
||||
}
|
||||
if license == nil || license.Key == "" {
|
||||
provider.settings.Logger().WarnContext(ctx, "skipping tick, nil/empty license for org", slog.String("org_id", org.ID.StringValue()))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Checkpoint against Zeus. A nil return means "no sealed rows yet for this
|
||||
// license" → bootstrap catch-up starts from today - HistoricalBackfillDays.
|
||||
// Any error aborts the whole tick so concern B doesn't flow without a
|
||||
// consistent view of the sealed catchup start; the http client already
|
||||
// retried transient failures before reaching this point.
|
||||
latest, err := provider.zeus.LatestSealed(ctx, license.Key)
|
||||
if err != nil {
|
||||
provider.metrics.latestSealedErrors.Add(ctx, 1)
|
||||
provider.settings.Logger().ErrorContext(ctx, "skipping tick: latest-sealed call failed", errors.Attr(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Concern A — sealed-range processor.
|
||||
var catchupStart time.Time
|
||||
if latest == nil {
|
||||
catchupStart = todayStart.AddDate(0, 0, -meterreporter.HistoricalBackfillDays)
|
||||
} else {
|
||||
catchupStart = latest.AddDate(0, 0, 1)
|
||||
}
|
||||
if !catchupStart.After(yesterday) {
|
||||
end := catchupStart.AddDate(0, 0, provider.config.CatchupMaxDaysPerTick-1)
|
||||
if end.After(yesterday) {
|
||||
end = yesterday
|
||||
}
|
||||
for day := catchupStart; !day.After(end); day = day.AddDate(0, 0, 1) {
|
||||
window := meterreporter.Window{
|
||||
StartUnixMilli: day.UnixMilli(),
|
||||
EndUnixMilli: day.AddDate(0, 0, 1).UnixMilli(),
|
||||
IsCompleted: true,
|
||||
}
|
||||
date := day.Format("2006-01-02")
|
||||
err := provider.runPhase(ctx, org.ID, license.Key, window, date, phaseSealed)
|
||||
result := resultSuccess
|
||||
if err != nil {
|
||||
result = resultFailure
|
||||
}
|
||||
provider.metrics.catchupDaysProcessed.Add(ctx, 1, metric.WithAttributes(attribute.String(attrResult, result)))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Concern B — today partial. Runs every tick regardless of concern A's
|
||||
// progress; concern A's failures break their own loop but never block the
|
||||
// partial.
|
||||
todayWindow := meterreporter.Window{
|
||||
StartUnixMilli: todayStart.UnixMilli(),
|
||||
EndUnixMilli: now.UnixMilli(),
|
||||
IsCompleted: false,
|
||||
}
|
||||
_ = provider.runPhase(ctx, org.ID, license.Key, todayWindow, todayStart.Format("2006-01-02"), phaseToday)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// runPhase is the shared collect+ship body for one (window, idempotency-date)
|
||||
// pair. phaseLabel is informational — logs only; the wire format carries
|
||||
// IsCompleted via window. Returns err only on ship failure so the sealed-range
|
||||
// loop can break on first failure; collect-level failures are logged and
|
||||
// counted per-meter inside collectOrgReadings and never bubble up here.
|
||||
func (provider *Provider) runPhase(ctx context.Context, orgID valuer.UUID, licenseKey string, window meterreporter.Window, date string, phaseLabel string) error {
|
||||
phaseAttr := metric.WithAttributes(attribute.String(attrPhase, phaseLabel))
|
||||
|
||||
collectStart := time.Now()
|
||||
readings := provider.collectOrgReadings(ctx, orgID, window, phaseLabel)
|
||||
provider.metrics.collectDuration.Record(ctx, time.Since(collectStart).Seconds(), phaseAttr)
|
||||
if len(readings) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
shipStart := time.Now()
|
||||
err := provider.shipReadings(ctx, licenseKey, date, readings)
|
||||
provider.metrics.shipDuration.Record(ctx, time.Since(shipStart).Seconds(), phaseAttr)
|
||||
if err != nil {
|
||||
provider.metrics.postErrors.Add(ctx, 1, phaseAttr)
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to ship meter readings",
|
||||
errors.Attr(err),
|
||||
slog.String("phase", phaseLabel),
|
||||
slog.String("date", date),
|
||||
slog.Int("readings", len(readings)),
|
||||
)
|
||||
return err
|
||||
}
|
||||
provider.metrics.readingsEmitted.Add(ctx, int64(len(readings)), phaseAttr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// collectOrgReadings runs every registered Meter's Collector against orgID and
|
||||
// returns the combined Readings. One bad meter must not block the batch, so
|
||||
// per-meter failures are logged and counted via collectErrors and then skipped
|
||||
// — the remaining meters still ship. phaseLabel tags the error counter so
|
||||
// sealed-day collect failures can be separated from today-partial failures.
|
||||
func (provider *Provider) collectOrgReadings(ctx context.Context, orgID valuer.UUID, window meterreporter.Window, phaseLabel string) []meterreportertypes.Reading {
|
||||
readings := make([]meterreportertypes.Reading, 0, len(provider.meters))
|
||||
phaseAttr := metric.WithAttributes(attribute.String(attrPhase, phaseLabel))
|
||||
|
||||
for _, meter := range provider.meters {
|
||||
collectedReadings, err := meter.Collect(ctx, provider.deps, meter, orgID, window)
|
||||
if err != nil {
|
||||
provider.metrics.collectErrors.Add(ctx, 1, phaseAttr)
|
||||
provider.settings.Logger().WarnContext(ctx, "meter collection failed",
|
||||
errors.Attr(err),
|
||||
slog.String("meter", meter.Name.String()),
|
||||
slog.String("org_id", orgID.StringValue()),
|
||||
slog.String("phase", phaseLabel),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
readings = append(readings, collectedReadings...)
|
||||
}
|
||||
|
||||
return readings
|
||||
}
|
||||
|
||||
// shipReadings serializes the batch as PostableMeterReadings and, in the fully
|
||||
// wired flow, POSTs it to Zeus under a date-scoped idempotency key so repeat
|
||||
// ticks within the same UTC day UPSERT instead of duplicating usage.
|
||||
//
|
||||
// ! TEMPORARY: the Zeus PutMeterReadings endpoint isn't live yet. Until it
|
||||
// ships we log the serialized payload at INFO instead, which lets staging
|
||||
// verify collection end-to-end without a server counterpart. Once the API
|
||||
// lands, drop the log block and restore:
|
||||
//
|
||||
// if err := provider.zeus.PutMeterReadings(ctx, licenseKey, idempotencyKey, body); err != nil { ... }
|
||||
func (provider *Provider) shipReadings(ctx context.Context, licenseKey string, date string, readings []meterreportertypes.Reading) error {
|
||||
idempotencyKey := fmt.Sprintf("meter-cron:%s", date)
|
||||
|
||||
// ! TODO: confirm this payload shape once the Zeus API is finalized.
|
||||
payload := meterreportertypes.PostableMeterReadings{
|
||||
IdempotencyKey: idempotencyKey,
|
||||
Readings: readings,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, meterreporter.ErrCodeReportFailed, "marshal meter readings")
|
||||
}
|
||||
|
||||
provider.settings.Logger().InfoContext(ctx, "meter readings (Zeus API not yet live — dry-run log)",
|
||||
slog.String("license_key", licenseKey),
|
||||
slog.String("idempotency_key", idempotencyKey),
|
||||
slog.Int("readings", len(readings)),
|
||||
slog.String("payload", string(body)),
|
||||
)
|
||||
// Keep the zeus dep referenced so the factory signature and DI wiring don't
|
||||
// bitrot while the POST is stubbed out.
|
||||
_ = provider.zeus
|
||||
|
||||
return nil
|
||||
}
|
||||
159
ee/meterreporter/signozmeterreporter/provider.go
Normal file
159
ee/meterreporter/signozmeterreporter/provider.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package signozmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
)
|
||||
|
||||
var _ factory.ServiceWithHealthy = (*Provider)(nil)
|
||||
|
||||
// Provider is the enterprise meter reporter. It ticks on a fixed interval,
|
||||
// invokes every registered Collector against the instance's licensed org, and
|
||||
// ships the resulting readings to Zeus. Community builds wire a noop provider
|
||||
// instead, so this type never runs there.
|
||||
type Provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
config meterreporter.Config
|
||||
meters []meterreporter.Meter
|
||||
deps meterreporter.CollectorDeps
|
||||
|
||||
licensing licensing.Licensing
|
||||
orgGetter organization.Getter
|
||||
zeus zeus.Zeus
|
||||
|
||||
healthyC chan struct{}
|
||||
stopC chan struct{}
|
||||
goroutinesWg sync.WaitGroup
|
||||
metrics *reporterMetrics
|
||||
}
|
||||
|
||||
// NewFactory wires the signoz meter reporter into the provider registry. The
|
||||
// returned factory is registered alongside the noop factory so the "provider"
|
||||
// config field picks the right implementation at startup.
|
||||
func NewFactory(
|
||||
licensing licensing.Licensing,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgGetter organization.Getter,
|
||||
zeus zeus.Zeus,
|
||||
) factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
|
||||
return factory.NewProviderFactory(
|
||||
factory.MustNewName("signoz"),
|
||||
func(ctx context.Context, providerSettings factory.ProviderSettings, config meterreporter.Config) (meterreporter.Reporter, error) {
|
||||
return newProvider(ctx, providerSettings, config, licensing, telemetryStore, sqlstore, orgGetter, zeus)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func newProvider(
|
||||
_ context.Context,
|
||||
providerSettings factory.ProviderSettings,
|
||||
config meterreporter.Config,
|
||||
licensing licensing.Licensing,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgGetter organization.Getter,
|
||||
zeus zeus.Zeus,
|
||||
) (*Provider, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/meterreporter/signozmeterreporter")
|
||||
|
||||
metrics, err := newReporterMetrics(settings.Meter())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meters, err := meterreporter.DefaultMeters()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Provider{
|
||||
settings: settings,
|
||||
config: config,
|
||||
meters: meters,
|
||||
deps: meterreporter.CollectorDeps{
|
||||
TelemetryStore: telemetryStore,
|
||||
SQLStore: sqlstore,
|
||||
},
|
||||
licensing: licensing,
|
||||
orgGetter: orgGetter,
|
||||
zeus: zeus,
|
||||
healthyC: make(chan struct{}),
|
||||
stopC: make(chan struct{}),
|
||||
metrics: metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start runs an initial tick, then loops on Config.Interval until Stop is
|
||||
// called. It blocks until the loop goroutine returns — that shape matches the
|
||||
// factory.Service contract the rest of the codebase uses, so the supervisor
|
||||
// can join on it the same way as other long-running services.
|
||||
func (provider *Provider) Start(ctx context.Context) error {
|
||||
close(provider.healthyC)
|
||||
|
||||
provider.goroutinesWg.Add(1)
|
||||
go func() {
|
||||
defer provider.goroutinesWg.Done()
|
||||
|
||||
provider.runTick(ctx)
|
||||
|
||||
ticker := time.NewTicker(provider.config.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-provider.stopC:
|
||||
return
|
||||
case <-ticker.C:
|
||||
provider.runTick(ctx)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
provider.goroutinesWg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop signals the tick loop and waits for any in-flight tick to finish.
|
||||
// Drain time is bounded by Config.Timeout because every tick runs under that
|
||||
// deadline, so shutdown can't stall on a hung ClickHouse or Zeus call.
|
||||
func (provider *Provider) Stop(_ context.Context) error {
|
||||
<-provider.healthyC
|
||||
select {
|
||||
case <-provider.stopC:
|
||||
// already closed
|
||||
default:
|
||||
close(provider.stopC)
|
||||
}
|
||||
provider.goroutinesWg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) Healthy() <-chan struct{} {
|
||||
return provider.healthyC
|
||||
}
|
||||
|
||||
// runTick executes one collect-and-ship cycle under Config.Timeout. Errors
|
||||
// from tick are logged and counted only — they never propagate, because the
|
||||
// reporter must keep firing on subsequent intervals even if one batch fails.
|
||||
func (provider *Provider) runTick(parentCtx context.Context) {
|
||||
provider.metrics.ticks.Add(parentCtx, 1)
|
||||
|
||||
ctx, cancel := context.WithTimeout(parentCtx, provider.config.Timeout)
|
||||
defer cancel()
|
||||
|
||||
if err := provider.tick(ctx); err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "meter reporter tick failed", errors.Attr(err), slog.Duration("timeout", provider.config.Timeout))
|
||||
}
|
||||
}
|
||||
76
ee/meterreporter/signozmeterreporter/telemetry.go
Normal file
76
ee/meterreporter/signozmeterreporter/telemetry.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package signozmeterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
type reporterMetrics struct {
|
||||
ticks metric.Int64Counter
|
||||
readingsEmitted metric.Int64Counter
|
||||
collectErrors metric.Int64Counter
|
||||
postErrors metric.Int64Counter
|
||||
latestSealedErrors metric.Int64Counter
|
||||
catchupDaysProcessed metric.Int64Counter
|
||||
collectDuration metric.Float64Histogram
|
||||
shipDuration metric.Float64Histogram
|
||||
}
|
||||
|
||||
func newReporterMetrics(meter metric.Meter) (*reporterMetrics, error) {
|
||||
var errs error
|
||||
|
||||
ticks, err := meter.Int64Counter("signoz.meterreporter.ticks", metric.WithDescription("Total number of meter reporter ticks that ran to completion or aborted."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
readingsEmitted, err := meter.Int64Counter("signoz.meterreporter.readings.emitted", metric.WithDescription("Total number of meter readings shipped to Zeus."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
collectErrors, err := meter.Int64Counter("signoz.meterreporter.collect.errors", metric.WithDescription("Total number of collect errors across organizations and meters, tagged with phase={sealed|today}."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
postErrors, err := meter.Int64Counter("signoz.meterreporter.post.errors", metric.WithDescription("Total number of Zeus POST failures, tagged with phase={sealed|today}."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
latestSealedErrors, err := meter.Int64Counter("signoz.meterreporter.latestsealed.errors", metric.WithDescription("Total number of ticks skipped because the Zeus LatestSealed call failed."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
catchupDaysProcessed, err := meter.Int64Counter("signoz.meterreporter.catchup.days_processed", metric.WithDescription("Total number of sealed (is_completed=true) days the catch-up loop attempted to ship, tagged with result={success|failure}."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
collectDuration, err := meter.Float64Histogram("signoz.meterreporter.collect.duration", metric.WithDescription("Time taken to collect readings from all registered meters in a single phase of a tick, tagged with phase={sealed|today}."), metric.WithUnit("s"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
shipDuration, err := meter.Float64Histogram("signoz.meterreporter.ship.duration", metric.WithDescription("Time taken to ship (marshal + POST) collected readings to Zeus in a single phase of a tick, tagged with phase={sealed|today}."), metric.WithUnit("s"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
if errs != nil {
|
||||
return nil, errs
|
||||
}
|
||||
|
||||
return &reporterMetrics{
|
||||
ticks: ticks,
|
||||
readingsEmitted: readingsEmitted,
|
||||
collectErrors: collectErrors,
|
||||
postErrors: postErrors,
|
||||
latestSealedErrors: latestSealedErrors,
|
||||
catchupDaysProcessed: catchupDaysProcessed,
|
||||
collectDuration: collectDuration,
|
||||
shipDuration: shipDuration,
|
||||
}, nil
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
@@ -148,6 +149,52 @@ func (provider *Provider) PutMetersV2(ctx context.Context, key string, data []by
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) PutMeterReadings(ctx context.Context, key string, idempotencyKey string, data []byte) error {
|
||||
headers := http.Header{}
|
||||
if idempotencyKey != "" {
|
||||
headers.Set("X-Idempotency-Key", idempotencyKey)
|
||||
}
|
||||
|
||||
_, err := provider.doWithHeaders(
|
||||
ctx,
|
||||
provider.config.URL.JoinPath("/v2/meters"),
|
||||
http.MethodPost,
|
||||
key,
|
||||
data,
|
||||
headers,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// ! TODO: depends on Zeus shipping GET /v2/meters/latest-sealed. Until it
|
||||
// lands, this call will 404; the caller (meterreporter) treats the error as
|
||||
// "skip concern A this tick" so the today-partial path keeps flowing.
|
||||
func (provider *Provider) LatestSealed(ctx context.Context, key string) (*time.Time, error) {
|
||||
response, err := provider.do(
|
||||
ctx,
|
||||
provider.config.URL.JoinPath("/v2/meters/latest-sealed"),
|
||||
http.MethodGet,
|
||||
key,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dayValue := gjson.GetBytes(response, "data.day")
|
||||
if !dayValue.Exists() || dayValue.Type == gjson.Null {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
day, err := time.Parse("2006-01-02", dayValue.String())
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, zeus.ErrCodeResponseMalformed, "parse latest sealed day %q", dayValue.String())
|
||||
}
|
||||
|
||||
return &day, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) PutProfile(ctx context.Context, key string, profile *zeustypes.PostableProfile) error {
|
||||
body, err := json.Marshal(profile)
|
||||
if err != nil {
|
||||
@@ -183,12 +230,21 @@ func (provider *Provider) PutHost(ctx context.Context, key string, host *zeustyp
|
||||
}
|
||||
|
||||
func (provider *Provider) do(ctx context.Context, url *url.URL, method string, key string, requestBody []byte) ([]byte, error) {
|
||||
return provider.doWithHeaders(ctx, url, method, key, requestBody, nil)
|
||||
}
|
||||
|
||||
func (provider *Provider) doWithHeaders(ctx context.Context, url *url.URL, method string, key string, requestBody []byte, extraHeaders http.Header) ([]byte, error) {
|
||||
request, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewBuffer(requestBody))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
request.Header.Set("X-Signoz-Cloud-Api-Key", key)
|
||||
request.Header.Set("Content-Type", "application/json")
|
||||
for k, vs := range extraHeaders {
|
||||
for _, v := range vs {
|
||||
request.Header.Add(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
response, err := provider.httpClient.Do(request)
|
||||
if err != nil {
|
||||
|
||||
34
pkg/meterreporter/collector.go
Normal file
34
pkg/meterreporter/collector.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Window is the time range a collector produces readings for. StartUnixMilli
|
||||
// and EndUnixMilli define the [start, end) filter on the meter table and are
|
||||
// emitted verbatim on every Reading. IsCompleted is caller-declared: true for
|
||||
// a sealed past day (is_completed=true at Zeus), false for the intra-day open
|
||||
// window that the cron re-emits every tick.
|
||||
type Window struct {
|
||||
StartUnixMilli int64
|
||||
EndUnixMilli int64
|
||||
IsCompleted bool
|
||||
}
|
||||
|
||||
// CollectorDeps is the shared dependency bag handed to every collector. Each
|
||||
// collector reaches for the subset it needs (logs/metrics/traces meters all
|
||||
// read from ClickHouse; retention dimensions come from sqlstore).
|
||||
type CollectorDeps struct {
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
SQLStore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
// CollectorFunc turns one registered Meter into zero or more Readings for the
|
||||
// given org and window. Returning an error signals tick-level failure for this
|
||||
// meter only — the caller keeps iterating the rest.
|
||||
type CollectorFunc func(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error)
|
||||
110
pkg/meterreporter/collector_log.go
Normal file
110
pkg/meterreporter/collector_log.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
// Collectors in this file are intentionally duplicated per meter. Do not fold
|
||||
// them into a shared helper — these are billing-critical paths, and keeping
|
||||
// each query isolated means a bug in one cannot silently corrupt every
|
||||
// customer's bill across every meter. Unit and Aggregation flow in from the
|
||||
// registry; the meter Name is hardcoded in each function so the registry stays
|
||||
// the single place a Name constant is bound to its query.
|
||||
|
||||
// CollectLogCountMeter sums every signoz.meter.log.count sample in the window
|
||||
// and emits one Reading for the org.
|
||||
func CollectLogCountMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterLogCount.String()),
|
||||
sb.GTE("unix_milli", window.StartUnixMilli),
|
||||
sb.LT("unix_milli", window.EndUnixMilli),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterLogCount.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainLogs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterLogCount.String(),
|
||||
Value: value,
|
||||
StartUnixMilli: window.StartUnixMilli,
|
||||
EndUnixMilli: window.EndUnixMilli,
|
||||
IsCompleted: window.IsCompleted,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// CollectLogSizeMeter sums every signoz.meter.log.size sample in the window
|
||||
// and emits one Reading for the org.
|
||||
func CollectLogSizeMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterLogSize.String()),
|
||||
sb.GTE("unix_milli", window.StartUnixMilli),
|
||||
sb.LT("unix_milli", window.EndUnixMilli),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterLogSize.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainLogs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterLogSize.String(),
|
||||
Value: value,
|
||||
StartUnixMilli: window.StartUnixMilli,
|
||||
EndUnixMilli: window.EndUnixMilli,
|
||||
IsCompleted: window.IsCompleted,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
108
pkg/meterreporter/collector_metric.go
Normal file
108
pkg/meterreporter/collector_metric.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
// Collectors in this file are intentionally duplicated per meter. Do not fold
|
||||
// them into a shared helper — these are billing-critical paths, and keeping
|
||||
// each query isolated means a bug in one cannot silently corrupt every
|
||||
// customer's bill across every meter.
|
||||
|
||||
// CollectMetricDatapointCountMeter sums every signoz.meter.metric.datapoint.count
|
||||
// sample in the window and emits one Reading for the org.
|
||||
func CollectMetricDatapointCountMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterMetricDatapointCount.String()),
|
||||
sb.GTE("unix_milli", window.StartUnixMilli),
|
||||
sb.LT("unix_milli", window.EndUnixMilli),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterMetricDatapointCount.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterMetricDatapointCount.String(),
|
||||
Value: value,
|
||||
StartUnixMilli: window.StartUnixMilli,
|
||||
EndUnixMilli: window.EndUnixMilli,
|
||||
IsCompleted: window.IsCompleted,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// CollectMetricDatapointSizeMeter sums every signoz.meter.metric.datapoint.size
|
||||
// sample in the window and emits one Reading for the org.
|
||||
func CollectMetricDatapointSizeMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterMetricDatapointSize.String()),
|
||||
sb.GTE("unix_milli", window.StartUnixMilli),
|
||||
sb.LT("unix_milli", window.EndUnixMilli),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterMetricDatapointSize.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterMetricDatapointSize.String(),
|
||||
Value: value,
|
||||
StartUnixMilli: window.StartUnixMilli,
|
||||
EndUnixMilli: window.EndUnixMilli,
|
||||
IsCompleted: window.IsCompleted,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
108
pkg/meterreporter/collector_trace.go
Normal file
108
pkg/meterreporter/collector_trace.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
// Collectors in this file are intentionally duplicated per meter. Do not fold
|
||||
// them into a shared helper — these are billing-critical paths, and keeping
|
||||
// each query isolated means a bug in one cannot silently corrupt every
|
||||
// customer's bill across every meter.
|
||||
|
||||
// CollectSpanCountMeter sums every signoz.meter.span.count sample in the window
|
||||
// and emits one Reading for the org.
|
||||
func CollectSpanCountMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterSpanCount.String()),
|
||||
sb.GTE("unix_milli", window.StartUnixMilli),
|
||||
sb.LT("unix_milli", window.EndUnixMilli),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterSpanCount.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainTraces)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterSpanCount.String(),
|
||||
Value: value,
|
||||
StartUnixMilli: window.StartUnixMilli,
|
||||
EndUnixMilli: window.EndUnixMilli,
|
||||
IsCompleted: window.IsCompleted,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
// CollectSpanSizeMeter sums every signoz.meter.span.size sample in the window
|
||||
// and emits one Reading for the org.
|
||||
func CollectSpanSizeMeter(ctx context.Context, deps CollectorDeps, meter Meter, orgID valuer.UUID, window Window) ([]meterreportertypes.Reading, error) {
|
||||
if deps.TelemetryStore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, ErrCodeReportFailed, "telemetry store is nil")
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("ifNull(sum(value), 0) AS value")
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", MeterSpanSize.String()),
|
||||
sb.GTE("unix_milli", window.StartUnixMilli),
|
||||
sb.LT("unix_milli", window.EndUnixMilli),
|
||||
)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
var value float64
|
||||
if err := deps.TelemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&value); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "query meter %q", MeterSpanSize.String())
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
DimensionAggregation: meter.Aggregation,
|
||||
DimensionUnit: meter.Unit,
|
||||
DimensionOrganizationID: orgID.StringValue(),
|
||||
}
|
||||
|
||||
retentionDays, ok, err := resolveRetentionDays(ctx, deps.SQLStore, orgID, RetentionDomainTraces)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
dimensions[DimensionRetentionDays] = retentionDays
|
||||
}
|
||||
|
||||
return []meterreportertypes.Reading{{
|
||||
MeterName: MeterSpanSize.String(),
|
||||
Value: value,
|
||||
StartUnixMilli: window.StartUnixMilli,
|
||||
EndUnixMilli: window.EndUnixMilli,
|
||||
IsCompleted: window.IsCompleted,
|
||||
Dimensions: dimensions,
|
||||
}}, nil
|
||||
}
|
||||
90
pkg/meterreporter/config.go
Normal file
90
pkg/meterreporter/config.go
Normal file
@@ -0,0 +1,90 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
var _ factory.Config = (*Config)(nil)
|
||||
|
||||
// HistoricalBackfillDays is the static floor used on first deploy (or for a
|
||||
// license with no sealed rows yet at Zeus): the orchestrator begins catch-up
|
||||
// from today - HistoricalBackfillDays. It mirrors the ClickHouse meter-table
|
||||
// TTL of 12 months — anything older has no backing data anyway, so it is not
|
||||
// exposed as a config field.
|
||||
const HistoricalBackfillDays = 365
|
||||
|
||||
type Config struct {
|
||||
// Provider picks the reporter implementation. "noop" is the default and is
|
||||
// what community builds ship; "signoz" is the enterprise cron-based reporter.
|
||||
Provider string `mapstructure:"provider"`
|
||||
|
||||
// Interval is how often the reporter ticks (collect + ship). The validator
|
||||
// enforces a 5m floor — any sooner and we'd hammer ClickHouse for nothing,
|
||||
// since Zeus UPSERTs inside a UTC day anyway.
|
||||
Interval time.Duration `mapstructure:"interval"`
|
||||
|
||||
// Timeout bounds a single tick (collect + marshal + POST). Must be strictly
|
||||
// less than Interval so a slow tick can't overlap the next one. Catch-up
|
||||
// ticks can issue up to CatchupMaxDaysPerTick day-scoped POSTs back-to-back,
|
||||
// so the default is sized to cover that.
|
||||
Timeout time.Duration `mapstructure:"timeout"`
|
||||
|
||||
// CatchupMaxDaysPerTick caps how many sealed (is_completed=true) days the
|
||||
// orchestrator processes per tick, bounding Zeus POST blast radius. At the
|
||||
// default 30/tick and a 6h Interval, a full 12-month bootstrap catch-up
|
||||
// converges in roughly 3 days.
|
||||
CatchupMaxDaysPerTick int `mapstructure:"catchup_max_days_per_tick"`
|
||||
|
||||
// Retry configures exponential backoff around the Zeus POST. Tick-level
|
||||
// failures don't propagate — see runTick in the enterprise provider.
|
||||
Retry RetryConfig `mapstructure:"retry"`
|
||||
}
|
||||
|
||||
type RetryConfig struct {
|
||||
Enabled bool `mapstructure:"enabled"`
|
||||
InitialInterval time.Duration `mapstructure:"initial_interval"`
|
||||
MaxInterval time.Duration `mapstructure:"max_interval"`
|
||||
MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
|
||||
}
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Provider: "noop",
|
||||
Interval: 6 * time.Hour,
|
||||
Timeout: 5 * time.Minute,
|
||||
CatchupMaxDaysPerTick: 30,
|
||||
Retry: RetryConfig{
|
||||
Enabled: true,
|
||||
InitialInterval: 5 * time.Second,
|
||||
MaxInterval: 30 * time.Second,
|
||||
MaxElapsedTime: time.Minute,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("meterreporter"), newConfig)
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
if c.Interval < 5*time.Minute {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::interval must be at least 5m")
|
||||
}
|
||||
|
||||
if c.Timeout < 3*time.Minute {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be at least 3m")
|
||||
}
|
||||
|
||||
if c.Timeout >= c.Interval {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be less than meterreporter::interval")
|
||||
}
|
||||
|
||||
if c.CatchupMaxDaysPerTick > 60 {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::catchup_max_days_per_tick must be at most 60")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
26
pkg/meterreporter/meter.go
Normal file
26
pkg/meterreporter/meter.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
)
|
||||
|
||||
// Meter is a single registered billing meter — a name, its billing metadata,
|
||||
// and the function that produces Readings for it.
|
||||
//
|
||||
// The same Name may appear multiple times in the registry provided each entry
|
||||
// uses a different Aggregation (e.g. a sum and a p99 of the same source meter).
|
||||
// The (Name, Aggregation) pair is what Zeus keys on.
|
||||
type Meter struct {
|
||||
// Name is the billing identifier emitted on every Reading.
|
||||
Name meterreportertypes.Name
|
||||
|
||||
// Unit is copied onto DimensionUnit by the collector.
|
||||
Unit string
|
||||
|
||||
// Aggregation is copied onto DimensionAggregation and participates in the
|
||||
// uniqueness check in validateMeters.
|
||||
Aggregation string
|
||||
|
||||
// Collect turns this Meter into zero or more Readings per tick.
|
||||
Collect CollectorFunc
|
||||
}
|
||||
26
pkg/meterreporter/meterreporter.go
Normal file
26
pkg/meterreporter/meterreporter.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeInvalidInput = errors.MustNewCode("meterreporter_invalid_input")
|
||||
ErrCodeReportFailed = errors.MustNewCode("meterreporter_report_failed")
|
||||
)
|
||||
|
||||
// Dimension keys automatically attached to every Reading.
|
||||
const (
|
||||
DimensionAggregation = "signoz.billing.aggregation"
|
||||
DimensionUnit = "signoz.billing.unit"
|
||||
DimensionOrganizationID = "signoz.billing.organization.id"
|
||||
DimensionRetentionDays = "signoz.billing.retention.days"
|
||||
)
|
||||
|
||||
// Reporter periodically collects meter values via the query service and ships
|
||||
// them to Zeus. Implementations must satisfy factory.ServiceWithHealthy so the
|
||||
// signoz registry can wait on startup and request graceful shutdown.
|
||||
type Reporter interface {
|
||||
factory.ServiceWithHealthy
|
||||
}
|
||||
39
pkg/meterreporter/noopmeterreporter/provider.go
Normal file
39
pkg/meterreporter/noopmeterreporter/provider.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package noopmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
healthyC chan struct{}
|
||||
stopC chan struct{}
|
||||
}
|
||||
|
||||
func NewFactory() factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("noop"), New)
|
||||
}
|
||||
|
||||
func New(_ context.Context, _ factory.ProviderSettings, _ meterreporter.Config) (meterreporter.Reporter, error) {
|
||||
return &provider{
|
||||
healthyC: make(chan struct{}),
|
||||
stopC: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *provider) Start(_ context.Context) error {
|
||||
close(p.healthyC)
|
||||
<-p.stopC
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *provider) Stop(_ context.Context) error {
|
||||
close(p.stopC)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *provider) Healthy() <-chan struct{} {
|
||||
return p.healthyC
|
||||
}
|
||||
122
pkg/meterreporter/registry.go
Normal file
122
pkg/meterreporter/registry.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
)
|
||||
|
||||
// Exported names for every meter the reporter knows about. Refer to these
|
||||
// symbols — not string literals — everywhere so a typo becomes a compile error
|
||||
// instead of silently spawning a new (and unbilled) meter row at Zeus.
|
||||
var (
|
||||
MeterLogCount = meterreportertypes.MustNewName("signoz.meter.log.count")
|
||||
MeterLogSize = meterreportertypes.MustNewName("signoz.meter.log.size")
|
||||
MeterMetricDatapointCount = meterreportertypes.MustNewName("signoz.meter.metric.datapoint.count")
|
||||
MeterMetricDatapointSize = meterreportertypes.MustNewName("signoz.meter.metric.datapoint.size")
|
||||
MeterSpanCount = meterreportertypes.MustNewName("signoz.meter.span.count")
|
||||
MeterSpanSize = meterreportertypes.MustNewName("signoz.meter.span.size")
|
||||
)
|
||||
|
||||
const AggregationSum = "sum"
|
||||
|
||||
func baseMeters() []*Meter {
|
||||
meters := []*Meter{
|
||||
{
|
||||
Name: MeterLogCount,
|
||||
Unit: "count",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectLogCountMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterLogSize,
|
||||
Unit: "bytes",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectLogSizeMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterMetricDatapointCount,
|
||||
Unit: "count",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectMetricDatapointCountMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterMetricDatapointSize,
|
||||
Unit: "bytes",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectMetricDatapointSizeMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterSpanCount,
|
||||
Unit: "count",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectSpanCountMeter,
|
||||
},
|
||||
{
|
||||
Name: MeterSpanSize,
|
||||
Unit: "bytes",
|
||||
Aggregation: AggregationSum,
|
||||
Collect: CollectSpanSizeMeter,
|
||||
},
|
||||
}
|
||||
|
||||
mustValidateMeters(meters...)
|
||||
return meters
|
||||
}
|
||||
|
||||
// DefaultMeters is the hardcoded meter set shipped with the reporter. The
|
||||
// enterprise provider wires this into its collector loop at construction time;
|
||||
// the noop provider ignores it.
|
||||
func DefaultMeters() ([]Meter, error) {
|
||||
meters := baseMeters()
|
||||
if err := validateMeters(meters...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resolved := make([]Meter, 0, len(meters))
|
||||
for _, meter := range meters {
|
||||
resolved = append(resolved, *meter)
|
||||
}
|
||||
|
||||
return resolved, nil
|
||||
}
|
||||
|
||||
// validateMeters guards the registry: every meter must have all four fields
|
||||
// populated, and the (Name, Aggregation) pair must be unique — that pair is
|
||||
// the billing key on Zeus side, and a duplicate silently double-counts usage.
|
||||
func validateMeters(meters ...*Meter) error {
|
||||
seen := make(map[string]struct{}, len(meters))
|
||||
|
||||
for _, meter := range meters {
|
||||
if meter == nil {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "nil meter in registry")
|
||||
}
|
||||
if meter.Name.IsZero() {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter with empty name in registry")
|
||||
}
|
||||
if meter.Unit == "" {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no unit", meter.Name.String())
|
||||
}
|
||||
if meter.Aggregation == "" {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no aggregation", meter.Name.String())
|
||||
}
|
||||
if meter.Collect == nil {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "meter %q has no collector function", meter.Name.String())
|
||||
}
|
||||
|
||||
key := meter.Name.String() + "|" + meter.Aggregation
|
||||
if _, ok := seen[key]; ok {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidInput, "duplicate meter %q with aggregation %q", meter.Name.String(), meter.Aggregation)
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// mustValidateMeters is the boot-time variant used for hardcoded registrations.
|
||||
// A panic here is a programmer error — the meter list ships with the binary.
|
||||
func mustValidateMeters(meters ...*Meter) {
|
||||
if err := validateMeters(meters...); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
104
pkg/meterreporter/retention.go
Normal file
104
pkg/meterreporter/retention.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strconv"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type RetentionDomain string
|
||||
|
||||
const (
|
||||
RetentionDomainLogs RetentionDomain = "logs"
|
||||
RetentionDomainMetrics RetentionDomain = "metrics"
|
||||
RetentionDomainTraces RetentionDomain = "traces"
|
||||
)
|
||||
|
||||
// Fallback retention (in days) used when an org has no ttl_setting row. These
|
||||
// must mirror the DDL TTL on each domain's main ClickHouse table — any mismatch
|
||||
// means we'll bill customers for a retention they aren't actually getting.
|
||||
//
|
||||
// logs → signoz_logs.logs_v2 15d
|
||||
// metrics → signoz_metrics.samples_v4 30d (2_592_000s in the DDL)
|
||||
// traces → signoz_traces.signoz_index_v3 15d (1_296_000s in the DDL)
|
||||
var defaultRetentionDaysByDomain = map[RetentionDomain]int{
|
||||
RetentionDomainLogs: types.DefaultRetentionDays,
|
||||
RetentionDomainMetrics: 30,
|
||||
RetentionDomainTraces: 15,
|
||||
}
|
||||
|
||||
// resolveRetentionDays returns the per-org retention for a domain, ready to
|
||||
// stamp on a Reading as DimensionRetentionDays.
|
||||
//
|
||||
// Source of truth is the ttl_setting row written by the V2 retention path,
|
||||
// keyed on (org_id, local_table_name) and storing the value in days. When no
|
||||
// row exists — or the stored value is non-positive — we fall back to the DDL
|
||||
// default so the reading always ships with an accurate dimension.
|
||||
func resolveRetentionDays(ctx context.Context, sqlstore sqlstore.SQLStore, orgID valuer.UUID, domain RetentionDomain) (string, bool, error) {
|
||||
if sqlstore == nil {
|
||||
return "", false, nil
|
||||
}
|
||||
tableName, ok := retentionTableName(domain)
|
||||
if !ok {
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
ttl := new(types.TTLSetting)
|
||||
err := sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(ttl).
|
||||
Where("table_name = ?", tableName).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
OrderExpr("created_at DESC").
|
||||
Limit(1).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return domainFallbackRetention(domain)
|
||||
}
|
||||
return "", false, errors.Wrapf(err, errors.TypeInternal, ErrCodeReportFailed, "load retention for domain %q", domain)
|
||||
}
|
||||
|
||||
if ttl.TTL <= 0 {
|
||||
return domainFallbackRetention(domain)
|
||||
}
|
||||
|
||||
return strconv.Itoa(ttl.TTL), true, nil
|
||||
}
|
||||
|
||||
// domainFallbackRetention is the fallback branch for resolveRetentionDays.
|
||||
// An unknown domain is a programming error — callers only ever pass the
|
||||
// exported RetentionDomain* constants.
|
||||
func domainFallbackRetention(domain RetentionDomain) (string, bool, error) {
|
||||
days, ok := defaultRetentionDaysByDomain[domain]
|
||||
if !ok {
|
||||
return "", false, errors.Newf(errors.TypeInternal, ErrCodeReportFailed, "no default retention defined for domain %q", domain)
|
||||
}
|
||||
return strconv.Itoa(days), true, nil
|
||||
}
|
||||
|
||||
// retentionTableName returns the local (non-distributed) ClickHouse table name
|
||||
// used as the ttl_setting key for each domain. This must match exactly what
|
||||
// SetCustomRetentionV2 writes — if the V2 writer ever changes the key, update
|
||||
// this switch in the same change.
|
||||
func retentionTableName(domain RetentionDomain) (string, bool) {
|
||||
switch domain {
|
||||
case RetentionDomainLogs:
|
||||
return telemetrylogs.DBName + "." + telemetrylogs.LogsV2LocalTableName, true
|
||||
case RetentionDomainMetrics:
|
||||
return telemetrymetrics.DBName + "." + telemetrymetrics.SamplesV4LocalTableName, true
|
||||
case RetentionDomainTraces:
|
||||
return telemetrytraces.DBName + "." + telemetrytraces.SpanIndexV3LocalTableName, true
|
||||
default:
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
@@ -2008,7 +2008,7 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
// No V2 configuration found, return defaults
|
||||
response.DefaultTTLDays = 15
|
||||
response.DefaultTTLDays = types.DefaultRetentionDays
|
||||
response.TTLConditions = []model.CustomRetentionRule{}
|
||||
response.Status = constants.StatusSuccess
|
||||
response.ColdStorageTTLDays = -1
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/identn"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
|
||||
@@ -129,6 +130,9 @@ type Config struct {
|
||||
// Auditor config
|
||||
Auditor auditor.Config `mapstructure:"auditor"`
|
||||
|
||||
// MeterReporter config
|
||||
MeterReporter meterreporter.Config `mapstructure:"meterreporter"`
|
||||
|
||||
// CloudIntegration config
|
||||
CloudIntegration cloudintegration.Config `mapstructure:"cloudintegration"`
|
||||
}
|
||||
@@ -162,6 +166,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
|
||||
identn.NewConfigFactory(),
|
||||
serviceaccount.NewConfigFactory(),
|
||||
auditor.NewConfigFactory(),
|
||||
meterreporter.NewConfigFactory(),
|
||||
cloudintegration.NewConfigFactory(),
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/identn/apikeyidentn"
|
||||
"github.com/SigNoz/signoz/pkg/identn/impersonationidentn"
|
||||
"github.com/SigNoz/signoz/pkg/identn/tokenizeridentn"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter/noopmeterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
@@ -315,6 +317,12 @@ func NewAuditorProviderFactories() factory.NamedMap[factory.ProviderFactory[audi
|
||||
)
|
||||
}
|
||||
|
||||
func NewMeterReporterProviderFactories() factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
noopmeterreporter.NewFactory(),
|
||||
)
|
||||
}
|
||||
|
||||
func NewFlaggerProviderFactories(registry featuretypes.Registry) factory.NamedMap[factory.ProviderFactory[flagger.FlaggerProvider, flagger.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
configflagger.NewFactory(registry),
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/identn"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
@@ -84,6 +85,7 @@ type SigNoz struct {
|
||||
Flagger flagger.Flagger
|
||||
Gateway gateway.Gateway
|
||||
Auditor auditor.Auditor
|
||||
MeterReporter meterreporter.Reporter
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -104,6 +106,7 @@ func New(
|
||||
dashboardModuleCallback func(sqlstore.SQLStore, factory.ProviderSettings, analytics.Analytics, organization.Getter, queryparser.QueryParser, querier.Querier, licensing.Licensing) dashboard.Module,
|
||||
gatewayProviderFactory func(licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config],
|
||||
auditorProviderFactories func(licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]],
|
||||
meterReporterProviderFactories func(licensing.Licensing, telemetrystore.TelemetryStore, sqlstore.SQLStore, organization.Getter, zeus.Zeus) factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]],
|
||||
querierHandlerCallback func(factory.ProviderSettings, querier.Querier, analytics.Analytics) querier.Handler,
|
||||
cloudIntegrationCallback func(sqlstore.SQLStore, global.Global, zeus.Zeus, gateway.Gateway, licensing.Licensing, serviceaccount.Module, cloudintegration.Config) (cloudintegration.Module, error),
|
||||
rulerProviderFactories func(cache.Cache, alertmanager.Alertmanager, sqlstore.SQLStore, telemetrystore.TelemetryStore, telemetrytypes.MetadataStore, prometheus.Prometheus, organization.Getter, rulestatehistory.Module, querier.Querier, queryparser.QueryParser) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]],
|
||||
@@ -377,6 +380,12 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize meter reporter from the variant-specific provider factories
|
||||
meterReporter, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.MeterReporter, meterReporterProviderFactories(licensing, telemetrystore, sqlstore, orgGetter, zeus), config.MeterReporter.Provider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize authns
|
||||
store := sqlauthnstore.NewStore(sqlstore)
|
||||
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)
|
||||
@@ -491,6 +500,7 @@ func New(
|
||||
factory.NewNamedService(factory.MustNewName("authz"), authz),
|
||||
factory.NewNamedService(factory.MustNewName("user"), userService, factory.MustNewName("authz")),
|
||||
factory.NewNamedService(factory.MustNewName("auditor"), auditor),
|
||||
factory.NewNamedService(factory.MustNewName("meterreporter"), meterReporter, factory.MustNewName("licensing")),
|
||||
factory.NewNamedService(factory.MustNewName("ruler"), rulerInstance),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -540,5 +550,6 @@ func New(
|
||||
Flagger: flagger,
|
||||
Gateway: gateway,
|
||||
Auditor: auditor,
|
||||
MeterReporter: meterReporter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
41
pkg/types/meterreportertypes/name.go
Normal file
41
pkg/types/meterreportertypes/name.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package meterreportertypes
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
var nameRegex = regexp.MustCompile(`^[a-z][a-z0-9_.]+$`)
|
||||
|
||||
// Name is a concrete type for a meter name. Dotted namespace identifiers like
|
||||
// "signoz.meter.log.count" are permitted; arbitrary strings are not, to avoid
|
||||
// typos silently producing distinct meter rows at Zeus.
|
||||
type Name struct {
|
||||
s string
|
||||
}
|
||||
|
||||
func NewName(s string) (Name, error) {
|
||||
if !nameRegex.MatchString(s) {
|
||||
return Name{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid meter name: %s", s)
|
||||
}
|
||||
|
||||
return Name{s: s}, nil
|
||||
}
|
||||
|
||||
func MustNewName(s string) Name {
|
||||
name, err := NewName(s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return name
|
||||
}
|
||||
|
||||
func (n Name) String() string {
|
||||
return n.s
|
||||
}
|
||||
|
||||
func (n Name) IsZero() bool {
|
||||
return n.s == ""
|
||||
}
|
||||
39
pkg/types/meterreportertypes/types.go
Normal file
39
pkg/types/meterreportertypes/types.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package meterreportertypes
|
||||
|
||||
// Reading is a single meter value sent to Zeus. Zeus UPSERTs on
|
||||
// (license_key, dimension_hash, startUnixMilli), so repeated readings within
|
||||
// the same tick window safely overwrite prior values.
|
||||
type Reading struct {
|
||||
// MeterName is the fully-qualified meter identifier.
|
||||
MeterName string `json:"meterName"`
|
||||
|
||||
// Value is the aggregated scalar for this (meter, aggregation) pair over the reporting window.
|
||||
Value float64 `json:"value"`
|
||||
|
||||
// StartUnixMilli is the inclusive lower bound of the reporting window in
|
||||
// epoch milliseconds (UTC day start for both sealed and partial readings).
|
||||
StartUnixMilli int64 `json:"startUnixMilli"`
|
||||
|
||||
// EndUnixMilli is the exclusive upper bound of the reporting window in
|
||||
// epoch milliseconds. For a sealed day it is the next day's 00:00 UTC; for
|
||||
// the intra-day partial it is the tick's now() — hence each tick's partial
|
||||
// carries a fresh EndUnixMilli while the idempotency key keeps it upserted.
|
||||
EndUnixMilli int64 `json:"endUnixMilli"`
|
||||
|
||||
// IsCompleted is true only for sealed past buckets. In-progress buckets
|
||||
// (e.g. the current UTC day) report IsCompleted=false so Zeus knows the value may still change.
|
||||
IsCompleted bool `json:"isCompleted"`
|
||||
|
||||
// Dimensions is the per-reading label set.
|
||||
Dimensions map[string]string `json:"dimensions"`
|
||||
}
|
||||
|
||||
// PostableMeterReadings is the request body for Zeus.PutMeterReadings.
|
||||
type PostableMeterReadings struct { // ! Needs fix once zeus contract is setup
|
||||
// IdempotencyKey is echoed as the X-Idempotency-Key header and stored by
|
||||
// Zeus so retries within the same tick window overwrite rather than duplicate.
|
||||
IdempotencyKey string `json:"idempotencyKey"`
|
||||
|
||||
// Readings is the batch of meter values being shipped.
|
||||
Readings []Reading `json:"readings"`
|
||||
}
|
||||
@@ -73,6 +73,11 @@ func NewTraitsFromOrganization(org *Organization) map[string]any {
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultRetentionDays is the retention shown in the UI when no explicit
|
||||
// policy has been configured for an org. Both the API reader and the meter
|
||||
// reporter use this value so a change here propagates everywhere.
|
||||
const DefaultRetentionDays = 15
|
||||
|
||||
type TTLSetting struct {
|
||||
bun.BaseModel `bun:"table:ttl_setting"`
|
||||
Identifiable
|
||||
|
||||
@@ -2,6 +2,7 @@ package noopzeus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
@@ -49,6 +50,14 @@ func (provider *provider) PutMetersV2(_ context.Context, _ string, _ []byte) err
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v2 is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) PutMeterReadings(_ context.Context, _ string, _ string, _ []byte) error {
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meter readings is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) LatestSealed(_ context.Context, _ string) (*time.Time, error) {
|
||||
return nil, errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "fetching latest sealed day is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) PutProfile(_ context.Context, _ string, _ *zeustypes.PostableProfile) error {
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting profile is not supported")
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package zeus
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/zeustypes"
|
||||
@@ -35,6 +36,16 @@ type Zeus interface {
|
||||
// Puts the meters for the given license key using Zeus.
|
||||
PutMetersV2(context.Context, string, []byte) error
|
||||
|
||||
// PutMeterReadings ships TDD-shape meter readings to the v2/meters
|
||||
// endpoint. idempotencyKey is propagated as X-Idempotency-Key so Zeus can UPSERT on retries.
|
||||
PutMeterReadings(ctx context.Context, licenseKey string, idempotencyKey string, body []byte) error
|
||||
|
||||
// LatestSealed returns the latest UTC day for which any billing meter has
|
||||
// a sealed (is_completed=true) reading for the license. A nil return means
|
||||
// no sealed rows exist yet (bootstrap case). The cron uses this as a
|
||||
// checkpoint to forward-fill sealed windows without tracking local state.
|
||||
LatestSealed(ctx context.Context, licenseKey string) (*time.Time, error)
|
||||
|
||||
// Put profile for the given license key.
|
||||
PutProfile(context.Context, string, *zeustypes.PostableProfile) error
|
||||
|
||||
|
||||
Reference in New Issue
Block a user