Compare commits

...

8 Commits

2 changed files with 75 additions and 6 deletions

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"math/rand/v2"
"time"
"github.com/SigNoz/signoz/pkg/errors"
@@ -94,21 +95,33 @@ func newProvider(
func (provider *Provider) Start(ctx context.Context) error {
close(provider.healthyC)
provider.collect(ctx)
startDelay := jitter(provider.config.ResolvedMaxStartJitter())
provider.settings.Logger().InfoContext(ctx, "scheduling first meter collect", slog.String("delay", startDelay.String()), slog.Int64("delay_ns", startDelay.Nanoseconds()))
ticker := time.NewTicker(provider.config.Interval)
defer ticker.Stop()
timer := time.NewTimer(startDelay)
defer timer.Stop()
for {
select {
case <-provider.stopC:
return nil
case <-ticker.C:
case <-timer.C:
provider.collect(ctx)
next := provider.config.Interval - jitter(provider.config.ResolvedMaxTickJitter())
timer.Reset(next)
provider.settings.Logger().InfoContext(ctx, "scheduled next meter collect", slog.String("delay", next.String()), slog.Int64("delay_ns", next.Nanoseconds()))
}
}
}
// jitter returns a uniform random duration in [0, max). Returns 0 if max <= 0.
func jitter(max time.Duration) time.Duration {
if max <= 0 {
return 0
}
return time.Duration(rand.Int64N(int64(max)))
}
func (provider *Provider) collect(ctx context.Context) {
ctx, span := provider.settings.Tracer().Start(ctx, "meterreporter.Collect", trace.WithAttributes(attribute.String("meterreporter.provider", "http")))
defer span.End()
@@ -147,6 +160,7 @@ func (provider *Provider) collect(ctx context.Context) {
}
func (provider *Provider) Stop(ctx context.Context) error {
provider.settings.Logger().InfoContext(ctx, "stopping meter reporter")
close(provider.stopC)
return nil
}
@@ -170,6 +184,11 @@ func (provider *Provider) collectOrg(ctx context.Context, org *types.Organizatio
start, end, ok := backfillRange(nextByCollector, todayStart)
if ok {
provider.settings.Logger().InfoContext(ctx, "backfilling sealed days",
slog.String("org_id", org.ID.StringValue()),
slog.String("start", start.Format("2006-01-02")),
slog.String("end", end.Format("2006-01-02")))
for day := start; !day.After(end); day = day.AddDate(0, 0, 1) {
eligible := eligibleCollectors(provider.collectorsByName, nextByCollector, day)
if len(eligible) == 0 {
@@ -257,6 +276,10 @@ func (provider *Provider) report(ctx context.Context, orgID valuer.UUID, license
collectedReadings, err := collector.Collect(ctx, orgID, license, window)
if err != nil {
provider.metrics.collections.Add(ctx, 1, metric.WithAttributes(meterAttr, errors.TypeAttr(err)))
provider.settings.Logger().ErrorContext(ctx, "meter collector failed",
errors.Attr(err),
slog.String("org_id", orgID.StringValue()),
slog.String("meter", collector.Name().String()))
continue
}
@@ -283,6 +306,10 @@ func (provider *Provider) report(ctx context.Context, orgID valuer.UUID, license
provider.metrics.reports.Add(ctx, 1)
provider.metrics.meters.Add(ctx, int64(len(meters)))
provider.settings.Logger().InfoContext(ctx, "reported meters to zeus",
slog.String("org_id", orgID.StringValue()),
slog.String("date", date),
slog.Int("meters", len(meters)))
return nil
}

View File

@@ -15,12 +15,27 @@ type Config struct {
// Backfill enables sealed-day catch-up from the license creation day.
Backfill bool `mapstructure:"backfill"`
// MaxStartJitter caps the random delay before the first collect after
// Start(). Actual delay is uniform in [0, MaxStartJitter). Negative
// (the default) means "derive from Interval" — see ResolvedMaxStartJitter.
// Set to 0 explicitly to collect immediately on startup (dev / testing).
MaxStartJitter time.Duration `mapstructure:"max_start_jitter"`
// MaxTickJitter is the maximum amount shaved off Interval for each
// subsequent fire. Each cycle = uniform(Interval - MaxTickJitter, Interval].
// Negative (the default) means "derive from Interval" — see
// ResolvedMaxTickJitter. Must be < Interval so two consecutive fires
// cannot collapse to the same instant.
MaxTickJitter time.Duration `mapstructure:"max_tick_jitter"`
}
func newConfig() factory.Config {
return Config{
Interval: 6 * time.Hour,
Backfill: true,
Interval: 6 * time.Hour,
Backfill: true,
MaxStartJitter: -1, // Negative sentinels. Resolved at use time unless explicitly set.
MaxTickJitter: -1, // Negative sentinels. Resolved at use time unless explicitly set.
}
}
@@ -33,5 +48,32 @@ func (c Config) Validate() error {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::interval must be between 5m and 24h")
}
if c.MaxStartJitter > c.Interval {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::max_start_jitter must be between 0 and interval")
}
if c.MaxTickJitter >= c.Interval {
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::max_tick_jitter must be in [0, interval)")
}
return nil
}
// ResolvedMaxStartJitter returns the configured MaxStartJitter or, if the
// sentinel default is still in place, falls back to Interval so that the
// jitter scales with whatever Interval the user picks.
func (c Config) ResolvedMaxStartJitter() time.Duration {
if c.MaxStartJitter < 0 {
return c.Interval
}
return c.MaxStartJitter
}
// ResolvedMaxTickJitter returns the configured MaxTickJitter or, if the
// sentinel default is still in place, falls back to Interval / 10.
func (c Config) ResolvedMaxTickJitter() time.Duration {
if c.MaxTickJitter < 0 {
return c.Interval / 10
}
return c.MaxTickJitter
}