mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-05 09:50:31 +01:00
Compare commits
42 Commits
fix/cursor
...
feat/billi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3616022049 | ||
|
|
5d270b716b | ||
|
|
19d9d26051 | ||
|
|
522148362b | ||
|
|
781158ecab | ||
|
|
0603bd6b27 | ||
|
|
37c57e6c05 | ||
|
|
12a2e63a31 | ||
|
|
453bcc06c4 | ||
|
|
fac5fe6b9e | ||
|
|
0ad412b844 | ||
|
|
d1957b5eac | ||
|
|
dba9cfd455 | ||
|
|
ed2011a7bb | ||
|
|
68385478c7 | ||
|
|
eb661b7ac7 | ||
|
|
afd6868423 | ||
|
|
8ddf0a13c1 | ||
|
|
16f0d2aa38 | ||
|
|
3af912c586 | ||
|
|
ad7715802b | ||
|
|
b579bdbd7b | ||
|
|
aa64cf7bbf | ||
|
|
2d33b1a743 | ||
|
|
4fbf7de8e1 | ||
|
|
7528b19fd4 | ||
|
|
42e4196aad | ||
|
|
22cdb03702 | ||
|
|
6eca3dc06e | ||
|
|
0631189417 | ||
|
|
ec552b94cc | ||
|
|
ee8d99f1d0 | ||
|
|
bf77e26a86 | ||
|
|
9cd3cf23d7 | ||
|
|
4a44802ebc | ||
|
|
f2aed0d834 | ||
|
|
527d8c0459 | ||
|
|
8fdc91260e | ||
|
|
218c4524b1 | ||
|
|
02dec846eb | ||
|
|
99dadb7247 | ||
|
|
44b41c40de |
@@ -18,11 +18,13 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/gateway"
|
||||
"github.com/SigNoz/signoz/pkg/gateway/noopgateway"
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
@@ -109,6 +111,9 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
func(_ licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]] {
|
||||
return signoz.NewAuditorProviderFactories()
|
||||
},
|
||||
func(_ context.Context, _ flagger.Flagger, _ licensing.Licensing, _ telemetrystore.TelemetryStore, _ sqlstore.SQLStore, _ organization.Getter, _ zeus.Zeus) (factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]], string) {
|
||||
return signoz.NewMeterReporterProviderFactories(), "noop"
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
return querier.NewHandler(ps, q, a)
|
||||
},
|
||||
|
||||
@@ -17,6 +17,14 @@ import (
|
||||
"github.com/SigNoz/signoz/ee/gateway/httpgateway"
|
||||
enterpriselicensing "github.com/SigNoz/signoz/ee/licensing"
|
||||
"github.com/SigNoz/signoz/ee/licensing/httplicensing"
|
||||
"github.com/SigNoz/signoz/ee/metercollector/baseplatformfeemetercollector"
|
||||
"github.com/SigNoz/signoz/ee/metercollector/datapointcountmetercollector"
|
||||
"github.com/SigNoz/signoz/ee/metercollector/datapointsizemetercollector"
|
||||
"github.com/SigNoz/signoz/ee/metercollector/logcountmetercollector"
|
||||
"github.com/SigNoz/signoz/ee/metercollector/logsizemetercollector"
|
||||
"github.com/SigNoz/signoz/ee/metercollector/spancountmetercollector"
|
||||
"github.com/SigNoz/signoz/ee/metercollector/spansizemetercollector"
|
||||
"github.com/SigNoz/signoz/ee/meterreporter/httpmeterreporter"
|
||||
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/ee/modules/cloudintegration/implcloudintegration/implcloudprovider"
|
||||
"github.com/SigNoz/signoz/ee/modules/dashboard/impldashboard"
|
||||
@@ -35,9 +43,12 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
pkgflagger "github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/gateway"
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
pkgcloudintegration "github.com/SigNoz/signoz/pkg/modules/cloudintegration/implcloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
@@ -57,7 +68,10 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/cloudintegrationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/featuretypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/SigNoz/signoz/pkg/version"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
)
|
||||
@@ -157,6 +171,19 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
}
|
||||
return factories
|
||||
},
|
||||
func(ctx context.Context, flagger pkgflagger.Flagger, licensing licensing.Licensing, telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore, orgGetter organization.Getter, zeus zeus.Zeus) (factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]], string) {
|
||||
factories := signoz.NewMeterReporterProviderFactories()
|
||||
if err := factories.Add(httpmeterreporter.NewFactory(newMeterCollectors(licensing, telemetryStore, sqlStore), licensing, telemetryStore, orgGetter, zeus)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
evalCtx := featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})
|
||||
if flagger.BooleanOrEmpty(ctx, pkgflagger.FeatureUseMeterReporter, evalCtx) {
|
||||
return factories, "http"
|
||||
}
|
||||
|
||||
return factories, "noop"
|
||||
},
|
||||
func(ps factory.ProviderSettings, q querier.Querier, a analytics.Analytics) querier.Handler {
|
||||
communityHandler := querier.NewHandler(ps, q, a)
|
||||
return eequerier.NewHandler(ps, q, communityHandler)
|
||||
@@ -216,3 +243,15 @@ func runServer(ctx context.Context, config signoz.Config, logger *slog.Logger) e
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newMeterCollectors(licensing licensing.Licensing, telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) map[metercollectortypes.Name]metercollector.MeterCollector {
|
||||
return map[metercollectortypes.Name]metercollector.MeterCollector{
|
||||
baseplatformfeemetercollector.MeterName: baseplatformfeemetercollector.New(licensing),
|
||||
logcountmetercollector.MeterName: logcountmetercollector.New(telemetryStore, sqlStore),
|
||||
logsizemetercollector.MeterName: logsizemetercollector.New(telemetryStore, sqlStore),
|
||||
datapointcountmetercollector.MeterName: datapointcountmetercollector.New(telemetryStore, sqlStore),
|
||||
datapointsizemetercollector.MeterName: datapointsizemetercollector.New(telemetryStore, sqlStore),
|
||||
spancountmetercollector.MeterName: spancountmetercollector.New(telemetryStore, sqlStore),
|
||||
spansizemetercollector.MeterName: spansizemetercollector.New(telemetryStore, sqlStore),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -429,3 +429,10 @@ authz:
|
||||
openfga:
|
||||
# maximum tuples allowed per openfga write operation.
|
||||
max_tuples_per_write: 100
|
||||
|
||||
##################### Meter Reporter #####################
|
||||
meterreporter:
|
||||
# The interval between collection ticks. Minimum 5m.
|
||||
interval: 6h
|
||||
# The per-tick timeout that bounds collect-and-ship work. Minimum 3m and must be less than interval.
|
||||
timeout: 5m
|
||||
|
||||
58
ee/metercollector/baseplatformfeemetercollector/provider.go
Normal file
58
ee/metercollector/baseplatformfeemetercollector/provider.go
Normal file
@@ -0,0 +1,58 @@
|
||||
// Package baseplatformfeemetercollector collects the license-derived base platform fee meter.
|
||||
package baseplatformfeemetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// MeterName is the typed registry key for this collector.
|
||||
var (
|
||||
MeterName = metercollectortypes.MustNewName("signoz.meter.base.platform.fee")
|
||||
meterUnit = metercollectortypes.UnitCount
|
||||
meterAggregation = metercollectortypes.AggregationMax
|
||||
)
|
||||
|
||||
var _ metercollector.MeterCollector = (*Provider)(nil)
|
||||
|
||||
// Provider collects base platform fee meters.
|
||||
type Provider struct {
|
||||
licensing licensing.Licensing
|
||||
}
|
||||
|
||||
func New(licensing licensing.Licensing) *Provider {
|
||||
return &Provider{licensing: licensing}
|
||||
}
|
||||
|
||||
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
|
||||
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
|
||||
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
|
||||
return meterAggregation
|
||||
}
|
||||
|
||||
// Collect emits value 1 when the org has an active license.
|
||||
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
|
||||
if !window.IsValid() {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
|
||||
}
|
||||
|
||||
license, err := p.licensing.GetActive(ctx, orgID)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "fetch active license for base platform fee meter")
|
||||
}
|
||||
if license == nil || license.Key == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return []meterreportertypes.Meter{
|
||||
meterreportertypes.NewMeter(MeterName, 1, meterUnit, meterAggregation, window, map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
107
ee/metercollector/baseplatformfeemetercollector/provider_test.go
Normal file
107
ee/metercollector/baseplatformfeemetercollector/provider_test.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package baseplatformfeemetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/types/licensetypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCollectEmitsBasePlatformFeeMeterForValidLicense(t *testing.T) {
|
||||
orgID := valuer.GenerateUUID()
|
||||
window := completedWindow()
|
||||
provider := New(&fakeLicensing{
|
||||
license: &licensetypes.License{Key: "license-key"},
|
||||
})
|
||||
|
||||
readings, err := provider.Collect(context.Background(), orgID, window)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []meterreportertypes.Meter{
|
||||
meterreportertypes.NewMeter(MeterName, 1, metercollectortypes.UnitCount, metercollectortypes.AggregationMax, window, map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
}),
|
||||
}, readings)
|
||||
}
|
||||
|
||||
func TestCollectSkipsNilLicense(t *testing.T) {
|
||||
readings, err := New(&fakeLicensing{}).Collect(context.Background(), valuer.GenerateUUID(), completedWindow())
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, readings)
|
||||
}
|
||||
|
||||
func TestProviderMetadata(t *testing.T) {
|
||||
provider := New(&fakeLicensing{})
|
||||
|
||||
require.Equal(t, "signoz.meter.base.platform.fee", provider.Name().String())
|
||||
require.Equal(t, metercollectortypes.UnitCount, provider.Unit())
|
||||
require.Equal(t, metercollectortypes.AggregationMax, provider.Aggregation())
|
||||
}
|
||||
|
||||
func TestCollectRejectsInvalidWindowBeforeLicensing(t *testing.T) {
|
||||
readings, err := New(nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
|
||||
require.Error(t, err)
|
||||
require.Nil(t, readings)
|
||||
}
|
||||
|
||||
func completedWindow() meterreportertypes.Window {
|
||||
start := time.Date(2026, 5, 4, 0, 0, 0, 0, time.UTC)
|
||||
return meterreportertypes.Window{
|
||||
StartUnixMilli: start.UnixMilli(),
|
||||
EndUnixMilli: start.AddDate(0, 0, 1).UnixMilli(),
|
||||
IsCompleted: true,
|
||||
}
|
||||
}
|
||||
|
||||
var _ licensing.Licensing = (*fakeLicensing)(nil)
|
||||
|
||||
type fakeLicensing struct {
|
||||
license *licensetypes.License
|
||||
err error
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) Start(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) Stop(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) Validate(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) Activate(context.Context, valuer.UUID, string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) GetActive(context.Context, valuer.UUID) (*licensetypes.License, error) {
|
||||
return f.license, f.err
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) Refresh(context.Context, valuer.UUID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) Checkout(context.Context, valuer.UUID, *licensetypes.PostableSubscription) (*licensetypes.GettableSubscription, error) {
|
||||
return &licensetypes.GettableSubscription{}, nil
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) Portal(context.Context, valuer.UUID, *licensetypes.PostableSubscription) (*licensetypes.GettableSubscription, error) {
|
||||
return &licensetypes.GettableSubscription{}, nil
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) GetFeatureFlags(context.Context, valuer.UUID) ([]*licensetypes.Feature, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *fakeLicensing) Collect(context.Context, valuer.UUID) (map[string]any, error) {
|
||||
return map[string]any{}, nil
|
||||
}
|
||||
276
ee/metercollector/datapointcountmetercollector/provider.go
Normal file
276
ee/metercollector/datapointcountmetercollector/provider.go
Normal file
@@ -0,0 +1,276 @@
|
||||
// Package datapointcountmetercollector collects metric datapoint count meters
|
||||
// by workspace and retention. Keep the query local to this meter.
|
||||
package datapointcountmetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
|
||||
"github.com/SigNoz/signoz/ee/metercollector/retention"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// MeterName is the typed registry key for this collector.
|
||||
var (
|
||||
MeterName = metercollectortypes.MustNewName("signoz.meter.metric.datapoint.count")
|
||||
meterUnit = metercollectortypes.UnitCount
|
||||
meterAggregation = metercollectortypes.AggregationSum
|
||||
)
|
||||
|
||||
var _ metercollector.MeterCollector = (*Provider)(nil)
|
||||
|
||||
// Provider collects datapoint count meters.
|
||||
type Provider struct {
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
sqlStore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
|
||||
return &Provider{
|
||||
telemetryStore: telemetryStore,
|
||||
sqlStore: sqlStore,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
|
||||
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
|
||||
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
|
||||
return meterAggregation
|
||||
}
|
||||
|
||||
// Collect aggregates datapoint count for the window and emits an empty-day sentinel.
|
||||
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
|
||||
if !window.IsValid() {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
|
||||
}
|
||||
|
||||
meterName := MeterName.String()
|
||||
|
||||
slices, err := retention.LoadActiveSlices(
|
||||
ctx,
|
||||
p.sqlStore,
|
||||
orgID,
|
||||
telemetrymetrics.DBName+"."+telemetrymetrics.SamplesV4LocalTableName,
|
||||
retentiontypes.DefaultMetricsRetentionDays,
|
||||
window.StartUnixMilli, window.EndUnixMilli,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
|
||||
}
|
||||
|
||||
type bucket struct {
|
||||
dimensions map[string]string
|
||||
value float64
|
||||
}
|
||||
accumulator := make(map[string]*bucket)
|
||||
|
||||
for _, slice := range slices {
|
||||
query, args, dimensionColumns, err := buildQuery(meterName, slice)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
|
||||
}
|
||||
|
||||
rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
|
||||
}
|
||||
|
||||
if err := func() error {
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
dimensionValues := make([]string, len(dimensionColumns))
|
||||
var retentionDays int32
|
||||
var retentionRuleIndex int32
|
||||
var value float64
|
||||
|
||||
scanDest := make([]any, 0, len(dimensionValues)+3)
|
||||
for i := range dimensionValues {
|
||||
scanDest = append(scanDest, &dimensionValues[i])
|
||||
}
|
||||
scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)
|
||||
|
||||
if err := rows.Scan(scanDest...); err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
|
||||
}
|
||||
|
||||
dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
|
||||
}
|
||||
|
||||
key := bucketKey(dimensions)
|
||||
b, ok := accumulator[key]
|
||||
if !ok {
|
||||
b = &bucket{dimensions: dimensions}
|
||||
accumulator[key] = b
|
||||
}
|
||||
b.value += value
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
meters := make([]meterreportertypes.Meter, 0, len(accumulator))
|
||||
for _, b := range accumulator {
|
||||
meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
|
||||
}
|
||||
|
||||
// Empty windows still emit a sentinel so checkpoints can advance.
|
||||
if len(meters) == 0 && len(slices) > 0 {
|
||||
meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: strconv.Itoa(slices[len(slices)-1].DefaultDays),
|
||||
}))
|
||||
}
|
||||
|
||||
return meters, nil
|
||||
}
|
||||
|
||||
// buildQuery stays local because each meter owns its billing query.
|
||||
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
|
||||
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
columns, err := dimensionColumnsFor(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
selects := make([]string, 0, len(columns)+3)
|
||||
groupBy := make([]string, 0, len(columns)+2)
|
||||
for _, column := range columns {
|
||||
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
|
||||
groupBy = append(groupBy, column.alias)
|
||||
}
|
||||
selects = append(selects,
|
||||
retentionExpr+" AS retention_days",
|
||||
retentionRuleIndexExpr+" AS retention_rule_index",
|
||||
"ifNull(sum(value), 0) AS value",
|
||||
)
|
||||
groupBy = append(groupBy, "retention_days", "retention_rule_index")
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(selects...)
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", meterName),
|
||||
sb.GTE("unix_milli", slice.StartMs),
|
||||
sb.LT("unix_milli", slice.EndMs),
|
||||
)
|
||||
sb.GroupBy(groupBy...)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return query, args, columns, nil
|
||||
}
|
||||
|
||||
type dimensionColumn struct {
|
||||
key string
|
||||
alias string
|
||||
}
|
||||
|
||||
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
|
||||
dimensionKeys, err := retention.RuleDimensionKeys(rules)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys := make([]string, 0, len(dimensionKeys)+1)
|
||||
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
|
||||
for _, key := range dimensionKeys {
|
||||
if key == metercollector.DimensionWorkspaceKeyID {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
columns := make([]dimensionColumn, len(keys))
|
||||
for i, key := range keys {
|
||||
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
|
||||
}
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
func buildDimensions(
|
||||
orgID valuer.UUID,
|
||||
retentionDays int,
|
||||
retentionRuleIndex int,
|
||||
columns []dimensionColumn,
|
||||
values []string,
|
||||
rules []retentiontypes.CustomRetentionRule,
|
||||
) (map[string]string, error) {
|
||||
if len(columns) != len(values) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
|
||||
}
|
||||
|
||||
valuesByKey := make(map[string]string, len(columns))
|
||||
for i, column := range columns {
|
||||
valuesByKey[column.key] = values[i]
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
|
||||
}
|
||||
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
|
||||
|
||||
if retentionRuleIndex < 0 {
|
||||
return dimensions, nil
|
||||
}
|
||||
if retentionRuleIndex >= len(rules) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
|
||||
}
|
||||
for _, filter := range rules[retentionRuleIndex].Filters {
|
||||
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
|
||||
}
|
||||
return dimensions, nil
|
||||
}
|
||||
|
||||
func addNonEmpty(dimensions map[string]string, key, value string) {
|
||||
if value == "" {
|
||||
return
|
||||
}
|
||||
dimensions[key] = value
|
||||
}
|
||||
|
||||
func bucketKey(dimensions map[string]string) string {
|
||||
keys := make([]string, 0, len(dimensions))
|
||||
for key := range dimensions {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
var b strings.Builder
|
||||
for _, key := range keys {
|
||||
value := dimensions[key]
|
||||
b.WriteString(strconv.Itoa(len(key)))
|
||||
b.WriteByte(':')
|
||||
b.WriteString(key)
|
||||
b.WriteByte('=')
|
||||
b.WriteString(strconv.Itoa(len(value)))
|
||||
b.WriteByte(':')
|
||||
b.WriteString(value)
|
||||
b.WriteByte(';')
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package datapointcountmetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBuildDimensions(t *testing.T) {
|
||||
orgID := valuer.GenerateUUID()
|
||||
rules := []retentiontypes.CustomRetentionRule{{
|
||||
Filters: []retentiontypes.FilterCondition{{
|
||||
Key: "service.name",
|
||||
Values: []string{"api"},
|
||||
}},
|
||||
TTLDays: 7,
|
||||
}}
|
||||
columns := []dimensionColumn{
|
||||
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
|
||||
{key: "service.name", alias: "dim_1"},
|
||||
}
|
||||
|
||||
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
"service.name": "api",
|
||||
}, dimensions)
|
||||
}
|
||||
|
||||
func TestProviderMetadata(t *testing.T) {
|
||||
provider := New(nil, nil)
|
||||
|
||||
require.Equal(t, "signoz.meter.metric.datapoint.count", provider.Name().String())
|
||||
require.Equal(t, metercollectortypes.UnitCount, provider.Unit())
|
||||
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
|
||||
}
|
||||
|
||||
func TestBucketKeyIsStable(t *testing.T) {
|
||||
first := bucketKey(map[string]string{
|
||||
"service.name": "api",
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
})
|
||||
second := bucketKey(map[string]string{
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
"service.name": "api",
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
})
|
||||
|
||||
require.Equal(t, first, second)
|
||||
require.NotEmpty(t, first)
|
||||
}
|
||||
|
||||
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
|
||||
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
|
||||
require.Error(t, err)
|
||||
require.Nil(t, readings)
|
||||
}
|
||||
276
ee/metercollector/datapointsizemetercollector/provider.go
Normal file
276
ee/metercollector/datapointsizemetercollector/provider.go
Normal file
@@ -0,0 +1,276 @@
|
||||
// Package datapointsizemetercollector collects metric datapoint size meters
|
||||
// by workspace and retention. Keep the query local to this meter.
|
||||
package datapointsizemetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
|
||||
"github.com/SigNoz/signoz/ee/metercollector/retention"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// MeterName is the typed registry key for this collector.
|
||||
var (
|
||||
MeterName = metercollectortypes.MustNewName("signoz.meter.metric.datapoint.size")
|
||||
meterUnit = metercollectortypes.UnitBytes
|
||||
meterAggregation = metercollectortypes.AggregationSum
|
||||
)
|
||||
|
||||
var _ metercollector.MeterCollector = (*Provider)(nil)
|
||||
|
||||
// Provider collects datapoint size meters.
|
||||
type Provider struct {
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
sqlStore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
|
||||
return &Provider{
|
||||
telemetryStore: telemetryStore,
|
||||
sqlStore: sqlStore,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
|
||||
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
|
||||
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
|
||||
return meterAggregation
|
||||
}
|
||||
|
||||
// Collect aggregates datapoint size for the window and emits an empty-day sentinel.
//
// It loads the org's active retention slices for the metrics samples table,
// runs one ClickHouse aggregation per slice against the meter samples table,
// merges per-slice rows into buckets keyed by their full dimension set, and
// returns one Meter per bucket. When no rows matched but slices existed, a
// single zero-valued sentinel meter is emitted so checkpoints can advance.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
	if !window.IsValid() {
		return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
	}

	meterName := MeterName.String()

	// Retention slices partition the window by the retention configuration
	// active at the time; each slice gets its own query below.
	slices, err := retention.LoadActiveSlices(
		ctx,
		p.sqlStore,
		orgID,
		telemetrymetrics.DBName+"."+telemetrymetrics.SamplesV4LocalTableName,
		retentiontypes.DefaultMetricsRetentionDays,
		window.StartUnixMilli, window.EndUnixMilli,
	)
	if err != nil {
		return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
	}

	// bucket accumulates values across slices that resolve to the same
	// dimension set.
	type bucket struct {
		dimensions map[string]string
		value      float64
	}
	accumulator := make(map[string]*bucket)

	for _, slice := range slices {
		query, args, dimensionColumns, err := buildQuery(meterName, slice)
		if err != nil {
			return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
		}

		rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
		if err != nil {
			return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
		}

		// Row iteration runs inside a closure so rows.Close fires per slice
		// rather than at the end of Collect (defer-in-loop pitfall).
		if err := func() error {
			defer rows.Close()
			for rows.Next() {
				// Scan layout mirrors buildQuery's select order: one string
				// per dimension column, then retention_days,
				// retention_rule_index, and the summed value.
				dimensionValues := make([]string, len(dimensionColumns))
				var retentionDays int32
				var retentionRuleIndex int32
				var value float64

				scanDest := make([]any, 0, len(dimensionValues)+3)
				for i := range dimensionValues {
					scanDest = append(scanDest, &dimensionValues[i])
				}
				scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)

				if err := rows.Scan(scanDest...); err != nil {
					return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
				}

				dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
				if err != nil {
					return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
				}

				// Merge rows from different slices sharing the same
				// dimensions into one bucket.
				key := bucketKey(dimensions)
				b, ok := accumulator[key]
				if !ok {
					b = &bucket{dimensions: dimensions}
					accumulator[key] = b
				}
				b.value += value
			}
			if err := rows.Err(); err != nil {
				return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
			}
			return nil
		}(); err != nil {
			return nil, err
		}
	}

	meters := make([]meterreportertypes.Meter, 0, len(accumulator))
	for _, b := range accumulator {
		meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
	}

	// Empty windows still emit a sentinel so checkpoints can advance.
	if len(meters) == 0 && len(slices) > 0 {
		meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
			metercollector.DimensionOrganizationID: orgID.StringValue(),
			metercollector.DimensionRetentionDays:  strconv.Itoa(slices[len(slices)-1].DefaultDays),
		}))
	}

	return meters, nil
}
|
||||
|
||||
// buildQuery stays local because each meter owns its billing query.
|
||||
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
|
||||
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
columns, err := dimensionColumnsFor(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
selects := make([]string, 0, len(columns)+3)
|
||||
groupBy := make([]string, 0, len(columns)+2)
|
||||
for _, column := range columns {
|
||||
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
|
||||
groupBy = append(groupBy, column.alias)
|
||||
}
|
||||
selects = append(selects,
|
||||
retentionExpr+" AS retention_days",
|
||||
retentionRuleIndexExpr+" AS retention_rule_index",
|
||||
"ifNull(sum(value), 0) AS value",
|
||||
)
|
||||
groupBy = append(groupBy, "retention_days", "retention_rule_index")
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(selects...)
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", meterName),
|
||||
sb.GTE("unix_milli", slice.StartMs),
|
||||
sb.LT("unix_milli", slice.EndMs),
|
||||
)
|
||||
sb.GroupBy(groupBy...)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return query, args, columns, nil
|
||||
}
|
||||
|
||||
// dimensionColumn pairs a label key with the positional SQL alias used for
// its extracted value in the meter query's select list.
type dimensionColumn struct {
	key   string // label key looked up via JSONExtractString
	alias string // positional select alias (dim_0, dim_1, ...)
}
|
||||
|
||||
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
|
||||
dimensionKeys, err := retention.RuleDimensionKeys(rules)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys := make([]string, 0, len(dimensionKeys)+1)
|
||||
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
|
||||
for _, key := range dimensionKeys {
|
||||
if key == metercollector.DimensionWorkspaceKeyID {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
columns := make([]dimensionColumn, len(keys))
|
||||
for i, key := range keys {
|
||||
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
|
||||
}
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
func buildDimensions(
|
||||
orgID valuer.UUID,
|
||||
retentionDays int,
|
||||
retentionRuleIndex int,
|
||||
columns []dimensionColumn,
|
||||
values []string,
|
||||
rules []retentiontypes.CustomRetentionRule,
|
||||
) (map[string]string, error) {
|
||||
if len(columns) != len(values) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
|
||||
}
|
||||
|
||||
valuesByKey := make(map[string]string, len(columns))
|
||||
for i, column := range columns {
|
||||
valuesByKey[column.key] = values[i]
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
|
||||
}
|
||||
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
|
||||
|
||||
if retentionRuleIndex < 0 {
|
||||
return dimensions, nil
|
||||
}
|
||||
if retentionRuleIndex >= len(rules) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
|
||||
}
|
||||
for _, filter := range rules[retentionRuleIndex].Filters {
|
||||
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
|
||||
}
|
||||
return dimensions, nil
|
||||
}
|
||||
|
||||
// addNonEmpty stores value under key, skipping empty values so sparse
// dimensions are omitted instead of being recorded as "".
func addNonEmpty(dimensions map[string]string, key, value string) {
	if value != "" {
		dimensions[key] = value
	}
}
|
||||
|
||||
// bucketKey derives a deterministic accumulator key from a dimension map.
// Keys are sorted and every key/value is length-prefixed, so two distinct
// maps can never serialize to the same string.
func bucketKey(dimensions map[string]string) string {
	sortedKeys := make([]string, 0, len(dimensions))
	for name := range dimensions {
		sortedKeys = append(sortedKeys, name)
	}
	sort.Strings(sortedKeys)

	var out strings.Builder
	for _, name := range sortedKeys {
		val := dimensions[name]
		out.WriteString(strconv.Itoa(len(name)) + ":" + name + "=" + strconv.Itoa(len(val)) + ":" + val + ";")
	}
	return out.String()
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package datapointsizemetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestBuildDimensions verifies that a row matching a custom rule yields the
// org, retention, workspace, and rule-filter dimensions.
func TestBuildDimensions(t *testing.T) {
	orgID := valuer.GenerateUUID()
	rules := []retentiontypes.CustomRetentionRule{{
		Filters: []retentiontypes.FilterCondition{{
			Key:    "service.name",
			Values: []string{"api"},
		}},
		TTLDays: 7,
	}}
	columns := []dimensionColumn{
		{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
		{key: "service.name", alias: "dim_1"},
	}

	dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
	require.NoError(t, err)
	require.Equal(t, map[string]string{
		metercollector.DimensionOrganizationID: orgID.StringValue(),
		metercollector.DimensionRetentionDays:  "30",
		metercollector.DimensionWorkspaceKeyID: "workspace-1",
		"service.name":                         "api",
	}, dimensions)
}

// TestProviderMetadata pins the meter's registry name, unit, and aggregation.
func TestProviderMetadata(t *testing.T) {
	provider := New(nil, nil)

	require.Equal(t, "signoz.meter.metric.datapoint.size", provider.Name().String())
	require.Equal(t, metercollectortypes.UnitBytes, provider.Unit())
	require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
}

// TestBucketKeyIsStable checks that bucketKey is insensitive to map literal
// ordering and never empty for a non-empty map.
func TestBucketKeyIsStable(t *testing.T) {
	first := bucketKey(map[string]string{
		"service.name":                         "api",
		metercollector.DimensionRetentionDays:  "30",
		metercollector.DimensionWorkspaceKeyID: "workspace-1",
	})
	second := bucketKey(map[string]string{
		metercollector.DimensionWorkspaceKeyID: "workspace-1",
		"service.name":                         "api",
		metercollector.DimensionRetentionDays:  "30",
	})

	require.Equal(t, first, second)
	require.NotEmpty(t, first)
}

// TestCollectRejectsInvalidWindowBeforeQuerying ensures the zero-value window
// fails validation before any store access (the nil stores would panic).
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
	readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
	require.Error(t, err)
	require.Nil(t, readings)
}
|
||||
276
ee/metercollector/logcountmetercollector/provider.go
Normal file
276
ee/metercollector/logcountmetercollector/provider.go
Normal file
@@ -0,0 +1,276 @@
|
||||
// Package logcountmetercollector collects log count meters by workspace and
|
||||
// retention. Keep the query local to this meter.
|
||||
package logcountmetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
|
||||
"github.com/SigNoz/signoz/ee/metercollector/retention"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// MeterName is the typed registry key for this collector.
var (
	// MeterName identifies this meter in the collector registry.
	MeterName = metercollectortypes.MustNewName("signoz.meter.log.count")
	// meterUnit is the unit every emitted reading carries.
	meterUnit = metercollectortypes.UnitCount
	// meterAggregation is how readings combine across windows.
	meterAggregation = metercollectortypes.AggregationSum
)
|
||||
|
||||
// Compile-time check that *Provider satisfies the MeterCollector contract.
var _ metercollector.MeterCollector = (*Provider)(nil)

// Provider collects log count meters.
type Provider struct {
	telemetryStore telemetrystore.TelemetryStore // ClickHouse access for meter sample queries
	sqlStore       sqlstore.SQLStore             // relational store for retention configuration
}
|
||||
|
||||
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
|
||||
return &Provider{
|
||||
telemetryStore: telemetryStore,
|
||||
sqlStore: sqlStore,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
|
||||
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
|
||||
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
|
||||
return meterAggregation
|
||||
}
|
||||
|
||||
// Collect aggregates log count for the window and emits an empty-day sentinel.
//
// It loads the org's active retention slices for the logs table, runs one
// ClickHouse aggregation per slice against the meter samples table, merges
// per-slice rows into buckets keyed by their full dimension set, and returns
// one Meter per bucket. When no rows matched but slices existed, a single
// zero-valued sentinel meter is emitted so checkpoints can advance.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
	if !window.IsValid() {
		return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
	}

	meterName := MeterName.String()

	// Retention slices partition the window by the retention configuration
	// active at the time; each slice gets its own query below.
	slices, err := retention.LoadActiveSlices(
		ctx,
		p.sqlStore,
		orgID,
		telemetrylogs.DBName+"."+telemetrylogs.LogsV2LocalTableName,
		retentiontypes.DefaultLogsRetentionDays,
		window.StartUnixMilli, window.EndUnixMilli,
	)
	if err != nil {
		return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
	}

	// bucket accumulates values across slices that resolve to the same
	// dimension set.
	type bucket struct {
		dimensions map[string]string
		value      float64
	}
	accumulator := make(map[string]*bucket)

	for _, slice := range slices {
		query, args, dimensionColumns, err := buildQuery(meterName, slice)
		if err != nil {
			return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
		}

		rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
		if err != nil {
			return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
		}

		// Row iteration runs inside a closure so rows.Close fires per slice
		// rather than at the end of Collect (defer-in-loop pitfall).
		if err := func() error {
			defer rows.Close()
			for rows.Next() {
				// Scan layout mirrors buildQuery's select order: one string
				// per dimension column, then retention_days,
				// retention_rule_index, and the summed value.
				dimensionValues := make([]string, len(dimensionColumns))
				var retentionDays int32
				var retentionRuleIndex int32
				var value float64

				scanDest := make([]any, 0, len(dimensionValues)+3)
				for i := range dimensionValues {
					scanDest = append(scanDest, &dimensionValues[i])
				}
				scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)

				if err := rows.Scan(scanDest...); err != nil {
					return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
				}

				dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
				if err != nil {
					return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
				}

				// Merge rows from different slices sharing the same
				// dimensions into one bucket.
				key := bucketKey(dimensions)
				b, ok := accumulator[key]
				if !ok {
					b = &bucket{dimensions: dimensions}
					accumulator[key] = b
				}
				b.value += value
			}
			if err := rows.Err(); err != nil {
				return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
			}
			return nil
		}(); err != nil {
			return nil, err
		}
	}

	meters := make([]meterreportertypes.Meter, 0, len(accumulator))
	for _, b := range accumulator {
		meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
	}

	// Empty windows still emit a sentinel so checkpoints can advance.
	if len(meters) == 0 && len(slices) > 0 {
		meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
			metercollector.DimensionOrganizationID: orgID.StringValue(),
			metercollector.DimensionRetentionDays:  strconv.Itoa(slices[len(slices)-1].DefaultDays),
		}))
	}

	return meters, nil
}
|
||||
|
||||
// buildQuery stays local because each meter owns its billing query.
|
||||
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
|
||||
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
columns, err := dimensionColumnsFor(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
selects := make([]string, 0, len(columns)+3)
|
||||
groupBy := make([]string, 0, len(columns)+2)
|
||||
for _, column := range columns {
|
||||
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
|
||||
groupBy = append(groupBy, column.alias)
|
||||
}
|
||||
selects = append(selects,
|
||||
retentionExpr+" AS retention_days",
|
||||
retentionRuleIndexExpr+" AS retention_rule_index",
|
||||
"ifNull(sum(value), 0) AS value",
|
||||
)
|
||||
groupBy = append(groupBy, "retention_days", "retention_rule_index")
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(selects...)
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", meterName),
|
||||
sb.GTE("unix_milli", slice.StartMs),
|
||||
sb.LT("unix_milli", slice.EndMs),
|
||||
)
|
||||
sb.GroupBy(groupBy...)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return query, args, columns, nil
|
||||
}
|
||||
|
||||
// dimensionColumn pairs a label key with the positional SQL alias used for
// its extracted value in the meter query's select list.
type dimensionColumn struct {
	key   string // label key looked up via JSONExtractString
	alias string // positional select alias (dim_0, dim_1, ...)
}
||||
|
||||
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
|
||||
dimensionKeys, err := retention.RuleDimensionKeys(rules)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys := make([]string, 0, len(dimensionKeys)+1)
|
||||
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
|
||||
for _, key := range dimensionKeys {
|
||||
if key == metercollector.DimensionWorkspaceKeyID {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
columns := make([]dimensionColumn, len(keys))
|
||||
for i, key := range keys {
|
||||
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
|
||||
}
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
func buildDimensions(
|
||||
orgID valuer.UUID,
|
||||
retentionDays int,
|
||||
retentionRuleIndex int,
|
||||
columns []dimensionColumn,
|
||||
values []string,
|
||||
rules []retentiontypes.CustomRetentionRule,
|
||||
) (map[string]string, error) {
|
||||
if len(columns) != len(values) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
|
||||
}
|
||||
|
||||
valuesByKey := make(map[string]string, len(columns))
|
||||
for i, column := range columns {
|
||||
valuesByKey[column.key] = values[i]
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
|
||||
}
|
||||
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
|
||||
|
||||
if retentionRuleIndex < 0 {
|
||||
return dimensions, nil
|
||||
}
|
||||
if retentionRuleIndex >= len(rules) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
|
||||
}
|
||||
for _, filter := range rules[retentionRuleIndex].Filters {
|
||||
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
|
||||
}
|
||||
return dimensions, nil
|
||||
}
|
||||
|
||||
// addNonEmpty stores value under key, skipping empty values so sparse
// dimensions are omitted instead of being recorded as "".
func addNonEmpty(dimensions map[string]string, key, value string) {
	if value != "" {
		dimensions[key] = value
	}
}
|
||||
|
||||
// bucketKey derives a deterministic accumulator key from a dimension map.
// Keys are sorted and every key/value is length-prefixed, so two distinct
// maps can never serialize to the same string.
func bucketKey(dimensions map[string]string) string {
	sortedKeys := make([]string, 0, len(dimensions))
	for name := range dimensions {
		sortedKeys = append(sortedKeys, name)
	}
	sort.Strings(sortedKeys)

	var out strings.Builder
	for _, name := range sortedKeys {
		val := dimensions[name]
		out.WriteString(strconv.Itoa(len(name)) + ":" + name + "=" + strconv.Itoa(len(val)) + ":" + val + ";")
	}
	return out.String()
}
|
||||
67
ee/metercollector/logcountmetercollector/provider_test.go
Normal file
67
ee/metercollector/logcountmetercollector/provider_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package logcountmetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestBuildDimensions verifies that a row matching a custom rule yields the
// org, retention, workspace, and rule-filter dimensions.
func TestBuildDimensions(t *testing.T) {
	orgID := valuer.GenerateUUID()
	rules := []retentiontypes.CustomRetentionRule{{
		Filters: []retentiontypes.FilterCondition{{
			Key:    "service.name",
			Values: []string{"api"},
		}},
		TTLDays: 7,
	}}
	columns := []dimensionColumn{
		{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
		{key: "service.name", alias: "dim_1"},
	}

	dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
	require.NoError(t, err)
	require.Equal(t, map[string]string{
		metercollector.DimensionOrganizationID: orgID.StringValue(),
		metercollector.DimensionRetentionDays:  "30",
		metercollector.DimensionWorkspaceKeyID: "workspace-1",
		"service.name":                         "api",
	}, dimensions)
}

// TestProviderMetadata pins the meter's registry name, unit, and aggregation.
func TestProviderMetadata(t *testing.T) {
	provider := New(nil, nil)

	require.Equal(t, "signoz.meter.log.count", provider.Name().String())
	require.Equal(t, metercollectortypes.UnitCount, provider.Unit())
	require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
}

// TestBucketKeyIsStable checks that bucketKey is insensitive to map literal
// ordering and never empty for a non-empty map.
func TestBucketKeyIsStable(t *testing.T) {
	first := bucketKey(map[string]string{
		"service.name":                         "api",
		metercollector.DimensionRetentionDays:  "30",
		metercollector.DimensionWorkspaceKeyID: "workspace-1",
	})
	second := bucketKey(map[string]string{
		metercollector.DimensionWorkspaceKeyID: "workspace-1",
		"service.name":                         "api",
		metercollector.DimensionRetentionDays:  "30",
	})

	require.Equal(t, first, second)
	require.NotEmpty(t, first)
}

// TestCollectRejectsInvalidWindowBeforeQuerying ensures the zero-value window
// fails validation before any store access (the nil stores would panic).
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
	readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
	require.Error(t, err)
	require.Nil(t, readings)
}
|
||||
276
ee/metercollector/logsizemetercollector/provider.go
Normal file
276
ee/metercollector/logsizemetercollector/provider.go
Normal file
@@ -0,0 +1,276 @@
|
||||
// Package logsizemetercollector collects log size meters by workspace and
|
||||
// retention. Keep the query local to this meter.
|
||||
package logsizemetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
|
||||
"github.com/SigNoz/signoz/ee/metercollector/retention"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// MeterName is the typed registry key for this collector.
var (
	// MeterName identifies this meter in the collector registry.
	MeterName = metercollectortypes.MustNewName("signoz.meter.log.size")
	// meterUnit is the unit every emitted reading carries.
	meterUnit = metercollectortypes.UnitBytes
	// meterAggregation is how readings combine across windows.
	meterAggregation = metercollectortypes.AggregationSum
)
|
||||
|
||||
// Compile-time check that *Provider satisfies the MeterCollector contract.
var _ metercollector.MeterCollector = (*Provider)(nil)

// Provider collects log size meters.
type Provider struct {
	telemetryStore telemetrystore.TelemetryStore // ClickHouse access for meter sample queries
	sqlStore       sqlstore.SQLStore             // relational store for retention configuration
}
|
||||
|
||||
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
|
||||
return &Provider{
|
||||
telemetryStore: telemetryStore,
|
||||
sqlStore: sqlStore,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
|
||||
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
|
||||
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
|
||||
return meterAggregation
|
||||
}
|
||||
|
||||
// Collect aggregates log size for the window and emits an empty-day sentinel.
//
// It loads the org's active retention slices for the logs table, runs one
// ClickHouse aggregation per slice against the meter samples table, merges
// per-slice rows into buckets keyed by their full dimension set, and returns
// one Meter per bucket. When no rows matched but slices existed, a single
// zero-valued sentinel meter is emitted so checkpoints can advance.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
	if !window.IsValid() {
		return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
	}

	meterName := MeterName.String()

	// Retention slices partition the window by the retention configuration
	// active at the time; each slice gets its own query below.
	slices, err := retention.LoadActiveSlices(
		ctx,
		p.sqlStore,
		orgID,
		telemetrylogs.DBName+"."+telemetrylogs.LogsV2LocalTableName,
		retentiontypes.DefaultLogsRetentionDays,
		window.StartUnixMilli, window.EndUnixMilli,
	)
	if err != nil {
		return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
	}

	// bucket accumulates values across slices that resolve to the same
	// dimension set.
	type bucket struct {
		dimensions map[string]string
		value      float64
	}
	accumulator := make(map[string]*bucket)

	for _, slice := range slices {
		query, args, dimensionColumns, err := buildQuery(meterName, slice)
		if err != nil {
			return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
		}

		rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
		if err != nil {
			return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
		}

		// Row iteration runs inside a closure so rows.Close fires per slice
		// rather than at the end of Collect (defer-in-loop pitfall).
		if err := func() error {
			defer rows.Close()
			for rows.Next() {
				// Scan layout mirrors buildQuery's select order: one string
				// per dimension column, then retention_days,
				// retention_rule_index, and the summed value.
				dimensionValues := make([]string, len(dimensionColumns))
				var retentionDays int32
				var retentionRuleIndex int32
				var value float64

				scanDest := make([]any, 0, len(dimensionValues)+3)
				for i := range dimensionValues {
					scanDest = append(scanDest, &dimensionValues[i])
				}
				scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)

				if err := rows.Scan(scanDest...); err != nil {
					return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
				}

				dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
				if err != nil {
					return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
				}

				// Merge rows from different slices sharing the same
				// dimensions into one bucket.
				key := bucketKey(dimensions)
				b, ok := accumulator[key]
				if !ok {
					b = &bucket{dimensions: dimensions}
					accumulator[key] = b
				}
				b.value += value
			}
			if err := rows.Err(); err != nil {
				return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
			}
			return nil
		}(); err != nil {
			return nil, err
		}
	}

	meters := make([]meterreportertypes.Meter, 0, len(accumulator))
	for _, b := range accumulator {
		meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
	}

	// Empty windows still emit a sentinel so checkpoints can advance.
	if len(meters) == 0 && len(slices) > 0 {
		meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
			metercollector.DimensionOrganizationID: orgID.StringValue(),
			metercollector.DimensionRetentionDays:  strconv.Itoa(slices[len(slices)-1].DefaultDays),
		}))
	}

	return meters, nil
}
|
||||
|
||||
// buildQuery stays local because each meter owns its billing query.
|
||||
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
|
||||
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
columns, err := dimensionColumnsFor(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
selects := make([]string, 0, len(columns)+3)
|
||||
groupBy := make([]string, 0, len(columns)+2)
|
||||
for _, column := range columns {
|
||||
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
|
||||
groupBy = append(groupBy, column.alias)
|
||||
}
|
||||
selects = append(selects,
|
||||
retentionExpr+" AS retention_days",
|
||||
retentionRuleIndexExpr+" AS retention_rule_index",
|
||||
"ifNull(sum(value), 0) AS value",
|
||||
)
|
||||
groupBy = append(groupBy, "retention_days", "retention_rule_index")
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(selects...)
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", meterName),
|
||||
sb.GTE("unix_milli", slice.StartMs),
|
||||
sb.LT("unix_milli", slice.EndMs),
|
||||
)
|
||||
sb.GroupBy(groupBy...)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return query, args, columns, nil
|
||||
}
|
||||
|
||||
// dimensionColumn pairs a label key with the positional SQL alias used for
// its extracted value in the meter query's select list.
type dimensionColumn struct {
	key   string // label key looked up via JSONExtractString
	alias string // positional select alias (dim_0, dim_1, ...)
}
|
||||
|
||||
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
|
||||
dimensionKeys, err := retention.RuleDimensionKeys(rules)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys := make([]string, 0, len(dimensionKeys)+1)
|
||||
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
|
||||
for _, key := range dimensionKeys {
|
||||
if key == metercollector.DimensionWorkspaceKeyID {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
columns := make([]dimensionColumn, len(keys))
|
||||
for i, key := range keys {
|
||||
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
|
||||
}
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
func buildDimensions(
|
||||
orgID valuer.UUID,
|
||||
retentionDays int,
|
||||
retentionRuleIndex int,
|
||||
columns []dimensionColumn,
|
||||
values []string,
|
||||
rules []retentiontypes.CustomRetentionRule,
|
||||
) (map[string]string, error) {
|
||||
if len(columns) != len(values) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
|
||||
}
|
||||
|
||||
valuesByKey := make(map[string]string, len(columns))
|
||||
for i, column := range columns {
|
||||
valuesByKey[column.key] = values[i]
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
|
||||
}
|
||||
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
|
||||
|
||||
if retentionRuleIndex < 0 {
|
||||
return dimensions, nil
|
||||
}
|
||||
if retentionRuleIndex >= len(rules) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
|
||||
}
|
||||
for _, filter := range rules[retentionRuleIndex].Filters {
|
||||
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
|
||||
}
|
||||
return dimensions, nil
|
||||
}
|
||||
|
||||
func addNonEmpty(dimensions map[string]string, key, value string) {
|
||||
if value == "" {
|
||||
return
|
||||
}
|
||||
dimensions[key] = value
|
||||
}
|
||||
|
||||
func bucketKey(dimensions map[string]string) string {
|
||||
keys := make([]string, 0, len(dimensions))
|
||||
for key := range dimensions {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
var b strings.Builder
|
||||
for _, key := range keys {
|
||||
value := dimensions[key]
|
||||
b.WriteString(strconv.Itoa(len(key)))
|
||||
b.WriteByte(':')
|
||||
b.WriteString(key)
|
||||
b.WriteByte('=')
|
||||
b.WriteString(strconv.Itoa(len(value)))
|
||||
b.WriteByte(':')
|
||||
b.WriteString(value)
|
||||
b.WriteByte(';')
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
67
ee/metercollector/logsizemetercollector/provider_test.go
Normal file
67
ee/metercollector/logsizemetercollector/provider_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package logsizemetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBuildDimensions(t *testing.T) {
|
||||
orgID := valuer.GenerateUUID()
|
||||
rules := []retentiontypes.CustomRetentionRule{{
|
||||
Filters: []retentiontypes.FilterCondition{{
|
||||
Key: "service.name",
|
||||
Values: []string{"api"},
|
||||
}},
|
||||
TTLDays: 7,
|
||||
}}
|
||||
columns := []dimensionColumn{
|
||||
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
|
||||
{key: "service.name", alias: "dim_1"},
|
||||
}
|
||||
|
||||
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
"service.name": "api",
|
||||
}, dimensions)
|
||||
}
|
||||
|
||||
func TestProviderMetadata(t *testing.T) {
|
||||
provider := New(nil, nil)
|
||||
|
||||
require.Equal(t, "signoz.meter.log.size", provider.Name().String())
|
||||
require.Equal(t, metercollectortypes.UnitBytes, provider.Unit())
|
||||
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
|
||||
}
|
||||
|
||||
func TestBucketKeyIsStable(t *testing.T) {
|
||||
first := bucketKey(map[string]string{
|
||||
"service.name": "api",
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
})
|
||||
second := bucketKey(map[string]string{
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
"service.name": "api",
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
})
|
||||
|
||||
require.Equal(t, first, second)
|
||||
require.NotEmpty(t, first)
|
||||
}
|
||||
|
||||
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
|
||||
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
|
||||
require.Error(t, err)
|
||||
require.Nil(t, readings)
|
||||
}
|
||||
255
ee/metercollector/retention/retention.go
Normal file
255
ee/metercollector/retention/retention.go
Normal file
@@ -0,0 +1,255 @@
|
||||
// Package retention builds retention slices and SQL expressions for meters.
|
||||
// Collectors still own their table names, defaults, and aggregation queries.
|
||||
package retention
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
const secondsPerDay = 24 * 60 * 60
|
||||
|
||||
// These values are inlined into SQL, so keep the allowlist strict.
|
||||
var (
|
||||
labelKeyPattern = regexp.MustCompile(`^[A-Za-z0-9_.\-]+$`)
|
||||
labelValuePattern = regexp.MustCompile(`^[A-Za-z0-9_.\-:]+$`)
|
||||
)
|
||||
|
||||
// LoadActiveSlices returns TTL slices covering [startMs, endMs).
|
||||
// tableName must be fully qualified, for example "signoz_logs.logs_v2".
|
||||
func LoadActiveSlices(
|
||||
ctx context.Context,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgID valuer.UUID,
|
||||
tableName string,
|
||||
fallbackDefaultDays int,
|
||||
startMs, endMs int64,
|
||||
) ([]retentiontypes.Slice, error) {
|
||||
if startMs >= endMs {
|
||||
return nil, nil
|
||||
}
|
||||
if sqlstore == nil {
|
||||
return nil, errors.New(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "sqlstore is nil")
|
||||
}
|
||||
if tableName == "" {
|
||||
return nil, errors.New(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "tableName is empty")
|
||||
}
|
||||
if fallbackDefaultDays <= 0 {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "non-positive fallbackDefaultDays %d", fallbackDefaultDays)
|
||||
}
|
||||
|
||||
rows := []*retentiontypes.TTLSetting{}
|
||||
err := sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(&rows).
|
||||
Where("table_name = ?", tableName).
|
||||
Where("org_id = ?", orgID.StringValue()).
|
||||
Where("status = ?", retentiontypes.TTLSettingStatusSuccess).
|
||||
Where("created_at < ?", time.UnixMilli(endMs).UTC()).
|
||||
OrderExpr("created_at ASC").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load ttl_setting rows for org %q table %q", orgID.StringValue(), tableName)
|
||||
}
|
||||
|
||||
return buildSlicesFromRows(rows, fallbackDefaultDays, startMs, endMs)
|
||||
}
|
||||
|
||||
func buildSlicesFromRows(rows []*retentiontypes.TTLSetting, fallbackDefaultDays int, startMs, endMs int64) ([]retentiontypes.Slice, error) {
|
||||
if startMs >= endMs {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// The latest row before the window is active at the window start.
|
||||
var activeAtStart *retentiontypes.TTLSetting
|
||||
inWindow := make([]*retentiontypes.TTLSetting, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
rowMs := row.CreatedAt.UnixMilli()
|
||||
if rowMs <= startMs {
|
||||
activeAtStart = row
|
||||
continue
|
||||
}
|
||||
if rowMs >= endMs {
|
||||
continue
|
||||
}
|
||||
inWindow = append(inWindow, row)
|
||||
}
|
||||
|
||||
activeRules, activeDefault, err := parseTTLSetting(activeAtStart, fallbackDefaultDays)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
slices := make([]retentiontypes.Slice, 0, len(inWindow)+1)
|
||||
cursor := startMs
|
||||
for _, row := range inWindow {
|
||||
rowMs := row.CreatedAt.UnixMilli()
|
||||
if rowMs <= cursor {
|
||||
// Same-ms updates collapse: replace active config, no empty slice.
|
||||
activeRules, activeDefault, err = parseTTLSetting(row, fallbackDefaultDays)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
slices = append(slices, retentiontypes.Slice{
|
||||
StartMs: cursor,
|
||||
EndMs: rowMs,
|
||||
Rules: activeRules,
|
||||
DefaultDays: activeDefault,
|
||||
})
|
||||
cursor = rowMs
|
||||
activeRules, activeDefault, err = parseTTLSetting(row, fallbackDefaultDays)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if cursor < endMs {
|
||||
slices = append(slices, retentiontypes.Slice{
|
||||
StartMs: cursor,
|
||||
EndMs: endMs,
|
||||
Rules: activeRules,
|
||||
DefaultDays: activeDefault,
|
||||
})
|
||||
}
|
||||
|
||||
return slices, nil
|
||||
}
|
||||
|
||||
// parseTTLSetting returns rules and default days for one ttl_setting row.
|
||||
func parseTTLSetting(row *retentiontypes.TTLSetting, fallbackDefaultDays int) ([]retentiontypes.CustomRetentionRule, int, error) {
|
||||
if row == nil {
|
||||
return nil, fallbackDefaultDays, nil
|
||||
}
|
||||
|
||||
defaultDays := row.TTL
|
||||
if row.Condition == "" {
|
||||
// V1 stores seconds; round up to days.
|
||||
defaultDays = (row.TTL + secondsPerDay - 1) / secondsPerDay
|
||||
}
|
||||
if defaultDays <= 0 {
|
||||
defaultDays = fallbackDefaultDays
|
||||
}
|
||||
|
||||
if row.Condition == "" {
|
||||
return nil, defaultDays, nil
|
||||
}
|
||||
|
||||
var rules []retentiontypes.CustomRetentionRule
|
||||
if err := json.Unmarshal([]byte(row.Condition), &rules); err != nil {
|
||||
return nil, 0, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "parse ttl_setting condition for row %q", row.ID.StringValue())
|
||||
}
|
||||
|
||||
return rules, defaultDays, nil
|
||||
}
|
||||
|
||||
// BuildMultiIfSQL renders the retention-days expression for one slice.
|
||||
func BuildMultiIfSQL(rules []retentiontypes.CustomRetentionRule, defaultDays int) (string, error) {
|
||||
if defaultDays <= 0 {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "non-positive default retention %d", defaultDays)
|
||||
}
|
||||
|
||||
if len(rules) == 0 {
|
||||
return "toInt32(" + strconv.Itoa(defaultDays) + ")", nil
|
||||
}
|
||||
|
||||
arms := make([]string, 0, 2*len(rules)+1)
|
||||
for ruleIndex, rule := range rules {
|
||||
if rule.TTLDays <= 0 {
|
||||
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d has non-positive ttl_days %d", ruleIndex, rule.TTLDays)
|
||||
}
|
||||
conditionExpr, err := buildRuleConditionSQL(ruleIndex, rule)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
arms = append(arms, conditionExpr)
|
||||
arms = append(arms, strconv.Itoa(rule.TTLDays))
|
||||
}
|
||||
arms = append(arms, strconv.Itoa(defaultDays))
|
||||
|
||||
return "toInt32(multiIf(" + strings.Join(arms, ", ") + "))", nil
|
||||
}
|
||||
|
||||
// BuildRuleIndexSQL renders the matched rule index, or -1 for fallback.
|
||||
func BuildRuleIndexSQL(rules []retentiontypes.CustomRetentionRule) (string, error) {
|
||||
if len(rules) == 0 {
|
||||
return "toInt32(-1)", nil
|
||||
}
|
||||
|
||||
arms := make([]string, 0, 2*len(rules)+1)
|
||||
for ruleIndex, rule := range rules {
|
||||
conditionExpr, err := buildRuleConditionSQL(ruleIndex, rule)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
arms = append(arms, conditionExpr)
|
||||
arms = append(arms, strconv.Itoa(ruleIndex))
|
||||
}
|
||||
arms = append(arms, "-1")
|
||||
|
||||
return "toInt32(multiIf(" + strings.Join(arms, ", ") + "))", nil
|
||||
}
|
||||
|
||||
func buildRuleConditionSQL(ruleIndex int, rule retentiontypes.CustomRetentionRule) (string, error) {
|
||||
if len(rule.Filters) == 0 {
|
||||
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d has no filters", ruleIndex)
|
||||
}
|
||||
|
||||
filterExprs := make([]string, 0, len(rule.Filters))
|
||||
for filterIndex, filter := range rule.Filters {
|
||||
if !labelKeyPattern.MatchString(filter.Key) {
|
||||
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d filter %d has invalid key %q", ruleIndex, filterIndex, filter.Key)
|
||||
}
|
||||
if len(filter.Values) == 0 {
|
||||
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d filter %d has no values", ruleIndex, filterIndex)
|
||||
}
|
||||
|
||||
quoted := make([]string, len(filter.Values))
|
||||
for valueIndex, value := range filter.Values {
|
||||
if !labelValuePattern.MatchString(value) {
|
||||
return "", errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d filter %d value %d is invalid %q", ruleIndex, filterIndex, valueIndex, value)
|
||||
}
|
||||
quoted[valueIndex] = "'" + value + "'"
|
||||
}
|
||||
|
||||
filterExprs = append(filterExprs, fmt.Sprintf("JSONExtractString(labels, '%s') IN (%s)", filter.Key, strings.Join(quoted, ", ")))
|
||||
}
|
||||
|
||||
return strings.Join(filterExprs, " AND "), nil
|
||||
}
|
||||
|
||||
// RuleDimensionKeys returns unique label keys referenced by retention rules.
|
||||
func RuleDimensionKeys(rules []retentiontypes.CustomRetentionRule) ([]string, error) {
|
||||
keys := make([]string, 0)
|
||||
seen := make(map[string]struct{})
|
||||
|
||||
for ruleIndex, rule := range rules {
|
||||
for filterIndex, filter := range rule.Filters {
|
||||
if !labelKeyPattern.MatchString(filter.Key) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "rule %d filter %d has invalid key %q", ruleIndex, filterIndex, filter.Key)
|
||||
}
|
||||
if _, ok := seen[filter.Key]; ok {
|
||||
continue
|
||||
}
|
||||
seen[filter.Key] = struct{}{}
|
||||
keys = append(keys, filter.Key)
|
||||
}
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
153
ee/metercollector/retention/retention_test.go
Normal file
153
ee/metercollector/retention/retention_test.go
Normal file
@@ -0,0 +1,153 @@
|
||||
package retention
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBuildSlicesFromRows(t *testing.T) {
|
||||
start := time.Date(2026, 5, 4, 0, 0, 0, 0, time.UTC)
|
||||
end := start.AddDate(0, 0, 1)
|
||||
|
||||
ruleA := retentiontypes.CustomRetentionRule{
|
||||
Filters: []retentiontypes.FilterCondition{{Key: "service.name", Values: []string{"api"}}},
|
||||
TTLDays: 7,
|
||||
}
|
||||
ruleB := retentiontypes.CustomRetentionRule{
|
||||
Filters: []retentiontypes.FilterCondition{{Key: "env", Values: []string{"prod"}}},
|
||||
TTLDays: 15,
|
||||
}
|
||||
|
||||
t.Run("row before window is active at start", func(t *testing.T) {
|
||||
slices, err := buildSlicesFromRows(
|
||||
[]*retentiontypes.TTLSetting{
|
||||
ttlSetting(t, start.Add(-time.Hour), 45, []retentiontypes.CustomRetentionRule{ruleA}),
|
||||
},
|
||||
30,
|
||||
start.UnixMilli(),
|
||||
end.UnixMilli(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []retentiontypes.Slice{{
|
||||
StartMs: start.UnixMilli(),
|
||||
EndMs: end.UnixMilli(),
|
||||
Rules: []retentiontypes.CustomRetentionRule{ruleA},
|
||||
DefaultDays: 45,
|
||||
}}, slices)
|
||||
})
|
||||
|
||||
t.Run("row inside window splits slices", func(t *testing.T) {
|
||||
firstChange := start.Add(6 * time.Hour)
|
||||
secondChange := start.Add(18 * time.Hour)
|
||||
|
||||
slices, err := buildSlicesFromRows(
|
||||
[]*retentiontypes.TTLSetting{
|
||||
ttlSetting(t, firstChange, 21, []retentiontypes.CustomRetentionRule{ruleA}),
|
||||
ttlSetting(t, secondChange, 14, []retentiontypes.CustomRetentionRule{ruleB}),
|
||||
},
|
||||
30,
|
||||
start.UnixMilli(),
|
||||
end.UnixMilli(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []retentiontypes.Slice{
|
||||
{
|
||||
StartMs: start.UnixMilli(),
|
||||
EndMs: firstChange.UnixMilli(),
|
||||
DefaultDays: 30,
|
||||
},
|
||||
{
|
||||
StartMs: firstChange.UnixMilli(),
|
||||
EndMs: secondChange.UnixMilli(),
|
||||
Rules: []retentiontypes.CustomRetentionRule{ruleA},
|
||||
DefaultDays: 21,
|
||||
},
|
||||
{
|
||||
StartMs: secondChange.UnixMilli(),
|
||||
EndMs: end.UnixMilli(),
|
||||
Rules: []retentiontypes.CustomRetentionRule{ruleB},
|
||||
DefaultDays: 14,
|
||||
},
|
||||
}, slices)
|
||||
})
|
||||
|
||||
t.Run("no rows uses fallback", func(t *testing.T) {
|
||||
slices, err := buildSlicesFromRows(nil, 30, start.UnixMilli(), end.UnixMilli())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []retentiontypes.Slice{{
|
||||
StartMs: start.UnixMilli(),
|
||||
EndMs: end.UnixMilli(),
|
||||
DefaultDays: 30,
|
||||
}}, slices)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRetentionSQL(t *testing.T) {
|
||||
rules := []retentiontypes.CustomRetentionRule{{
|
||||
Filters: []retentiontypes.FilterCondition{{
|
||||
Key: "service.name",
|
||||
Values: []string{"api", "worker"},
|
||||
}},
|
||||
TTLDays: 7,
|
||||
}}
|
||||
|
||||
retentionSQL, err := BuildMultiIfSQL(rules, 30)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "toInt32(multiIf(JSONExtractString(labels, 'service.name') IN ('api', 'worker'), 7, 30))", retentionSQL)
|
||||
|
||||
ruleIndexSQL, err := BuildRuleIndexSQL(rules)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "toInt32(multiIf(JSONExtractString(labels, 'service.name') IN ('api', 'worker'), 0, -1))", ruleIndexSQL)
|
||||
|
||||
invalidRules := []retentiontypes.CustomRetentionRule{{
|
||||
Filters: []retentiontypes.FilterCondition{{
|
||||
Key: "service name",
|
||||
Values: []string{"api"},
|
||||
}},
|
||||
TTLDays: 7,
|
||||
}}
|
||||
|
||||
_, err = BuildMultiIfSQL(invalidRules, 30)
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = BuildRuleIndexSQL(invalidRules)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestRuleDimensionKeysDedupes(t *testing.T) {
|
||||
keys, err := RuleDimensionKeys([]retentiontypes.CustomRetentionRule{
|
||||
{
|
||||
Filters: []retentiontypes.FilterCondition{
|
||||
{Key: "service.name", Values: []string{"api"}},
|
||||
{Key: "env", Values: []string{"prod"}},
|
||||
},
|
||||
TTLDays: 7,
|
||||
},
|
||||
{
|
||||
Filters: []retentiontypes.FilterCondition{
|
||||
{Key: "service.name", Values: []string{"worker"}},
|
||||
{Key: "cluster", Values: []string{"primary"}},
|
||||
},
|
||||
TTLDays: 15,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []string{"service.name", "env", "cluster"}, keys)
|
||||
}
|
||||
|
||||
func ttlSetting(t *testing.T, createdAt time.Time, ttlDays int, rules []retentiontypes.CustomRetentionRule) *retentiontypes.TTLSetting {
|
||||
t.Helper()
|
||||
|
||||
condition, err := json.Marshal(rules)
|
||||
require.NoError(t, err)
|
||||
|
||||
return &retentiontypes.TTLSetting{
|
||||
CreatedAt: createdAt,
|
||||
TTL: ttlDays,
|
||||
Condition: string(condition),
|
||||
}
|
||||
}
|
||||
276
ee/metercollector/spancountmetercollector/provider.go
Normal file
276
ee/metercollector/spancountmetercollector/provider.go
Normal file
@@ -0,0 +1,276 @@
|
||||
// Package spancountmetercollector collects span count meters by workspace and
|
||||
// retention. Keep the query local to this meter.
|
||||
package spancountmetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
|
||||
"github.com/SigNoz/signoz/ee/metercollector/retention"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// MeterName is the typed registry key for this collector.
|
||||
var (
|
||||
MeterName = metercollectortypes.MustNewName("signoz.meter.span.count")
|
||||
meterUnit = metercollectortypes.UnitCount
|
||||
meterAggregation = metercollectortypes.AggregationSum
|
||||
)
|
||||
|
||||
var _ metercollector.MeterCollector = (*Provider)(nil)
|
||||
|
||||
// Provider collects span count meters.
|
||||
type Provider struct {
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
sqlStore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
|
||||
return &Provider{
|
||||
telemetryStore: telemetryStore,
|
||||
sqlStore: sqlStore,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) Name() metercollectortypes.Name { return MeterName }
|
||||
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }
|
||||
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
|
||||
return meterAggregation
|
||||
}
|
||||
|
||||
// Collect aggregates span count for the window and emits an empty-day sentinel.
|
||||
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
|
||||
if !window.IsValid() {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
|
||||
}
|
||||
|
||||
meterName := MeterName.String()
|
||||
|
||||
slices, err := retention.LoadActiveSlices(
|
||||
ctx,
|
||||
p.sqlStore,
|
||||
orgID,
|
||||
telemetrytraces.DBName+"."+telemetrytraces.SpanIndexV3LocalTableName,
|
||||
retentiontypes.DefaultTracesRetentionDays,
|
||||
window.StartUnixMilli, window.EndUnixMilli,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
|
||||
}
|
||||
|
||||
type bucket struct {
|
||||
dimensions map[string]string
|
||||
value float64
|
||||
}
|
||||
accumulator := make(map[string]*bucket)
|
||||
|
||||
for _, slice := range slices {
|
||||
query, args, dimensionColumns, err := buildQuery(meterName, slice)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
|
||||
}
|
||||
|
||||
rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
|
||||
}
|
||||
|
||||
if err := func() error {
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
dimensionValues := make([]string, len(dimensionColumns))
|
||||
var retentionDays int32
|
||||
var retentionRuleIndex int32
|
||||
var value float64
|
||||
|
||||
scanDest := make([]any, 0, len(dimensionValues)+3)
|
||||
for i := range dimensionValues {
|
||||
scanDest = append(scanDest, &dimensionValues[i])
|
||||
}
|
||||
scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)
|
||||
|
||||
if err := rows.Scan(scanDest...); err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
|
||||
}
|
||||
|
||||
dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
|
||||
}
|
||||
|
||||
key := bucketKey(dimensions)
|
||||
b, ok := accumulator[key]
|
||||
if !ok {
|
||||
b = &bucket{dimensions: dimensions}
|
||||
accumulator[key] = b
|
||||
}
|
||||
b.value += value
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
meters := make([]meterreportertypes.Meter, 0, len(accumulator))
|
||||
for _, b := range accumulator {
|
||||
meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
|
||||
}
|
||||
|
||||
// Empty windows still emit a sentinel so checkpoints can advance.
|
||||
if len(meters) == 0 && len(slices) > 0 {
|
||||
meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: strconv.Itoa(slices[len(slices)-1].DefaultDays),
|
||||
}))
|
||||
}
|
||||
|
||||
return meters, nil
|
||||
}
|
||||
|
||||
// buildQuery stays local because each meter owns its billing query.
|
||||
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
|
||||
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
columns, err := dimensionColumnsFor(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
selects := make([]string, 0, len(columns)+3)
|
||||
groupBy := make([]string, 0, len(columns)+2)
|
||||
for _, column := range columns {
|
||||
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
|
||||
groupBy = append(groupBy, column.alias)
|
||||
}
|
||||
selects = append(selects,
|
||||
retentionExpr+" AS retention_days",
|
||||
retentionRuleIndexExpr+" AS retention_rule_index",
|
||||
"ifNull(sum(value), 0) AS value",
|
||||
)
|
||||
groupBy = append(groupBy, "retention_days", "retention_rule_index")
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(selects...)
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", meterName),
|
||||
sb.GTE("unix_milli", slice.StartMs),
|
||||
sb.LT("unix_milli", slice.EndMs),
|
||||
)
|
||||
sb.GroupBy(groupBy...)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return query, args, columns, nil
|
||||
}
|
||||
|
||||
type dimensionColumn struct {
|
||||
key string
|
||||
alias string
|
||||
}
|
||||
|
||||
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
|
||||
dimensionKeys, err := retention.RuleDimensionKeys(rules)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys := make([]string, 0, len(dimensionKeys)+1)
|
||||
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
|
||||
for _, key := range dimensionKeys {
|
||||
if key == metercollector.DimensionWorkspaceKeyID {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
columns := make([]dimensionColumn, len(keys))
|
||||
for i, key := range keys {
|
||||
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
|
||||
}
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
func buildDimensions(
|
||||
orgID valuer.UUID,
|
||||
retentionDays int,
|
||||
retentionRuleIndex int,
|
||||
columns []dimensionColumn,
|
||||
values []string,
|
||||
rules []retentiontypes.CustomRetentionRule,
|
||||
) (map[string]string, error) {
|
||||
if len(columns) != len(values) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
|
||||
}
|
||||
|
||||
valuesByKey := make(map[string]string, len(columns))
|
||||
for i, column := range columns {
|
||||
valuesByKey[column.key] = values[i]
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
|
||||
}
|
||||
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
|
||||
|
||||
if retentionRuleIndex < 0 {
|
||||
return dimensions, nil
|
||||
}
|
||||
if retentionRuleIndex >= len(rules) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
|
||||
}
|
||||
for _, filter := range rules[retentionRuleIndex].Filters {
|
||||
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
|
||||
}
|
||||
return dimensions, nil
|
||||
}
|
||||
|
||||
func addNonEmpty(dimensions map[string]string, key, value string) {
|
||||
if value == "" {
|
||||
return
|
||||
}
|
||||
dimensions[key] = value
|
||||
}
|
||||
|
||||
func bucketKey(dimensions map[string]string) string {
|
||||
keys := make([]string, 0, len(dimensions))
|
||||
for key := range dimensions {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
var b strings.Builder
|
||||
for _, key := range keys {
|
||||
value := dimensions[key]
|
||||
b.WriteString(strconv.Itoa(len(key)))
|
||||
b.WriteByte(':')
|
||||
b.WriteString(key)
|
||||
b.WriteByte('=')
|
||||
b.WriteString(strconv.Itoa(len(value)))
|
||||
b.WriteByte(':')
|
||||
b.WriteString(value)
|
||||
b.WriteByte(';')
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
67
ee/metercollector/spancountmetercollector/provider_test.go
Normal file
67
ee/metercollector/spancountmetercollector/provider_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package spancountmetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBuildDimensions(t *testing.T) {
|
||||
orgID := valuer.GenerateUUID()
|
||||
rules := []retentiontypes.CustomRetentionRule{{
|
||||
Filters: []retentiontypes.FilterCondition{{
|
||||
Key: "service.name",
|
||||
Values: []string{"api"},
|
||||
}},
|
||||
TTLDays: 7,
|
||||
}}
|
||||
columns := []dimensionColumn{
|
||||
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
|
||||
{key: "service.name", alias: "dim_1"},
|
||||
}
|
||||
|
||||
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
"service.name": "api",
|
||||
}, dimensions)
|
||||
}
|
||||
|
||||
func TestProviderMetadata(t *testing.T) {
|
||||
provider := New(nil, nil)
|
||||
|
||||
require.Equal(t, "signoz.meter.span.count", provider.Name().String())
|
||||
require.Equal(t, metercollectortypes.UnitCount, provider.Unit())
|
||||
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
|
||||
}
|
||||
|
||||
func TestBucketKeyIsStable(t *testing.T) {
|
||||
first := bucketKey(map[string]string{
|
||||
"service.name": "api",
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
})
|
||||
second := bucketKey(map[string]string{
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
"service.name": "api",
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
})
|
||||
|
||||
require.Equal(t, first, second)
|
||||
require.NotEmpty(t, first)
|
||||
}
|
||||
|
||||
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
|
||||
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
|
||||
require.Error(t, err)
|
||||
require.Nil(t, readings)
|
||||
}
|
||||
276
ee/metercollector/spansizemetercollector/provider.go
Normal file
276
ee/metercollector/spansizemetercollector/provider.go
Normal file
@@ -0,0 +1,276 @@
|
||||
// Package spansizemetercollector collects span size meters by workspace and
|
||||
// retention. Keep the query local to this meter.
|
||||
package spansizemetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
|
||||
"github.com/SigNoz/signoz/ee/metercollector/retention"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// MeterName is the typed registry key for this collector. The unit and
// aggregation are fixed: span size is a sum of bytes.
var (
	MeterName        = metercollectortypes.MustNewName("signoz.meter.span.size")
	meterUnit        = metercollectortypes.UnitBytes
	meterAggregation = metercollectortypes.AggregationSum
)

// Compile-time check that Provider satisfies the MeterCollector interface.
var _ metercollector.MeterCollector = (*Provider)(nil)
|
||||
|
||||
// Provider collects span size meters.
type Provider struct {
	// telemetryStore runs the ClickHouse aggregation queries (see Collect).
	telemetryStore telemetrystore.TelemetryStore
	// sqlStore backs retention.LoadActiveSlices for per-org retention config.
	sqlStore sqlstore.SQLStore
}
|
||||
|
||||
func New(telemetryStore telemetrystore.TelemetryStore, sqlStore sqlstore.SQLStore) *Provider {
|
||||
return &Provider{
|
||||
telemetryStore: telemetryStore,
|
||||
sqlStore: sqlStore,
|
||||
}
|
||||
}
|
||||
|
||||
// Name returns the typed registry key for this meter.
func (p *Provider) Name() metercollectortypes.Name { return MeterName }

// Unit reports that span size readings are measured in bytes.
func (p *Provider) Unit() metercollectortypes.Unit { return meterUnit }

// Aggregation reports that span size readings are combined by summation.
func (p *Provider) Aggregation() metercollectortypes.Aggregation {
	return meterAggregation
}
|
||||
|
||||
// Collect aggregates span size for the window and emits an empty-day sentinel.
//
// It loads the retention slices active during [StartUnixMilli, EndUnixMilli),
// runs one aggregation query per slice, merges rows that share the same
// dimension set across slices, and returns one Meter per unique dimension
// combination. When all slices produced zero rows, a single zero-valued
// sentinel meter is returned so downstream checkpoints can still advance.
func (p *Provider) Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
	if !window.IsValid() {
		return nil, errors.Newf(errors.TypeInvalidInput, metercollector.ErrCodeCollectFailed, "invalid window [%d, %d)", window.StartUnixMilli, window.EndUnixMilli)
	}

	meterName := MeterName.String()

	// One slice per retention configuration active during the window.
	slices, err := retention.LoadActiveSlices(
		ctx,
		p.sqlStore,
		orgID,
		telemetrytraces.DBName+"."+telemetrytraces.SpanIndexV3LocalTableName,
		retentiontypes.DefaultTracesRetentionDays,
		window.StartUnixMilli, window.EndUnixMilli,
	)
	if err != nil {
		return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "load retention slices for meter %q", meterName)
	}

	// bucket accumulates the total value for one unique dimension combination.
	type bucket struct {
		dimensions map[string]string
		value      float64
	}
	accumulator := make(map[string]*bucket)

	for _, slice := range slices {
		query, args, dimensionColumns, err := buildQuery(meterName, slice)
		if err != nil {
			return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build retention query for meter %q", meterName)
		}

		rows, err := p.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
		if err != nil {
			return nil, errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "query meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
		}

		// Closure scopes rows.Close() to this slice's iteration so each
		// result set is released before the next query runs.
		if err := func() error {
			defer rows.Close()
			for rows.Next() {
				dimensionValues := make([]string, len(dimensionColumns))
				var retentionDays int32
				var retentionRuleIndex int32
				var value float64

				// Scan order mirrors buildQuery's SELECT list: dimension
				// aliases first, then retention_days, retention_rule_index
				// and the summed value.
				scanDest := make([]any, 0, len(dimensionValues)+3)
				for i := range dimensionValues {
					scanDest = append(scanDest, &dimensionValues[i])
				}
				scanDest = append(scanDest, &retentionDays, &retentionRuleIndex, &value)

				if err := rows.Scan(scanDest...); err != nil {
					return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "scan meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
				}

				dimensions, err := buildDimensions(orgID, int(retentionDays), int(retentionRuleIndex), dimensionColumns, dimensionValues, slice.Rules)
				if err != nil {
					return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "build dimensions for meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
				}

				// Merge rows across slices that share the same dimensions.
				key := bucketKey(dimensions)
				b, ok := accumulator[key]
				if !ok {
					b = &bucket{dimensions: dimensions}
					accumulator[key] = b
				}
				b.value += value
			}
			if err := rows.Err(); err != nil {
				return errors.Wrapf(err, errors.TypeInternal, metercollector.ErrCodeCollectFailed, "iterate meter %q slice [%d, %d)", meterName, slice.StartMs, slice.EndMs)
			}
			return nil
		}(); err != nil {
			return nil, err
		}
	}

	meters := make([]meterreportertypes.Meter, 0, len(accumulator))
	for _, b := range accumulator {
		meters = append(meters, meterreportertypes.NewMeter(MeterName, b.value, meterUnit, meterAggregation, window, b.dimensions))
	}

	// Empty windows still emit a sentinel so checkpoints can advance.
	if len(meters) == 0 && len(slices) > 0 {
		meters = append(meters, meterreportertypes.NewMeter(MeterName, 0, meterUnit, meterAggregation, window, map[string]string{
			metercollector.DimensionOrganizationID: orgID.StringValue(),
			metercollector.DimensionRetentionDays:  strconv.Itoa(slices[len(slices)-1].DefaultDays),
		}))
	}

	return meters, nil
}
|
||||
|
||||
// buildQuery stays local because each meter owns its billing query.
|
||||
func buildQuery(meterName string, slice retentiontypes.Slice) (string, []any, []dimensionColumn, error) {
|
||||
retentionExpr, err := retention.BuildMultiIfSQL(slice.Rules, slice.DefaultDays)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
retentionRuleIndexExpr, err := retention.BuildRuleIndexSQL(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
columns, err := dimensionColumnsFor(slice.Rules)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
selects := make([]string, 0, len(columns)+3)
|
||||
groupBy := make([]string, 0, len(columns)+2)
|
||||
for _, column := range columns {
|
||||
selects = append(selects, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", column.key, column.alias))
|
||||
groupBy = append(groupBy, column.alias)
|
||||
}
|
||||
selects = append(selects,
|
||||
retentionExpr+" AS retention_days",
|
||||
retentionRuleIndexExpr+" AS retention_rule_index",
|
||||
"ifNull(sum(value), 0) AS value",
|
||||
)
|
||||
groupBy = append(groupBy, "retention_days", "retention_rule_index")
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(selects...)
|
||||
sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
|
||||
sb.Where(
|
||||
sb.Equal("metric_name", meterName),
|
||||
sb.GTE("unix_milli", slice.StartMs),
|
||||
sb.LT("unix_milli", slice.EndMs),
|
||||
)
|
||||
sb.GroupBy(groupBy...)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return query, args, columns, nil
|
||||
}
|
||||
|
||||
// dimensionColumn pairs a label key extracted from the meter samples' labels
// JSON with the alias it is selected under in the generated query.
type dimensionColumn struct {
	// key is the label name passed to JSONExtractString.
	key string
	// alias is the positional column alias (dim_0, dim_1, ...).
	alias string
}
|
||||
|
||||
func dimensionColumnsFor(rules []retentiontypes.CustomRetentionRule) ([]dimensionColumn, error) {
|
||||
dimensionKeys, err := retention.RuleDimensionKeys(rules)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys := make([]string, 0, len(dimensionKeys)+1)
|
||||
keys = append(keys, metercollector.DimensionWorkspaceKeyID)
|
||||
for _, key := range dimensionKeys {
|
||||
if key == metercollector.DimensionWorkspaceKeyID {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
columns := make([]dimensionColumn, len(keys))
|
||||
for i, key := range keys {
|
||||
columns[i] = dimensionColumn{key: key, alias: fmt.Sprintf("dim_%d", i)}
|
||||
}
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
func buildDimensions(
|
||||
orgID valuer.UUID,
|
||||
retentionDays int,
|
||||
retentionRuleIndex int,
|
||||
columns []dimensionColumn,
|
||||
values []string,
|
||||
rules []retentiontypes.CustomRetentionRule,
|
||||
) (map[string]string, error) {
|
||||
if len(columns) != len(values) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "dimension column/value count mismatch: %d columns, %d values", len(columns), len(values))
|
||||
}
|
||||
|
||||
valuesByKey := make(map[string]string, len(columns))
|
||||
for i, column := range columns {
|
||||
valuesByKey[column.key] = values[i]
|
||||
}
|
||||
|
||||
dimensions := map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: strconv.Itoa(retentionDays),
|
||||
}
|
||||
addNonEmpty(dimensions, metercollector.DimensionWorkspaceKeyID, valuesByKey[metercollector.DimensionWorkspaceKeyID])
|
||||
|
||||
if retentionRuleIndex < 0 {
|
||||
return dimensions, nil
|
||||
}
|
||||
if retentionRuleIndex >= len(rules) {
|
||||
return nil, errors.Newf(errors.TypeInternal, metercollector.ErrCodeCollectFailed, "retention rule index %d out of range for %d rules", retentionRuleIndex, len(rules))
|
||||
}
|
||||
for _, filter := range rules[retentionRuleIndex].Filters {
|
||||
addNonEmpty(dimensions, filter.Key, valuesByKey[filter.Key])
|
||||
}
|
||||
return dimensions, nil
|
||||
}
|
||||
|
||||
// addNonEmpty stores value under key, skipping empty values so absent labels
// do not produce empty dimensions.
func addNonEmpty(dimensions map[string]string, key, value string) {
	if value != "" {
		dimensions[key] = value
	}
}
|
||||
|
||||
// bucketKey renders a deterministic key for a dimension map. Keys are
// visited in sorted order, and both keys and values are length-prefixed so
// distinct maps cannot serialize to the same string.
func bucketKey(dimensions map[string]string) string {
	sortedKeys := make([]string, 0, len(dimensions))
	for k := range dimensions {
		sortedKeys = append(sortedKeys, k)
	}
	sort.Strings(sortedKeys)

	var sb strings.Builder
	for _, k := range sortedKeys {
		v := dimensions[k]
		fmt.Fprintf(&sb, "%d:%s=%d:%s;", len(k), k, len(v), v)
	}
	return sb.String()
}
|
||||
67
ee/metercollector/spansizemetercollector/provider_test.go
Normal file
67
ee/metercollector/spansizemetercollector/provider_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package spansizemetercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBuildDimensions(t *testing.T) {
|
||||
orgID := valuer.GenerateUUID()
|
||||
rules := []retentiontypes.CustomRetentionRule{{
|
||||
Filters: []retentiontypes.FilterCondition{{
|
||||
Key: "service.name",
|
||||
Values: []string{"api"},
|
||||
}},
|
||||
TTLDays: 7,
|
||||
}}
|
||||
columns := []dimensionColumn{
|
||||
{key: metercollector.DimensionWorkspaceKeyID, alias: "dim_0"},
|
||||
{key: "service.name", alias: "dim_1"},
|
||||
}
|
||||
|
||||
dimensions, err := buildDimensions(orgID, 30, 0, columns, []string{"workspace-1", "api"}, rules)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, map[string]string{
|
||||
metercollector.DimensionOrganizationID: orgID.StringValue(),
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
"service.name": "api",
|
||||
}, dimensions)
|
||||
}
|
||||
|
||||
func TestProviderMetadata(t *testing.T) {
|
||||
provider := New(nil, nil)
|
||||
|
||||
require.Equal(t, "signoz.meter.span.size", provider.Name().String())
|
||||
require.Equal(t, metercollectortypes.UnitBytes, provider.Unit())
|
||||
require.Equal(t, metercollectortypes.AggregationSum, provider.Aggregation())
|
||||
}
|
||||
|
||||
func TestBucketKeyIsStable(t *testing.T) {
|
||||
first := bucketKey(map[string]string{
|
||||
"service.name": "api",
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
})
|
||||
second := bucketKey(map[string]string{
|
||||
metercollector.DimensionWorkspaceKeyID: "workspace-1",
|
||||
"service.name": "api",
|
||||
metercollector.DimensionRetentionDays: "30",
|
||||
})
|
||||
|
||||
require.Equal(t, first, second)
|
||||
require.NotEmpty(t, first)
|
||||
}
|
||||
|
||||
func TestCollectRejectsInvalidWindowBeforeQuerying(t *testing.T) {
|
||||
readings, err := New(nil, nil).Collect(context.Background(), valuer.GenerateUUID(), meterreportertypes.Window{})
|
||||
require.Error(t, err)
|
||||
require.Nil(t, readings)
|
||||
}
|
||||
630
ee/meterreporter/httpmeterreporter/provider.go
Normal file
630
ee/meterreporter/httpmeterreporter/provider.go
Normal file
@@ -0,0 +1,630 @@
|
||||
package httpmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Compile-time check that Provider implements factory.ServiceWithHealthy.
var _ factory.ServiceWithHealthy = (*Provider)(nil)

// errCodeReportFailed tags errors raised while collecting or shipping meters.
var errCodeReportFailed = errors.MustNewCode("meterreporter_report_failed")
|
||||
|
||||
const (
	// Phase labels: sealed full past days vs today's still-open window.
	phaseSealed = "sealed"
	phaseToday  = "today"

	// Attribute keys used on the spans and metrics this reporter emits.
	attrPhase                 = "phase"
	attrResult                = "result"
	attrMeterReporterProvider = "meterreporter.provider"
	attrOrgID                 = "meterreporter.org_id"
	attrOrgCount              = "meterreporter.org_count"
	attrMeter                 = "meterreporter.meter"
	attrDate                  = "meterreporter.date"
	attrReadings              = "meterreporter.readings"
	attrReadingsCollected     = "meterreporter.readings_collected"
	attrReadingsDropped       = "meterreporter.readings_dropped"
	attrWindowStartUnixMilli  = "meterreporter.window_start_unix_milli"
	attrWindowEndUnixMilli    = "meterreporter.window_end_unix_milli"
	attrWindowCompleted       = "meterreporter.window_completed"
	attrCatchupStart          = "meterreporter.catchup_start"
	attrCatchupEnd            = "meterreporter.catchup_end"
	attrDurationMs            = "meterreporter.duration_ms"
	attrDryRun                = "meterreporter.dry_run"
	attrIdempotencyKey        = "meterreporter.idempotency_key"

	// Values for the result attribute.
	resultSuccess = "success"
	resultFailure = "failure"

	// providerName is this reporter's factory registration name.
	providerName = "http"
)
|
||||
|
||||
// Provider collects registered meters and ships them to Zeus.
type Provider struct {
	settings factory.ScopedProviderSettings
	config   meterreporter.Config
	// collectors is sorted by meter name (see validateCollectors) so
	// iteration order is deterministic.
	collectors []metercollector.MeterCollector

	licensing      licensing.Licensing
	telemetryStore telemetrystore.TelemetryStore
	orgGetter      organization.Getter
	zeus           zeus.Zeus

	// healthyC is closed by Start; stopC is closed by Stop to end the tick
	// loop, which goroutinesWg tracks.
	healthyC     chan struct{}
	stopC        chan struct{}
	goroutinesWg sync.WaitGroup
	metrics      *reporterMetrics
}
|
||||
|
||||
// NewFactory registers the HTTP meter reporter.
|
||||
func NewFactory(
|
||||
collectors map[metercollectortypes.Name]metercollector.MeterCollector,
|
||||
licensing licensing.Licensing,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
orgGetter organization.Getter,
|
||||
zeus zeus.Zeus,
|
||||
) factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
|
||||
return factory.NewProviderFactory(
|
||||
factory.MustNewName(providerName),
|
||||
func(ctx context.Context, providerSettings factory.ProviderSettings, config meterreporter.Config) (meterreporter.Reporter, error) {
|
||||
return newProvider(ctx, providerSettings, config, collectors, licensing, telemetryStore, orgGetter, zeus)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// newProvider wires the reporter: it scopes the provider settings, builds
// the reporter metrics, validates and orders the collector registry, and
// returns a Provider ready for Start. The context is unused at construction
// time.
func newProvider(
	_ context.Context,
	providerSettings factory.ProviderSettings,
	config meterreporter.Config,
	collectors map[metercollectortypes.Name]metercollector.MeterCollector,
	licensing licensing.Licensing,
	telemetryStore telemetrystore.TelemetryStore,
	orgGetter organization.Getter,
	zeus zeus.Zeus,
) (*Provider, error) {
	settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/meterreporter/httpmeterreporter")

	metrics, err := newReporterMetrics(settings.Meter())
	if err != nil {
		return nil, err
	}

	// Fail construction early on a malformed collector registry.
	orderedCollectors, err := validateCollectors(collectors)
	if err != nil {
		return nil, err
	}

	return &Provider{
		settings:       settings,
		config:         config,
		collectors:     orderedCollectors,
		licensing:      licensing,
		telemetryStore: telemetryStore,
		orgGetter:      orgGetter,
		zeus:           zeus,
		healthyC:       make(chan struct{}),
		stopC:          make(chan struct{}),
		metrics:        metrics,
	}, nil
}
|
||||
|
||||
func validateCollectors(collectors map[metercollectortypes.Name]metercollector.MeterCollector) ([]metercollector.MeterCollector, error) {
|
||||
ordered := make([]metercollector.MeterCollector, 0, len(collectors))
|
||||
for name, collector := range collectors {
|
||||
if name.IsZero() {
|
||||
return nil, errors.New(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "empty meter name in collector registry")
|
||||
}
|
||||
if collector == nil {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "nil collector for meter %q", name.String())
|
||||
}
|
||||
if collector.Name() != name {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "registry key %q does not match collector.Name() %q", name.String(), collector.Name().String())
|
||||
}
|
||||
if collector.Unit().IsZero() {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "meter %q has empty unit", name.String())
|
||||
}
|
||||
if collector.Aggregation().IsZero() {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, meterreporter.ErrCodeInvalidInput, "meter %q has empty aggregation", name.String())
|
||||
}
|
||||
ordered = append(ordered, collector)
|
||||
}
|
||||
|
||||
sort.Slice(ordered, func(i, j int) bool {
|
||||
return ordered[i].Name().String() < ordered[j].Name().String()
|
||||
})
|
||||
|
||||
return ordered, nil
|
||||
}
|
||||
|
||||
// Start runs an immediate tick, then repeats on Config.Interval.
//
// It closes healthyC up front, launches the tick loop in a goroutine tracked
// by goroutinesWg, and then blocks on the WaitGroup — so Start only returns
// after Stop closes stopC and the loop exits.
func (provider *Provider) Start(ctx context.Context) error {
	close(provider.healthyC)

	provider.settings.Logger().InfoContext(ctx, "meter reporter started",
		slog.Duration("interval", provider.config.Interval),
		slog.Duration("timeout", provider.config.Timeout),
		slog.Int("catchup_max_days_per_tick", provider.config.CatchupMaxDaysPerTick),
		slog.Int("meters", len(provider.collectors)),
	)

	provider.goroutinesWg.Add(1)
	go func() {
		defer provider.goroutinesWg.Done()

		// First tick fires immediately rather than waiting one interval.
		provider.runTick(ctx)

		ticker := time.NewTicker(provider.config.Interval)
		defer ticker.Stop()

		for {
			select {
			case <-provider.stopC:
				return
			case <-ticker.C:
				provider.runTick(ctx)
			}
		}
	}()

	provider.goroutinesWg.Wait()
	return nil
}
|
||||
|
||||
// Stop signals the tick loop and waits for any in-flight tick.
//
// NOTE(review): the initial receive blocks until Start has closed healthyC,
// so Stop before Start would hang — confirm the factory lifecycle guarantees
// the ordering. The select/default guard makes a repeated Stop call safe,
// though two concurrent Stop calls could still race on close(stopC).
func (provider *Provider) Stop(ctx context.Context) error {
	<-provider.healthyC
	provider.settings.Logger().InfoContext(ctx, "meter reporter stopping")
	select {
	case <-provider.stopC:
		// already closed
	default:
		close(provider.stopC)
	}
	provider.goroutinesWg.Wait()
	provider.settings.Logger().InfoContext(ctx, "meter reporter stopped")
	return nil
}
|
||||
|
||||
// Healthy returns a channel that is closed once Start has run.
func (provider *Provider) Healthy() <-chan struct{} {
	return provider.healthyC
}
|
||||
|
||||
// runTick executes one collect-and-ship cycle under Config.Timeout.
//
// Failures are recorded on the span and logged but never propagated, so one
// bad tick cannot stop the ticker loop.
func (provider *Provider) runTick(parentCtx context.Context) {
	tickStart := time.Now()
	ctx, span := provider.settings.Tracer().Start(parentCtx, "meterreporter.Tick", trace.WithAttributes(
		attribute.String(attrMeterReporterProvider, providerName),
		attribute.Int("meterreporter.meters", len(provider.collectors)),
		attribute.Int("meterreporter.catchup_max_days_per_tick", provider.config.CatchupMaxDaysPerTick),
	))
	defer span.End()

	provider.metrics.ticks.Add(ctx, 1)

	// Bound the whole cycle so a slow tick cannot run unbounded.
	ctx, cancel := context.WithTimeout(ctx, provider.config.Timeout)
	defer cancel()

	provider.settings.Logger().DebugContext(ctx, "meter reporter tick started",
		slog.Duration("timeout", provider.config.Timeout),
		slog.Int("meters", len(provider.collectors)),
	)

	if err := provider.tick(ctx); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		span.SetAttributes(
			attribute.String(attrResult, resultFailure),
			attribute.Int64(attrDurationMs, time.Since(tickStart).Milliseconds()),
		)
		provider.settings.Logger().ErrorContext(ctx, "meter reporter tick failed",
			errors.Attr(err),
			slog.Duration("timeout", provider.config.Timeout),
			slog.Duration("duration", time.Since(tickStart)),
		)
		return
	}

	span.SetAttributes(
		attribute.String(attrResult, resultSuccess),
		attribute.Int64(attrDurationMs, time.Since(tickStart).Milliseconds()),
	)
	provider.settings.Logger().DebugContext(ctx, "meter reporter tick completed", slog.Duration("duration", time.Since(tickStart)))
}
|
||||
|
||||
// tick processes sealed catchup days, then today's partial window.
|
||||
func (provider *Provider) tick(ctx context.Context) error {
|
||||
now := time.Now().UTC()
|
||||
// Use one timestamp so a tick cannot straddle midnight.
|
||||
todayStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
||||
yesterday := todayStart.AddDate(0, 0, -1)
|
||||
|
||||
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, errCodeReportFailed, "failed to list organizations")
|
||||
}
|
||||
trace.SpanFromContext(ctx).SetAttributes(attribute.Int(attrOrgCount, len(orgs)))
|
||||
if len(orgs) == 0 {
|
||||
provider.settings.Logger().InfoContext(ctx, "skipping meter reporter tick; no organizations found")
|
||||
return nil
|
||||
}
|
||||
org := orgs[0]
|
||||
if len(orgs) > 1 {
|
||||
// signoz_meter samples have no org marker.
|
||||
provider.settings.Logger().WarnContext(ctx, "multiple orgs on a single instance; reporting only the first",
|
||||
slog.Int("org_count", len(orgs)),
|
||||
slog.String("selected_org_id", org.ID.StringValue()),
|
||||
)
|
||||
}
|
||||
trace.SpanFromContext(ctx).SetAttributes(attribute.String(attrOrgID, org.ID.StringValue()))
|
||||
|
||||
license, err := provider.licensing.GetActive(ctx, org.ID)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, errors.TypeInternal, errCodeReportFailed, "failed to fetch active license for org %q", org.ID.StringValue())
|
||||
}
|
||||
if license == nil || license.Key == "" {
|
||||
provider.settings.Logger().WarnContext(ctx, "skipping tick, nil/empty license for org", slog.String("org_id", org.ID.StringValue()))
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: re-enable once /v2/meters/checkpoints is live in staging. Until
|
||||
// then we run with an empty checkpoint map; bootstrap floors are taken
|
||||
// from data and dropCheckpointed becomes a no-op for the sealed window.
|
||||
// checkpoints, err := provider.zeus.GetMeterCheckpoints(ctx, license.Key)
|
||||
// if err != nil {
|
||||
// provider.metrics.checkpointErrors.Add(ctx, 1)
|
||||
// provider.settings.Logger().ErrorContext(ctx, "skipping tick: meter checkpoints call failed", errors.Attr(err))
|
||||
// return nil
|
||||
// }
|
||||
// checkpointsByMeter := make(map[string]time.Time, len(checkpoints))
|
||||
// for _, checkpoint := range checkpoints {
|
||||
// checkpointsByMeter[checkpoint.Name] = checkpoint.Checkpoint.UTC()
|
||||
// }
|
||||
checkpointsByMeter := make(map[string]time.Time)
|
||||
|
||||
floor := provider.dataFloor(ctx, todayStart)
|
||||
catchupStart := provider.catchupStart(floor, todayStart, checkpointsByMeter)
|
||||
end := catchupStart.AddDate(0, 0, provider.config.CatchupMaxDaysPerTick-1)
|
||||
if end.After(yesterday) {
|
||||
end = yesterday
|
||||
}
|
||||
trace.SpanFromContext(ctx).SetAttributes(
|
||||
attribute.String(attrCatchupStart, catchupStart.Format("2006-01-02")),
|
||||
attribute.String(attrCatchupEnd, end.Format("2006-01-02")),
|
||||
)
|
||||
provider.settings.Logger().DebugContext(ctx, "meter reporter catchup window selected",
|
||||
slog.String("org_id", org.ID.StringValue()),
|
||||
slog.Time("data_floor", floor),
|
||||
slog.Time("catchup_start", catchupStart),
|
||||
slog.Time("catchup_end", end),
|
||||
slog.Int("catchup_max_days_per_tick", provider.config.CatchupMaxDaysPerTick),
|
||||
)
|
||||
for day := catchupStart; !day.After(end); day = day.AddDate(0, 0, 1) {
|
||||
window := meterreportertypes.Window{
|
||||
StartUnixMilli: day.UnixMilli(),
|
||||
EndUnixMilli: day.AddDate(0, 0, 1).UnixMilli(),
|
||||
IsCompleted: true,
|
||||
}
|
||||
err := provider.runPhase(ctx, org.ID, license.Key, window, checkpointsByMeter)
|
||||
result := resultSuccess
|
||||
if err != nil {
|
||||
result = resultFailure
|
||||
}
|
||||
provider.metrics.catchupDaysProcessed.Add(ctx, 1, metric.WithAttributes(attribute.String(attrResult, result)))
|
||||
if err != nil {
|
||||
provider.settings.Logger().WarnContext(ctx, "stopping sealed catchup after failed day",
|
||||
errors.Attr(err),
|
||||
slog.String("date", day.Format("2006-01-02")),
|
||||
)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Today's partial window runs every tick.
|
||||
todayWindow := meterreportertypes.Window{
|
||||
StartUnixMilli: todayStart.UnixMilli(),
|
||||
EndUnixMilli: now.UnixMilli(),
|
||||
IsCompleted: false,
|
||||
}
|
||||
_ = provider.runPhase(ctx, org.ID, license.Key, todayWindow, checkpointsByMeter)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// runPhase collects all meters for one window and ships the batch.
//
// It runs both phases of the reporter: "sealed" for completed past days and
// "today" for the still-open current day (the label is derived from
// window.IsCompleted). Collection failures for individual meters are logged
// and counted but do not abort the phase; only a shipping failure is returned
// to the caller.
func (provider *Provider) runPhase(ctx context.Context, orgID valuer.UUID, licenseKey string, window meterreportertypes.Window, checkpointsByMeter map[string]time.Time) error {
	phaseLabel := phaseToday
	if window.IsCompleted {
		phaseLabel = phaseSealed
	}
	phaseAttr := metric.WithAttributes(attribute.String(attrPhase, phaseLabel))
	// date is the UTC day of the window start, used for logs, spans and the
	// idempotency key downstream.
	date := time.UnixMilli(window.StartUnixMilli).UTC().Format("2006-01-02")
	phaseStart := time.Now()
	ctx, span := provider.settings.Tracer().Start(ctx, "meterreporter.RunPhase", trace.WithAttributes(
		attribute.String(attrPhase, phaseLabel),
		attribute.String(attrOrgID, orgID.StringValue()),
		attribute.String(attrDate, date),
		attribute.Int64(attrWindowStartUnixMilli, window.StartUnixMilli),
		attribute.Int64(attrWindowEndUnixMilli, window.EndUnixMilli),
		attribute.Bool(attrWindowCompleted, window.IsCompleted),
	))
	defer span.End()

	provider.settings.Logger().DebugContext(ctx, "meter reporter phase started",
		slog.String("org_id", orgID.StringValue()),
		slog.String("phase", phaseLabel),
		slog.String("date", date),
		slog.Int64("start_unix_milli", window.StartUnixMilli),
		slog.Int64("end_unix_milli", window.EndUnixMilli),
		slog.Int("meters", len(provider.collectors)),
	)

	collectStart := time.Now()
	readings := make([]meterreportertypes.Meter, 0, len(provider.collectors))
	for _, collector := range provider.collectors {
		meterName := collector.Name().String()
		// Intentionally shadows the outer collectStart: this one times a
		// single meter; the outer one times the whole collection loop.
		collectStart := time.Now()
		collectCtx, collectSpan := provider.settings.Tracer().Start(ctx, "meterreporter.CollectMeter", trace.WithAttributes(
			attribute.String(attrPhase, phaseLabel),
			attribute.String(attrOrgID, orgID.StringValue()),
			attribute.String(attrMeter, meterName),
			attribute.String(attrDate, date),
			attribute.Int64(attrWindowStartUnixMilli, window.StartUnixMilli),
			attribute.Int64(attrWindowEndUnixMilli, window.EndUnixMilli),
			attribute.Bool(attrWindowCompleted, window.IsCompleted),
		))
		collectedReadings, err := collector.Collect(collectCtx, orgID, window)
		if err != nil {
			// A single failing meter is skipped; the remaining meters for
			// this window are still collected and shipped.
			collectSpan.RecordError(err)
			collectSpan.SetStatus(codes.Error, err.Error())
			collectSpan.SetAttributes(
				attribute.String(attrResult, resultFailure),
				attribute.Int64(attrDurationMs, time.Since(collectStart).Milliseconds()),
			)
			collectSpan.End()
			provider.metrics.collectErrors.Add(ctx, 1, phaseAttr)
			provider.settings.Logger().WarnContext(ctx, "meter collection failed",
				errors.Attr(err),
				slog.String("meter", meterName),
				slog.String("org_id", orgID.StringValue()),
				slog.String("phase", phaseLabel),
				slog.String("date", date),
				slog.Duration("duration", time.Since(collectStart)),
			)
			continue
		}
		collectSpan.SetAttributes(
			attribute.String(attrResult, resultSuccess),
			attribute.Int(attrReadings, len(collectedReadings)),
			attribute.Int64(attrDurationMs, time.Since(collectStart).Milliseconds()),
		)
		collectSpan.End()
		provider.settings.Logger().DebugContext(ctx, "meter collection completed",
			slog.String("meter", meterName),
			slog.String("org_id", orgID.StringValue()),
			slog.String("phase", phaseLabel),
			slog.String("date", date),
			slog.Int("readings", len(collectedReadings)),
			slog.Duration("duration", time.Since(collectStart)),
		)
		readings = append(readings, collectedReadings...)
	}
	collectDuration := time.Since(collectStart)
	provider.metrics.collectDuration.Add(ctx, collectDuration.Seconds(), phaseAttr)
	provider.metrics.collectOperations.Add(ctx, 1, phaseAttr)
	span.SetAttributes(attribute.Int(attrReadingsCollected, len(readings)))

	// For sealed days only, drop readings already acknowledged by a Zeus
	// checkpoint so the same day is never shipped twice.
	if window.IsCompleted {
		beforeDrop := len(readings)
		readings = dropCheckpointed(readings, time.UnixMilli(window.StartUnixMilli).UTC(), checkpointsByMeter)
		dropped := beforeDrop - len(readings)
		span.SetAttributes(attribute.Int(attrReadingsDropped, dropped))
		if dropped > 0 {
			provider.settings.Logger().DebugContext(ctx, "dropped checkpointed meter readings",
				slog.String("org_id", orgID.StringValue()),
				slog.String("phase", phaseLabel),
				slog.String("date", date),
				slog.Int("dropped", dropped),
				slog.Int("remaining", len(readings)),
			)
		}
	}
	// Nothing to ship (no collectors succeeded or everything was
	// checkpointed) is a successful, empty phase.
	if len(readings) == 0 {
		span.SetAttributes(
			attribute.String(attrResult, resultSuccess),
			attribute.Int(attrReadings, 0),
			attribute.Int64(attrDurationMs, time.Since(phaseStart).Milliseconds()),
		)
		provider.settings.Logger().DebugContext(ctx, "meter reporter phase produced no readings",
			slog.String("org_id", orgID.StringValue()),
			slog.String("phase", phaseLabel),
			slog.String("date", date),
			slog.Duration("collect_duration", collectDuration),
			slog.Duration("duration", time.Since(phaseStart)),
		)
		return nil
	}

	shipStart := time.Now()
	err := provider.shipReadings(ctx, licenseKey, date, readings)
	shipDuration := time.Since(shipStart)
	provider.metrics.shipDuration.Add(ctx, shipDuration.Seconds(), phaseAttr)
	provider.metrics.shipOperations.Add(ctx, 1, phaseAttr)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		span.SetAttributes(attribute.String(attrResult, resultFailure))
		provider.metrics.postErrors.Add(ctx, 1, phaseAttr)
		provider.settings.Logger().ErrorContext(ctx, "failed to ship meter readings",
			errors.Attr(err),
			slog.String("phase", phaseLabel),
			slog.String("date", date),
			slog.Int("readings", len(readings)),
			slog.Duration("ship_duration", shipDuration),
		)
		return err
	}
	provider.metrics.readingsEmitted.Add(ctx, int64(len(readings)), phaseAttr)
	span.SetAttributes(
		attribute.String(attrResult, resultSuccess),
		attribute.Int(attrReadings, len(readings)),
		attribute.Int64(attrDurationMs, time.Since(phaseStart).Milliseconds()),
	)
	provider.settings.Logger().InfoContext(ctx, "meter reporter phase shipped",
		slog.String("org_id", orgID.StringValue()),
		slog.String("phase", phaseLabel),
		slog.String("date", date),
		slog.Int("readings", len(readings)),
		slog.Duration("collect_duration", collectDuration),
		slog.Duration("ship_duration", shipDuration),
		slog.Duration("duration", time.Since(phaseStart)),
	)
	return nil
}
|
||||
|
||||
// dropCheckpointed removes readings already covered by meter checkpoints.
|
||||
func dropCheckpointed(readings []meterreportertypes.Meter, windowDay time.Time, checkpointsByMeter map[string]time.Time) []meterreportertypes.Meter {
|
||||
if len(checkpointsByMeter) == 0 {
|
||||
return readings
|
||||
}
|
||||
kept := readings[:0]
|
||||
for _, reading := range readings {
|
||||
checkpoint, ok := checkpointsByMeter[reading.MeterName]
|
||||
if !ok || checkpoint.Before(windowDay) {
|
||||
kept = append(kept, reading)
|
||||
}
|
||||
}
|
||||
return kept
|
||||
}
|
||||
|
||||
// catchupStart returns the earliest UTC day that still needs sealed reporting.
|
||||
func (provider *Provider) catchupStart(floor time.Time, todayStart time.Time, checkpointsByMeter map[string]time.Time) time.Time {
|
||||
catchupStart := todayStart
|
||||
|
||||
for _, collector := range provider.collectors {
|
||||
next := floor
|
||||
if checkpoint, ok := checkpointsByMeter[collector.Name().String()]; ok {
|
||||
next = checkpoint.AddDate(0, 0, 1)
|
||||
if next.Before(floor) {
|
||||
next = floor
|
||||
}
|
||||
}
|
||||
if next.Before(catchupStart) {
|
||||
catchupStart = next
|
||||
}
|
||||
}
|
||||
|
||||
yesterday := todayStart.AddDate(0, 0, -1)
|
||||
if catchupStart.After(yesterday) {
|
||||
catchupStart = yesterday
|
||||
}
|
||||
|
||||
return catchupStart
|
||||
}
|
||||
|
||||
// dataFloor returns the earliest signoz_meter sample day, or today on failure.
//
// It queries ClickHouse for the minimum unix_milli in the meter samples table
// and truncates it to the start of that UTC day. On any failure (no telemetry
// store, query error, or an empty table reported as 0) it falls back to
// todayStart, which effectively disables sealed catchup for this tick.
func (provider *Provider) dataFloor(ctx context.Context, todayStart time.Time) time.Time {
	ctx, span := provider.settings.Tracer().Start(ctx, "meterreporter.DataFloor")
	defer span.End()

	// No telemetry store configured: nothing to read, fall back to today.
	if provider.telemetryStore == nil {
		span.SetAttributes(attribute.String(attrResult, resultSuccess))
		return todayStart
	}

	// ifNull guards against an empty table: min() over no rows yields NULL,
	// which is mapped to 0 and handled below.
	sb := sqlbuilder.NewSelectBuilder()
	sb.Select("ifNull(min(unix_milli), 0)")
	sb.From(telemetrymeter.DBName + "." + telemetrymeter.SamplesTableName)
	query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)

	var minMs int64
	if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, query, args...).Scan(&minMs); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		span.SetAttributes(attribute.String(attrResult, resultFailure))
		provider.settings.Logger().WarnContext(ctx, "failed to read data floor; falling back to latest sealed day", errors.Attr(err))
		return todayStart
	}
	// minMs == 0 means the table is empty (see ifNull above); there is no
	// historical data to catch up on.
	if minMs == 0 {
		span.SetAttributes(
			attribute.String(attrResult, resultSuccess),
			attribute.Int64("meterreporter.data_floor_unix_milli", 0),
		)
		return todayStart
	}

	// Truncate the earliest sample to the start of its UTC day.
	minDay := time.UnixMilli(minMs).UTC()
	floor := time.Date(minDay.Year(), minDay.Month(), minDay.Day(), 0, 0, 0, 0, time.UTC)
	span.SetAttributes(
		attribute.String(attrResult, resultSuccess),
		attribute.Int64("meterreporter.data_floor_unix_milli", floor.UnixMilli()),
	)
	provider.settings.Logger().DebugContext(ctx, "meter reporter data floor loaded", slog.Time("data_floor", floor))
	return floor
}
|
||||
|
||||
// shipReadings sends one day's meter batch to Zeus.
//
// Currently a DRY RUN: the actual POST to /v2/meters is commented out below,
// so this only logs what would be shipped and always reports success. The
// idempotency key is derived from the date alone, so re-shipping the same day
// is safe once the endpoint is live.
func (provider *Provider) shipReadings(ctx context.Context, licenseKey string, date string, readings []meterreportertypes.Meter) error {
	idempotencyKey := fmt.Sprintf("meter-cron:%s", date)
	ctx, span := provider.settings.Tracer().Start(ctx, "meterreporter.ShipReadings", trace.WithAttributes(
		attribute.String(attrDate, date),
		attribute.Int(attrReadings, len(readings)),
		attribute.String(attrIdempotencyKey, idempotencyKey),
		attribute.Bool(attrDryRun, true),
	))
	defer span.End()

	provider.settings.Logger().InfoContext(ctx, "meter readings prepared for shipment",
		slog.String("date", date),
		slog.Int("readings", len(readings)),
		slog.String("idempotency_key", idempotencyKey),
		slog.Bool("dry_run", true),
	)

	// Temporary visibility while /v2/meters is offline.
	for _, reading := range readings {
		provider.settings.Logger().InfoContext(ctx, "meter reading prepared for shipment",
			slog.String("meter", reading.MeterName),
			slog.Float64("value", reading.Value),
			slog.String("unit", reading.Unit.StringValue()),
			slog.String("aggregation", reading.Aggregation.StringValue()),
			slog.Int64("start_unix_milli", reading.StartUnixMilli),
			slog.Int64("end_unix_milli", reading.EndUnixMilli),
			slog.Bool("is_completed", reading.IsCompleted),
			slog.Any("dimensions", reading.Dimensions),
			slog.String("idempotency_key", idempotencyKey),
		)
	}

	// TODO: re-enable once /v2/meters is live in staging.
	// body, err := json.Marshal(meterreportertypes.PostableMeters{Meters: readings})
	// if err != nil {
	// 	return errors.Wrapf(err, errors.TypeInternal, errCodeReportFailed, "marshal meter readings for %s", date)
	// }
	// if err := provider.zeus.PutMetersV3(ctx, licenseKey, idempotencyKey, body); err != nil {
	// 	return errors.Wrapf(err, errors.TypeInternal, errCodeReportFailed, "ship meter readings for %s", date)
	// }
	// licenseKey is unused until the POST above is re-enabled.
	_ = licenseKey
	span.SetAttributes(attribute.String(attrResult, resultSuccess))
	return nil
}
|
||||
108
ee/meterreporter/httpmeterreporter/provider_internal_test.go
Normal file
108
ee/meterreporter/httpmeterreporter/provider_internal_test.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package httpmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/metercollector"
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestValidateCollectorsRejectsBadRegistry verifies that validateCollectors
// rejects a registry whose map key disagrees with the collector's own name,
// and a registry containing a nil collector.
func TestValidateCollectorsRejectsBadRegistry(t *testing.T) {
	meterA := metercollectortypes.MustNewName("signoz.test.a")
	meterB := metercollectortypes.MustNewName("signoz.test.b")

	t.Run("key name mismatch", func(t *testing.T) {
		// Registered under meterA but the collector reports meterB.
		_, err := validateCollectors(map[metercollectortypes.Name]metercollector.MeterCollector{
			meterA: testCollector{name: meterB},
		})
		require.Error(t, err)
	})

	t.Run("nil collector", func(t *testing.T) {
		_, err := validateCollectors(map[metercollectortypes.Name]metercollector.MeterCollector{
			meterA: nil,
		})
		require.Error(t, err)
	})
}
|
||||
|
||||
// TestDropCheckpointed verifies the checkpoint filter: a reading is dropped
// only when its meter's checkpoint is on or after the window day. meterA is
// checkpointed at the window day (dropped), meterB the day before (kept),
// meterC has no checkpoint (kept).
func TestDropCheckpointed(t *testing.T) {
	meterA := metercollectortypes.MustNewName("signoz.test.a")
	meterB := metercollectortypes.MustNewName("signoz.test.b")
	meterC := metercollectortypes.MustNewName("signoz.test.c")
	windowDay := time.Date(2026, 5, 4, 0, 0, 0, 0, time.UTC)
	window := meterreportertypes.Window{
		StartUnixMilli: windowDay.UnixMilli(),
		EndUnixMilli:   windowDay.AddDate(0, 0, 1).UnixMilli(),
		IsCompleted:    true,
	}
	readings := []meterreportertypes.Meter{
		meterreportertypes.NewMeter(meterA, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
		meterreportertypes.NewMeter(meterB, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
		meterreportertypes.NewMeter(meterC, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
	}

	kept := dropCheckpointed(readings, windowDay, map[string]time.Time{
		meterA.String(): windowDay,
		meterB.String(): windowDay.AddDate(0, 0, -1),
	})

	require.Equal(t, []meterreportertypes.Meter{
		meterreportertypes.NewMeter(meterB, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
		meterreportertypes.NewMeter(meterC, 0, metercollectortypes.UnitCount, metercollectortypes.AggregationSum, window, nil),
	}, kept)
}
|
||||
|
||||
// TestCatchupStart verifies catchup window selection: with no checkpoint the
// catchup starts at the data floor; with a checkpoint it resumes the day
// after that checkpoint.
func TestCatchupStart(t *testing.T) {
	meterA := metercollectortypes.MustNewName("signoz.test.a")
	floor := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
	todayStart := time.Date(2026, 5, 5, 0, 0, 0, 0, time.UTC)
	provider := &Provider{
		collectors: []metercollector.MeterCollector{
			testCollector{name: meterA},
		},
	}

	t.Run("no checkpoint starts at floor", func(t *testing.T) {
		require.Equal(t, floor, provider.catchupStart(floor, todayStart, nil))
	})

	t.Run("checkpoint advances by one day", func(t *testing.T) {
		// Checkpoint at floor+1 means floor+2 is the first unreported day.
		require.Equal(t, floor.AddDate(0, 0, 2), provider.catchupStart(floor, todayStart, map[string]time.Time{
			meterA.String(): floor.AddDate(0, 0, 1),
		}))
	})
}
|
||||
|
||||
// testCollector is a stub MeterCollector for tests. Zero-value unit and
// aggregation fall back to count/sum, and Collect always returns no readings.
type testCollector struct {
	name        metercollectortypes.Name
	unit        metercollectortypes.Unit
	aggregation metercollectortypes.Aggregation
}

// Name returns the configured meter name.
func (c testCollector) Name() metercollectortypes.Name {
	return c.name
}

// Unit returns the configured unit, defaulting to UnitCount when unset.
func (c testCollector) Unit() metercollectortypes.Unit {
	if c.unit.IsZero() {
		return metercollectortypes.UnitCount
	}
	return c.unit
}

// Aggregation returns the configured aggregation, defaulting to
// AggregationSum when unset.
func (c testCollector) Aggregation() metercollectortypes.Aggregation {
	if c.aggregation.IsZero() {
		return metercollectortypes.AggregationSum
	}
	return c.aggregation
}

// Collect returns no readings and no error.
func (c testCollector) Collect(context.Context, valuer.UUID, meterreportertypes.Window) ([]meterreportertypes.Meter, error) {
	return nil, nil
}
|
||||
90
ee/meterreporter/httpmeterreporter/telemetry.go
Normal file
90
ee/meterreporter/httpmeterreporter/telemetry.go
Normal file
@@ -0,0 +1,90 @@
|
||||
package httpmeterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// reporterMetrics bundles the OTel instruments emitted by the meter reporter.
type reporterMetrics struct {
	ticks                metric.Int64Counter   // reporter ticks started
	readingsEmitted      metric.Int64Counter   // readings shipped to Zeus
	collectErrors        metric.Int64Counter   // per-meter collection failures
	postErrors           metric.Int64Counter   // Zeus POST failures
	checkpointErrors     metric.Int64Counter   // Zeus checkpoint read failures
	catchupDaysProcessed metric.Int64Counter   // sealed catchup days processed
	collectDuration      metric.Float64Counter // cumulative collect time (s)
	collectOperations    metric.Int64Counter   // collect phases measured
	shipDuration         metric.Float64Counter // cumulative ship time (s)
	shipOperations       metric.Int64Counter   // ship phases measured
}
|
||||
|
||||
func newReporterMetrics(meter metric.Meter) (*reporterMetrics, error) {
|
||||
var errs error
|
||||
|
||||
ticks, err := meter.Int64Counter("signoz.meterreporter.ticks", metric.WithDescription("Meter reporter ticks."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
readingsEmitted, err := meter.Int64Counter("signoz.meterreporter.readings.emitted", metric.WithDescription("Meter readings shipped to Zeus."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
collectErrors, err := meter.Int64Counter("signoz.meterreporter.collect.errors", metric.WithDescription("Meter collection errors."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
postErrors, err := meter.Int64Counter("signoz.meterreporter.post.errors", metric.WithDescription("Zeus POST failures."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
checkpointErrors, err := meter.Int64Counter("signoz.meterreporter.checkpoint.errors", metric.WithDescription("Zeus checkpoint read failures."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
catchupDaysProcessed, err := meter.Int64Counter("signoz.meterreporter.catchup.days_processed", metric.WithDescription("Sealed catchup days processed."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
collectDuration, err := meter.Float64Counter("signoz.meterreporter.collect.duration.seconds", metric.WithDescription("Cumulative collection duration."), metric.WithUnit("s"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
collectOperations, err := meter.Int64Counter("signoz.meterreporter.collect.operations", metric.WithDescription("Collection phases measured."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
shipDuration, err := meter.Float64Counter("signoz.meterreporter.ship.duration.seconds", metric.WithDescription("Cumulative ship duration."), metric.WithUnit("s"))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
shipOperations, err := meter.Int64Counter("signoz.meterreporter.ship.operations", metric.WithDescription("Ship phases measured."))
|
||||
if err != nil {
|
||||
errs = errors.Join(errs, err)
|
||||
}
|
||||
|
||||
if errs != nil {
|
||||
return nil, errs
|
||||
}
|
||||
|
||||
return &reporterMetrics{
|
||||
ticks: ticks,
|
||||
readingsEmitted: readingsEmitted,
|
||||
collectErrors: collectErrors,
|
||||
postErrors: postErrors,
|
||||
checkpointErrors: checkpointErrors,
|
||||
catchupDaysProcessed: catchupDaysProcessed,
|
||||
collectDuration: collectDuration,
|
||||
collectOperations: collectOperations,
|
||||
shipDuration: shipDuration,
|
||||
shipOperations: shipOperations,
|
||||
}, nil
|
||||
}
|
||||
@@ -150,6 +150,72 @@ func (provider *Provider) PutMetersV2(ctx context.Context, key string, data []by
|
||||
return err
|
||||
}
|
||||
|
||||
// PutMetersV3 POSTs a meter batch to Zeus's /v2/meters endpoint, attaching
// the given idempotency key (when non-empty) via the X-Idempotency-Key
// header so retries of the same batch are deduplicated server-side.
func (provider *Provider) PutMetersV3(ctx context.Context, key string, idempotencyKey string, data []byte) error {
	headers := http.Header{}
	if idempotencyKey != "" {
		headers.Set("X-Idempotency-Key", idempotencyKey)
	}

	_, err := provider.doWithHeaders(
		ctx,
		provider.config.URL.JoinPath("/v2/meters"),
		http.MethodPost,
		key,
		data,
		headers,
	)

	return err
}
|
||||
|
||||
// GetMeterCheckpoints fetches per-meter checkpoints from Zeus.
//
// It GETs /v2/meters/checkpoints and parses data.checkpoints from the
// response body. Every entry must carry a non-empty name and a checkpoint
// date in YYYY-MM-DD form; any missing or malformed field fails the whole
// call with ErrCodeResponseMalformed.
func (provider *Provider) GetMeterCheckpoints(ctx context.Context, key string) ([]zeustypes.MeterCheckpoint, error) {
	response, err := provider.do(
		ctx,
		provider.config.URL.JoinPath("/v2/meters/checkpoints"),
		http.MethodGet,
		key,
		nil,
	)
	if err != nil {
		return nil, err
	}

	// data.checkpoints must be present and non-null; an empty array is valid.
	checkpointValues := gjson.GetBytes(response, "data.checkpoints")
	if !checkpointValues.Exists() || checkpointValues.Type == gjson.Null {
		return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoints are required")
	}

	if !checkpointValues.IsArray() {
		return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoints must be an array")
	}

	checkpointResults := checkpointValues.Array()
	checkpoints := make([]zeustypes.MeterCheckpoint, 0, len(checkpointResults))
	for _, checkpointValue := range checkpointResults {
		name := checkpointValue.Get("name").String()
		if name == "" {
			return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoint name is required")
		}

		checkpointString := checkpointValue.Get("checkpoint").String()
		if checkpointString == "" {
			return nil, errors.Newf(errors.TypeInternal, zeus.ErrCodeResponseMalformed, "meter checkpoint is required for %q", name)
		}

		// Checkpoints are whole days, serialized as YYYY-MM-DD.
		checkpoint, err := time.Parse("2006-01-02", checkpointString)
		if err != nil {
			return nil, errors.Wrapf(err, errors.TypeInternal, zeus.ErrCodeResponseMalformed, "parse meter checkpoint %q for %q", checkpointString, name)
		}

		checkpoints = append(checkpoints, zeustypes.MeterCheckpoint{
			Name:       name,
			Checkpoint: checkpoint,
		})
	}

	return checkpoints, nil
}
|
||||
|
||||
func (provider *Provider) PutProfile(ctx context.Context, key string, profile *zeustypes.PostableProfile) error {
|
||||
body, err := json.Marshal(profile)
|
||||
if err != nil {
|
||||
@@ -185,12 +251,21 @@ func (provider *Provider) PutHost(ctx context.Context, key string, host *zeustyp
|
||||
}
|
||||
|
||||
// do issues a Zeus request with no extra headers; see doWithHeaders.
func (provider *Provider) do(ctx context.Context, url *url.URL, method string, key string, requestBody []byte) ([]byte, error) {
	return provider.doWithHeaders(ctx, url, method, key, requestBody, nil)
}
|
||||
|
||||
func (provider *Provider) doWithHeaders(ctx context.Context, url *url.URL, method string, key string, requestBody []byte, extraHeaders http.Header) ([]byte, error) {
|
||||
request, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewBuffer(requestBody))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
request.Header.Set("X-Signoz-Cloud-Api-Key", key)
|
||||
request.Header.Set("Content-Type", "application/json")
|
||||
for k, vs := range extraHeaders {
|
||||
for _, v := range vs {
|
||||
request.Header.Add(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
response, err := provider.httpClient.Do(request)
|
||||
if err != nil {
|
||||
|
||||
@@ -8,6 +8,7 @@ var (
|
||||
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
|
||||
FeatureGetMetersFromZeus = featuretypes.MustNewName("get_meters_from_zeus")
|
||||
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
|
||||
FeatureUseMeterReporter = featuretypes.MustNewName("use_meter_reporter")
|
||||
FeatureUseJSONBody = featuretypes.MustNewName("use_json_body")
|
||||
)
|
||||
|
||||
@@ -53,6 +54,14 @@ func MustNewRegistry() featuretypes.Registry {
|
||||
DefaultVariant: featuretypes.MustNewName("disabled"),
|
||||
Variants: featuretypes.NewBooleanVariants(),
|
||||
},
|
||||
&featuretypes.Feature{
|
||||
Name: FeatureUseMeterReporter,
|
||||
Kind: featuretypes.KindBoolean,
|
||||
Stage: featuretypes.StageExperimental,
|
||||
Description: "Controls whether the enterprise meter reporter runs instead of the noop reporter",
|
||||
DefaultVariant: featuretypes.MustNewName("disabled"),
|
||||
Variants: featuretypes.NewBooleanVariants(),
|
||||
},
|
||||
&featuretypes.Feature{
|
||||
Name: FeatureUseJSONBody,
|
||||
Kind: featuretypes.KindBoolean,
|
||||
|
||||
12
pkg/metercollector/dimensions.go
Normal file
12
pkg/metercollector/dimensions.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package metercollector
|
||||
|
||||
// Well-known dimension keys stamped onto billing meter readings.
const (
	// DimensionOrganizationID identifies the organization.
	DimensionOrganizationID = "signoz.billing.organization.id"

	// DimensionRetentionDays identifies the retention bucket a meter belongs to.
	DimensionRetentionDays = "signoz.billing.retention.days"

	// DimensionWorkspaceKeyID identifies the ingestion workspace key.
	DimensionWorkspaceKeyID = "signoz.workspace.key.id"
)
|
||||
6
pkg/metercollector/errors.go
Normal file
6
pkg/metercollector/errors.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package metercollector
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/errors"
|
||||
|
||||
// ErrCodeCollectFailed is the shared error code for collector failures.
var ErrCodeCollectFailed = errors.MustNewCode("metercollector_collect_failed")
|
||||
19
pkg/metercollector/metercollector.go
Normal file
19
pkg/metercollector/metercollector.go
Normal file
@@ -0,0 +1,19 @@
|
||||
// Package metercollector defines the contract for billing meter collectors.
|
||||
package metercollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/meterreportertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// MeterCollector owns one billing meter's metadata and collection query.
// Collect stamps DimensionOrganizationID and returns errors instead of panics.
type MeterCollector interface {
	// Name returns the meter's unique name.
	Name() metercollectortypes.Name
	// Unit returns the unit readings are expressed in.
	Unit() metercollectortypes.Unit
	// Aggregation returns how readings for this meter are aggregated.
	Aggregation() metercollectortypes.Aggregation
	// Collect returns the meter readings for one org over one window.
	Collect(ctx context.Context, orgID valuer.UUID, window meterreportertypes.Window) ([]meterreportertypes.Meter, error)
}
|
||||
53
pkg/meterreporter/config.go
Normal file
53
pkg/meterreporter/config.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
// Compile-time assertion that Config satisfies factory.Config.
var _ factory.Config = (*Config)(nil)

// Config holds the meter reporter's tick scheduling knobs; bounds are
// enforced by Validate.
type Config struct {
	// Interval is how often the reporter collects and ships meters.
	Interval time.Duration `mapstructure:"interval"`

	// Timeout bounds one collect-and-ship tick.
	Timeout time.Duration `mapstructure:"timeout"`

	// CatchupMaxDaysPerTick caps sealed-day catchup work per tick.
	CatchupMaxDaysPerTick int `mapstructure:"catchup_max_days_per_tick"`
}
|
||||
|
||||
// newConfig returns the default reporter configuration: a 6h tick, a 5m
// per-tick budget, and the maximum (180) catchup days per tick.
func newConfig() factory.Config {
	return Config{
		Interval:              6 * time.Hour,
		Timeout:               5 * time.Minute,
		CatchupMaxDaysPerTick: 180,
	}
}
|
||||
|
||||
// NewConfigFactory registers the reporter config under the "meterreporter" key.
func NewConfigFactory() factory.ConfigFactory {
	return factory.NewConfigFactory(factory.MustNewName("meterreporter"), newConfig)
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
if c.Interval < 5*time.Minute {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::interval must be at least 5m")
|
||||
}
|
||||
|
||||
if c.Timeout < 3*time.Minute {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be at least 3m")
|
||||
}
|
||||
|
||||
if c.Timeout >= c.Interval {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::timeout must be less than meterreporter::interval")
|
||||
}
|
||||
|
||||
if c.CatchupMaxDaysPerTick < 1 || c.CatchupMaxDaysPerTick > 180 {
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeInvalidInput, "meterreporter::catchup_max_days_per_tick must be between 1 and 180")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
14
pkg/meterreporter/meterreporter.go
Normal file
14
pkg/meterreporter/meterreporter.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package meterreporter
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
var (
	// ErrCodeInvalidInput is returned by Config.Validate for out-of-bounds settings.
	ErrCodeInvalidInput = errors.MustNewCode("meterreporter_invalid_input")
)

// Reporter is the meter reporter service contract; implementations are
// managed by the factory lifecycle (Start/Stop/Healthy).
type Reporter interface {
	factory.ServiceWithHealthy
}
|
||||
39
pkg/meterreporter/noopmeterreporter/provider.go
Normal file
39
pkg/meterreporter/noopmeterreporter/provider.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package noopmeterreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
)
|
||||
|
||||
// provider is the no-op meter reporter: it does no collection or shipping,
// only participates in the service lifecycle.
type provider struct {
	healthyC chan struct{} // closed once Start runs, signalling health
	stopC    chan struct{} // closed by Stop to unblock Start
}

// NewFactory registers the no-op reporter under the "noop" provider name.
func NewFactory() factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config] {
	return factory.NewProviderFactory(factory.MustNewName("noop"), New)
}

// New builds a no-op reporter; settings and config are ignored.
func New(_ context.Context, _ factory.ProviderSettings, _ meterreporter.Config) (meterreporter.Reporter, error) {
	return &provider{
		healthyC: make(chan struct{}),
		stopC:    make(chan struct{}),
	}, nil
}

// Start marks the reporter healthy and blocks until Stop is called.
func (p *provider) Start(_ context.Context) error {
	close(p.healthyC)
	<-p.stopC
	return nil
}

// Stop unblocks Start and returns.
// NOTE(review): closing stopC means a second Stop call would panic on a
// closed channel — presumably the factory lifecycle calls Stop exactly once;
// confirm against the service runner.
func (p *provider) Stop(_ context.Context) error {
	close(p.stopC)
	return nil
}

// Healthy returns a channel that is closed once Start has run.
func (p *provider) Healthy() <-chan struct{} {
	return p.healthyC
}
|
||||
@@ -21,9 +21,9 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
|
||||
@@ -1342,7 +1342,7 @@ func getLocalTableName(tableName string) string {
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "clickhouse-reader",
|
||||
@@ -1377,7 +1377,7 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
if apiErr != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending {
|
||||
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
|
||||
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
|
||||
}
|
||||
}
|
||||
@@ -1425,18 +1425,14 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
// we will change ttl for only the new parts and not the old ones
|
||||
query += " SETTINGS materialize_ttl_after_modify=0"
|
||||
|
||||
ttl := types.TTLSetting{
|
||||
Identifiable: types.Identifiable{
|
||||
ID: valuer.GenerateUUID(),
|
||||
},
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
ttl := retentiontypes.TTLSetting{
|
||||
ID: valuer.GenerateUUID(),
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
TransactionID: uuid,
|
||||
TableName: tableName,
|
||||
TTL: int(params.DelDuration),
|
||||
Status: constants.StatusPending,
|
||||
Status: retentiontypes.TTLSettingStatusPending,
|
||||
ColdStorageTTL: coldStorageDuration,
|
||||
OrgID: orgID,
|
||||
}
|
||||
@@ -1460,9 +1456,9 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", constants.StatusFailed).
|
||||
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
Exec(ctx)
|
||||
if dbErr != nil {
|
||||
@@ -1480,9 +1476,9 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", constants.StatusFailed).
|
||||
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
Exec(ctx)
|
||||
if dbErr != nil {
|
||||
@@ -1495,9 +1491,9 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", constants.StatusSuccess).
|
||||
Set("status = ?", retentiontypes.TTLSettingStatusSuccess).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
Exec(ctx)
|
||||
if dbErr != nil {
|
||||
@@ -1507,10 +1503,10 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
}
|
||||
|
||||
}(ttlPayload)
|
||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
return &retentiontypes.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalTraces.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "clickhouse-reader",
|
||||
@@ -1540,7 +1536,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
||||
if apiErr != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending {
|
||||
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
|
||||
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
|
||||
}
|
||||
}
|
||||
@@ -1563,18 +1559,14 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
||||
timestamp = "end"
|
||||
}
|
||||
|
||||
ttl := types.TTLSetting{
|
||||
Identifiable: types.Identifiable{
|
||||
ID: valuer.GenerateUUID(),
|
||||
},
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
ttl := retentiontypes.TTLSetting{
|
||||
ID: valuer.GenerateUUID(),
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
TransactionID: uuid,
|
||||
TableName: tableName,
|
||||
TTL: int(params.DelDuration),
|
||||
Status: constants.StatusPending,
|
||||
Status: retentiontypes.TTLSettingStatusPending,
|
||||
ColdStorageTTL: coldStorageDuration,
|
||||
OrgID: orgID,
|
||||
}
|
||||
@@ -1610,9 +1602,9 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", constants.StatusFailed).
|
||||
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
Exec(ctx)
|
||||
if dbErr != nil {
|
||||
@@ -1631,9 +1623,9 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", constants.StatusFailed).
|
||||
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
Exec(ctx)
|
||||
if dbErr != nil {
|
||||
@@ -1646,9 +1638,9 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", constants.StatusSuccess).
|
||||
Set("status = ?", retentiontypes.TTLSettingStatusSuccess).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
Exec(ctx)
|
||||
if dbErr != nil {
|
||||
@@ -1657,7 +1649,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
||||
}
|
||||
}(distributedTableName)
|
||||
}
|
||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
return &retentiontypes.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool, error) {
|
||||
@@ -1686,7 +1678,7 @@ func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool,
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) {
|
||||
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *retentiontypes.CustomRetentionTTLParams) (*retentiontypes.CustomRetentionTTLResponse, error) {
|
||||
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalLogs.StringValue(),
|
||||
@@ -1701,7 +1693,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
if !hasCustomRetention {
|
||||
r.logger.Info("Custom retention not supported, falling back to standard TTL method", "orgID", orgID)
|
||||
|
||||
ttlParams := &model.TTLParams{
|
||||
ttlParams := &retentiontypes.TTLParams{
|
||||
Type: params.Type,
|
||||
DelDuration: int64(params.DefaultTTLDays * 24 * 3600),
|
||||
}
|
||||
@@ -1722,7 +1714,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
return nil, errorsV2.Wrapf(apiErr.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to set standard TTL")
|
||||
}
|
||||
|
||||
return &model.CustomRetentionTTLResponse{
|
||||
return &retentiontypes.CustomRetentionTTLResponse{
|
||||
Message: fmt.Sprintf("Custom retention not supported, applied standard TTL of %d days. %s", params.DefaultTTLDays, ttlResult.Message),
|
||||
}, nil
|
||||
}
|
||||
@@ -1733,7 +1725,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
uuidWithHyphen := valuer.GenerateUUID()
|
||||
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
|
||||
|
||||
if params.Type != constants.LogsTTL {
|
||||
if params.Type != retentiontypes.LogsTTL {
|
||||
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL only supported for logs")
|
||||
}
|
||||
|
||||
@@ -1764,7 +1756,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
if apiErr != nil {
|
||||
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing custom_retention_ttl_status check sql query")
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending {
|
||||
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
|
||||
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL is already running")
|
||||
}
|
||||
}
|
||||
@@ -1838,19 +1830,15 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
}
|
||||
|
||||
for tableName, queries := range ttlPayload {
|
||||
customTTL := types.TTLSetting{
|
||||
Identifiable: types.Identifiable{
|
||||
ID: valuer.GenerateUUID(),
|
||||
},
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
customTTL := retentiontypes.TTLSetting{
|
||||
ID: valuer.GenerateUUID(),
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
TransactionID: uuid,
|
||||
TableName: tableName,
|
||||
TTL: params.DefaultTTLDays,
|
||||
Condition: string(ttlConditionsJSON),
|
||||
Status: constants.StatusPending,
|
||||
Status: retentiontypes.TTLSettingStatusPending,
|
||||
ColdStorageTTL: coldStorageDuration,
|
||||
OrgID: orgID,
|
||||
}
|
||||
@@ -1866,7 +1854,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
err := r.setColdStorage(ctx, tableName, params.ColdStorageVolume)
|
||||
if err != nil {
|
||||
r.logger.Error("error in setting cold storage", errorsV2.Attr(err))
|
||||
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
|
||||
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, retentiontypes.TTLSettingStatusFailed)
|
||||
return nil, errorsV2.Wrapf(err.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting cold storage for table %s", tableName)
|
||||
}
|
||||
}
|
||||
@@ -1875,21 +1863,21 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
r.logger.Debug("Executing custom retention TTL request: ", "request", query, "step", i+1)
|
||||
if err := r.db.Exec(ctx, query); err != nil {
|
||||
r.logger.Error("error while setting custom retention ttl", errorsV2.Attr(err))
|
||||
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
|
||||
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, retentiontypes.TTLSettingStatusFailed)
|
||||
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting custom retention TTL for table %s, query: %s", tableName, query)
|
||||
}
|
||||
}
|
||||
|
||||
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
|
||||
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, retentiontypes.TTLSettingStatusSuccess)
|
||||
}
|
||||
|
||||
return &model.CustomRetentionTTLResponse{
|
||||
return &retentiontypes.CustomRetentionTTLResponse{
|
||||
Message: "custom retention TTL has been successfully set up",
|
||||
}, nil
|
||||
}
|
||||
|
||||
// New method to build multiIf expressions with support for multiple AND conditions
|
||||
func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []model.CustomRetentionRule, defaultTTLDays int, isResourceTable bool) string {
|
||||
func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []retentiontypes.CustomRetentionRule, defaultTTLDays int, isResourceTable bool) string {
|
||||
var conditions []string
|
||||
|
||||
for i, rule := range ttlConditions {
|
||||
@@ -1961,7 +1949,7 @@ func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []model.CustomRe
|
||||
return result
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error) {
|
||||
func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID string) (*retentiontypes.GetCustomRetentionTTLResponse, error) {
|
||||
// Check if V2 (custom retention) is supported
|
||||
hasCustomRetention, err := r.hasCustomRetentionColumn(ctx)
|
||||
if err != nil {
|
||||
@@ -1970,14 +1958,14 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
|
||||
hasCustomRetention = false
|
||||
}
|
||||
|
||||
response := &model.GetCustomRetentionTTLResponse{}
|
||||
response := &retentiontypes.GetCustomRetentionTTLResponse{}
|
||||
|
||||
if hasCustomRetention {
|
||||
// V2 - Custom retention is supported
|
||||
response.Version = "v2"
|
||||
|
||||
// Get the latest custom retention TTL setting
|
||||
customTTL := new(types.TTLSetting)
|
||||
customTTL := new(retentiontypes.TTLSetting)
|
||||
err := r.sqlDB.BunDB().NewSelect().
|
||||
Model(customTTL).
|
||||
Where("org_id = ?", orgID).
|
||||
@@ -1993,19 +1981,19 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
// No V2 configuration found, return defaults
|
||||
response.DefaultTTLDays = 15
|
||||
response.TTLConditions = []model.CustomRetentionRule{}
|
||||
response.Status = constants.StatusSuccess
|
||||
response.DefaultTTLDays = retentiontypes.DefaultLogsRetentionDays
|
||||
response.TTLConditions = []retentiontypes.CustomRetentionRule{}
|
||||
response.Status = retentiontypes.TTLSettingStatusSuccess
|
||||
response.ColdStorageTTLDays = -1
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// Parse TTL conditions from Condition
|
||||
var ttlConditions []model.CustomRetentionRule
|
||||
var ttlConditions []retentiontypes.CustomRetentionRule
|
||||
if customTTL.Condition != "" {
|
||||
if err := json.Unmarshal([]byte(customTTL.Condition), &ttlConditions); err != nil {
|
||||
r.logger.Error("Error parsing TTL conditions", errorsV2.Attr(err))
|
||||
ttlConditions = []model.CustomRetentionRule{}
|
||||
ttlConditions = []retentiontypes.CustomRetentionRule{}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2019,8 +2007,8 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
|
||||
response.Version = "v1"
|
||||
|
||||
// Get V1 TTL configuration
|
||||
ttlParams := &model.GetTTLParams{
|
||||
Type: constants.LogsTTL,
|
||||
ttlParams := &retentiontypes.GetTTLParams{
|
||||
Type: retentiontypes.LogsTTL,
|
||||
}
|
||||
|
||||
ttlResult, apiErr := r.GetTTL(ctx, orgID, ttlParams)
|
||||
@@ -2040,14 +2028,14 @@ func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID stri
|
||||
}
|
||||
|
||||
// For V1, we don't have TTL conditions
|
||||
response.TTLConditions = []model.CustomRetentionRule{}
|
||||
response.TTLConditions = []retentiontypes.CustomRetentionRule{}
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) checkCustomRetentionTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, error) {
|
||||
ttl := new(types.TTLSetting)
|
||||
func (r *ClickHouseReader) checkCustomRetentionTTLStatusItem(ctx context.Context, orgID string, tableName string) (*retentiontypes.TTLSetting, error) {
|
||||
ttl := new(retentiontypes.TTLSetting)
|
||||
err := r.sqlDB.BunDB().NewSelect().
|
||||
Model(ttl).
|
||||
Where("table_name = ?", tableName).
|
||||
@@ -2068,7 +2056,7 @@ func (r *ClickHouseReader) updateCustomRetentionTTLStatus(ctx context.Context, o
|
||||
statusItem, apiErr := r.checkCustomRetentionTTLStatusItem(ctx, orgID, tableName)
|
||||
if apiErr == nil && statusItem != nil {
|
||||
_, dbErr := r.sqlDB.BunDB().NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", status).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
@@ -2080,7 +2068,7 @@ func (r *ClickHouseReader) updateCustomRetentionTTLStatus(ctx context.Context, o
|
||||
}
|
||||
|
||||
// Enhanced validation function with duplicate detection and efficient key validation
|
||||
func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditions []model.CustomRetentionRule) error {
|
||||
func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditions []retentiontypes.CustomRetentionRule) error {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.CodeNamespace: "clickhouse-reader",
|
||||
instrumentationtypes.CodeFunctionName: "validateTTLConditions",
|
||||
@@ -2184,16 +2172,16 @@ func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditi
|
||||
// SetTTL sets the TTL for traces or metrics or logs tables.
|
||||
// This is an async API which creates goroutines to set TTL.
|
||||
// Status of TTL update is tracked with ttl_status table in sqlite db.
|
||||
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
|
||||
// Keep only latest 100 transactions/requests
|
||||
r.deleteTtlTransactions(ctx, orgID, 100)
|
||||
|
||||
switch params.Type {
|
||||
case constants.TraceTTL:
|
||||
case retentiontypes.TraceTTL:
|
||||
return r.setTTLTraces(ctx, orgID, params)
|
||||
case constants.MetricsTTL:
|
||||
case retentiontypes.MetricsTTL:
|
||||
return r.setTTLMetrics(ctx, orgID, params)
|
||||
case constants.LogsTTL:
|
||||
case retentiontypes.LogsTTL:
|
||||
return r.setTTLLogs(ctx, orgID, params)
|
||||
default:
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", params.Type)}
|
||||
@@ -2201,7 +2189,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *mod
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, params *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalMetrics.StringValue(),
|
||||
instrumentationtypes.CodeNamespace: "clickhouse-reader",
|
||||
@@ -2230,23 +2218,19 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
|
||||
if apiErr != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending {
|
||||
if statusItem.Status == retentiontypes.TTLSettingStatusPending {
|
||||
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
|
||||
}
|
||||
}
|
||||
metricTTL := func(tableName string) {
|
||||
ttl := types.TTLSetting{
|
||||
Identifiable: types.Identifiable{
|
||||
ID: valuer.GenerateUUID(),
|
||||
},
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
ttl := retentiontypes.TTLSetting{
|
||||
ID: valuer.GenerateUUID(),
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
TransactionID: uuid,
|
||||
TableName: tableName,
|
||||
TTL: int(params.DelDuration),
|
||||
Status: constants.StatusPending,
|
||||
Status: retentiontypes.TTLSettingStatusPending,
|
||||
ColdStorageTTL: coldStorageDuration,
|
||||
OrgID: orgID,
|
||||
}
|
||||
@@ -2282,9 +2266,9 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", constants.StatusFailed).
|
||||
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
Exec(ctx)
|
||||
if dbErr != nil {
|
||||
@@ -2303,9 +2287,9 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", constants.StatusFailed).
|
||||
Set("status = ?", retentiontypes.TTLSettingStatusFailed).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
Exec(ctx)
|
||||
if dbErr != nil {
|
||||
@@ -2318,9 +2302,9 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Set("updated_at = ?", time.Now()).
|
||||
Set("status = ?", constants.StatusSuccess).
|
||||
Set("status = ?", retentiontypes.TTLSettingStatusSuccess).
|
||||
Where("id = ?", statusItem.ID.StringValue()).
|
||||
Exec(ctx)
|
||||
if dbErr != nil {
|
||||
@@ -2331,7 +2315,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
|
||||
for _, tableName := range tableNames {
|
||||
go metricTTL(tableName)
|
||||
}
|
||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
return &retentiontypes.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID string, numberOfTransactionsStore int) {
|
||||
@@ -2341,7 +2325,7 @@ func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID stri
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Column("transaction_id").
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Where("org_id = ?", orgID).
|
||||
Group("transaction_id").
|
||||
OrderExpr("MAX(created_at) DESC").
|
||||
@@ -2356,7 +2340,7 @@ func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID stri
|
||||
sqlDB.
|
||||
BunDB().
|
||||
NewDelete().
|
||||
Model(new(types.TTLSetting)).
|
||||
Model(new(retentiontypes.TTLSetting)).
|
||||
Where("transaction_id NOT IN (?)", bun.In(limitTransactions)).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
@@ -2365,9 +2349,9 @@ func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID stri
|
||||
}
|
||||
|
||||
// checkTTLStatusItem checks if ttl_status table has an entry for the given table name
|
||||
func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, *model.ApiError) {
|
||||
func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string, tableName string) (*retentiontypes.TTLSetting, *model.ApiError) {
|
||||
r.logger.Info("checkTTLStatusItem query", "tableName", tableName)
|
||||
ttl := new(types.TTLSetting)
|
||||
ttl := new(retentiontypes.TTLSetting)
|
||||
err := r.
|
||||
sqlDB.
|
||||
BunDB().
|
||||
@@ -2388,26 +2372,26 @@ func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string,
|
||||
// getTTLQueryStatus fetches ttl_status table status from DB
|
||||
func (r *ClickHouseReader) getTTLQueryStatus(ctx context.Context, orgID string, tableNameArray []string) (string, *model.ApiError) {
|
||||
failFlag := false
|
||||
status := constants.StatusSuccess
|
||||
status := retentiontypes.TTLSettingStatusSuccess
|
||||
for _, tableName := range tableNameArray {
|
||||
statusItem, apiErr := r.checkTTLStatusItem(ctx, orgID, tableName)
|
||||
emptyStatusStruct := new(types.TTLSetting)
|
||||
emptyStatusStruct := new(retentiontypes.TTLSetting)
|
||||
if statusItem == emptyStatusStruct {
|
||||
return "", nil
|
||||
}
|
||||
if apiErr != nil {
|
||||
return "", &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
|
||||
status = constants.StatusPending
|
||||
if statusItem.Status == retentiontypes.TTLSettingStatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
|
||||
status = retentiontypes.TTLSettingStatusPending
|
||||
return status, nil
|
||||
}
|
||||
if statusItem.Status == constants.StatusFailed {
|
||||
if statusItem.Status == retentiontypes.TTLSettingStatusFailed {
|
||||
failFlag = true
|
||||
}
|
||||
}
|
||||
if failFlag {
|
||||
status = constants.StatusFailed
|
||||
status = retentiontypes.TTLSettingStatusFailed
|
||||
}
|
||||
|
||||
return status, nil
|
||||
@@ -2460,7 +2444,7 @@ func getLocalTableNameArray(tableNames []string) []string {
|
||||
}
|
||||
|
||||
// GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces.
|
||||
func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *retentiontypes.GetTTLParams) (*retentiontypes.GetTTLResponseItem, *model.ApiError) {
|
||||
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.CodeNamespace: "clickhouse-reader",
|
||||
@@ -2495,8 +2479,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
|
||||
return delTTL, moveTTL
|
||||
}
|
||||
|
||||
getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
|
||||
var dbResp []model.DBResponseTTL
|
||||
getMetricsTTL := func() (*retentiontypes.DBResponseTTL, *model.ApiError) {
|
||||
var dbResp []retentiontypes.DBResponseTTL
|
||||
|
||||
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleLocalTableName)
|
||||
|
||||
@@ -2513,8 +2497,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
|
||||
}
|
||||
}
|
||||
|
||||
getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) {
|
||||
var dbResp []model.DBResponseTTL
|
||||
getTracesTTL := func() (*retentiontypes.DBResponseTTL, *model.ApiError) {
|
||||
var dbResp []retentiontypes.DBResponseTTL
|
||||
|
||||
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.traceLocalTableName, signozTraceDBName)
|
||||
|
||||
@@ -2531,8 +2515,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
|
||||
}
|
||||
}
|
||||
|
||||
getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
|
||||
var dbResp []model.DBResponseTTL
|
||||
getLogsTTL := func() (*retentiontypes.DBResponseTTL, *model.ApiError) {
|
||||
var dbResp []retentiontypes.DBResponseTTL
|
||||
|
||||
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTableName, r.logsDB)
|
||||
|
||||
@@ -2550,7 +2534,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
|
||||
}
|
||||
|
||||
switch ttlParams.Type {
|
||||
case constants.TraceTTL:
|
||||
case retentiontypes.TraceTTL:
|
||||
tableNameArray := []string{
|
||||
r.TraceDB + "." + r.traceTableName,
|
||||
r.TraceDB + "." + r.traceResourceTableV3,
|
||||
@@ -2578,9 +2562,9 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
|
||||
}
|
||||
|
||||
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
|
||||
return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
|
||||
return &retentiontypes.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
|
||||
|
||||
case constants.MetricsTTL:
|
||||
case retentiontypes.MetricsTTL:
|
||||
tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName}
|
||||
tableNameArray = getLocalTableNameArray(tableNameArray)
|
||||
status, apiErr := r.getTTLQueryStatus(ctx, orgID, tableNameArray)
|
||||
@@ -2601,9 +2585,9 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
|
||||
}
|
||||
|
||||
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
|
||||
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
|
||||
return &retentiontypes.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
|
||||
|
||||
case constants.LogsTTL:
|
||||
case retentiontypes.LogsTTL:
|
||||
tableNameArray := []string{r.logsDB + "." + r.logsTableName}
|
||||
tableNameArray = getLocalTableNameArray(tableNameArray)
|
||||
status, apiErr := r.getTTLQueryStatus(ctx, orgID, tableNameArray)
|
||||
@@ -2624,7 +2608,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *
|
||||
}
|
||||
|
||||
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
|
||||
return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
|
||||
return &retentiontypes.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
|
||||
|
||||
default:
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v",
|
||||
|
||||
@@ -34,6 +34,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
|
||||
"github.com/SigNoz/signoz/pkg/signoz"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
@@ -1677,7 +1678,7 @@ func (aH *APIHandler) setCustomRetentionTTL(w http.ResponseWriter, r *http.Reque
|
||||
return
|
||||
}
|
||||
|
||||
var params model.CustomRetentionTTLParams
|
||||
var params retentiontypes.CustomRetentionTTLParams
|
||||
if err := json.NewDecoder(r.Body).Decode(¶ms); err != nil {
|
||||
render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "Invalid data"))
|
||||
return
|
||||
|
||||
@@ -40,6 +40,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
querytemplate "github.com/SigNoz/signoz/pkg/query-service/utils/queryTemplate"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
chVariables "github.com/SigNoz/signoz/pkg/variables/clickhouse"
|
||||
)
|
||||
|
||||
@@ -419,7 +420,7 @@ func parseTime(param string, r *http.Request) (*time.Time, error) {
|
||||
|
||||
}
|
||||
|
||||
func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
func parseTTLParams(r *http.Request) (*retentiontypes.TTLParams, error) {
|
||||
|
||||
// make sure either of the query params are present
|
||||
typeTTL := r.URL.Query().Get("type")
|
||||
@@ -432,7 +433,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
}
|
||||
|
||||
// Validate the type parameter
|
||||
if typeTTL != baseconstants.TraceTTL && typeTTL != baseconstants.MetricsTTL && typeTTL != baseconstants.LogsTTL {
|
||||
if typeTTL != retentiontypes.TraceTTL && typeTTL != retentiontypes.MetricsTTL && typeTTL != retentiontypes.LogsTTL {
|
||||
return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
|
||||
}
|
||||
|
||||
@@ -455,7 +456,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
}
|
||||
}
|
||||
|
||||
return &model.TTLParams{
|
||||
return &retentiontypes.TTLParams{
|
||||
Type: typeTTL,
|
||||
DelDuration: int64(durationParsed.Seconds()),
|
||||
ColdStorageVolume: coldStorage,
|
||||
@@ -463,7 +464,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
|
||||
func parseGetTTL(r *http.Request) (*retentiontypes.GetTTLParams, error) {
|
||||
|
||||
typeTTL := r.URL.Query().Get("type")
|
||||
|
||||
@@ -471,12 +472,12 @@ func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
|
||||
return nil, fmt.Errorf("type param cannot be empty from the query")
|
||||
} else {
|
||||
// Validate the type parameter
|
||||
if typeTTL != baseconstants.TraceTTL && typeTTL != baseconstants.MetricsTTL && typeTTL != baseconstants.LogsTTL {
|
||||
if typeTTL != retentiontypes.TraceTTL && typeTTL != retentiontypes.MetricsTTL && typeTTL != retentiontypes.LogsTTL {
|
||||
return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
|
||||
}
|
||||
}
|
||||
|
||||
return &model.GetTTLParams{Type: typeTTL}, nil
|
||||
return &retentiontypes.GetTTLParams{Type: typeTTL}, nil
|
||||
}
|
||||
|
||||
func parseAggregateAttributeRequest(r *http.Request) (*v3.AggregateAttributeRequest, error) {
|
||||
|
||||
@@ -19,10 +19,6 @@ const (
|
||||
|
||||
const MaxAllowedPointsInTimeSeries = 300
|
||||
|
||||
const TraceTTL = "traces"
|
||||
const MetricsTTL = "metrics"
|
||||
const LogsTTL = "logs"
|
||||
|
||||
const SpanSearchScopeRoot = "isroot"
|
||||
const SpanSearchScopeEntryPoint = "isentrypoint"
|
||||
const OrderBySpanCount = "span_count"
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
||||
"github.com/SigNoz/signoz/pkg/types/retentiontypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
@@ -23,8 +24,8 @@ type Reader interface {
|
||||
GetServicesList(ctx context.Context) (*[]string, error)
|
||||
GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
|
||||
|
||||
GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
|
||||
GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error)
|
||||
GetTTL(ctx context.Context, orgID string, ttlParams *retentiontypes.GetTTLParams) (*retentiontypes.GetTTLResponseItem, *model.ApiError)
|
||||
GetCustomRetentionTTL(ctx context.Context, orgID string) (*retentiontypes.GetCustomRetentionTTLResponse, error)
|
||||
|
||||
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
|
||||
// clickhouse only.
|
||||
@@ -46,8 +47,8 @@ type Reader interface {
|
||||
GetFlamegraphSpansForTrace(ctx context.Context, orgID valuer.UUID, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, error)
|
||||
|
||||
// Setter Interfaces
|
||||
SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
||||
SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error)
|
||||
SetTTL(ctx context.Context, orgID string, ttlParams *retentiontypes.TTLParams) (*retentiontypes.SetTTLResponseItem, *model.ApiError)
|
||||
SetTTLV2(ctx context.Context, orgID string, params *retentiontypes.CustomRetentionTTLParams) (*retentiontypes.CustomRetentionTTLResponse, error)
|
||||
|
||||
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
|
||||
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
|
||||
|
||||
@@ -404,56 +404,6 @@ type TagKey struct {
|
||||
Type TagDataType `json:"type"`
|
||||
}
|
||||
|
||||
type TTLParams struct {
|
||||
Type string // It can be one of {traces, metrics}.
|
||||
ColdStorageVolume string // Name of the cold storage volume.
|
||||
ToColdStorageDuration int64 // Seconds after which data will be moved to cold storage.
|
||||
DelDuration int64 // Seconds after which data will be deleted.
|
||||
}
|
||||
|
||||
type CustomRetentionTTLParams struct {
|
||||
Type string `json:"type"`
|
||||
DefaultTTLDays int `json:"defaultTTLDays"`
|
||||
TTLConditions []CustomRetentionRule `json:"ttlConditions"`
|
||||
ColdStorageVolume string `json:"coldStorageVolume,omitempty"`
|
||||
ToColdStorageDurationDays int64 `json:"coldStorageDurationDays,omitempty"`
|
||||
}
|
||||
|
||||
type CustomRetentionRule struct {
|
||||
Filters []FilterCondition `json:"conditions"`
|
||||
TTLDays int `json:"ttlDays"`
|
||||
}
|
||||
|
||||
type FilterCondition struct {
|
||||
Key string `json:"key"`
|
||||
Values []string `json:"values"`
|
||||
}
|
||||
|
||||
type GetCustomRetentionTTLResponse struct {
|
||||
Version string `json:"version"`
|
||||
Status string `json:"status"`
|
||||
|
||||
// V1 fields
|
||||
// LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
|
||||
// LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
|
||||
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
|
||||
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
|
||||
|
||||
// V2 fields
|
||||
DefaultTTLDays int `json:"default_ttl_days,omitempty"`
|
||||
TTLConditions []CustomRetentionRule `json:"ttl_conditions,omitempty"`
|
||||
ColdStorageVolume string `json:"cold_storage_volume,omitempty"`
|
||||
ColdStorageTTLDays int `json:"cold_storage_ttl_days,omitempty"`
|
||||
}
|
||||
|
||||
type CustomRetentionTTLResponse struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type GetTTLParams struct {
|
||||
Type string
|
||||
}
|
||||
|
||||
type ListErrorsParams struct {
|
||||
StartStr string `json:"start"`
|
||||
EndStr string `json:"end"`
|
||||
|
||||
@@ -150,16 +150,6 @@ type RuleResponseItem struct {
|
||||
Data string `json:"data" db:"data"`
|
||||
}
|
||||
|
||||
type TTLStatusItem struct {
|
||||
Id int `json:"id" db:"id"`
|
||||
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
TableName string `json:"table_name" db:"table_name"`
|
||||
TTL int `json:"ttl" db:"ttl"`
|
||||
Status string `json:"status" db:"status"`
|
||||
ColdStorageTtl int `json:"cold_storage_ttl" db:"cold_storage_ttl"`
|
||||
}
|
||||
|
||||
type ChannelItem struct {
|
||||
Id int `json:"id" db:"id"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
@@ -462,35 +452,11 @@ type SpanAggregatesDBResponseItem struct {
|
||||
GroupBy string `ch:"groupBy"`
|
||||
}
|
||||
|
||||
type SetTTLResponseItem struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type DiskItem struct {
|
||||
Name string `json:"name,omitempty" ch:"name"`
|
||||
Type string `json:"type,omitempty" ch:"type"`
|
||||
}
|
||||
|
||||
type DBResponseTTL struct {
|
||||
EngineFull string `ch:"engine_full"`
|
||||
}
|
||||
|
||||
type GetTTLResponseItem struct {
|
||||
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
|
||||
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
|
||||
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
|
||||
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
|
||||
LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
|
||||
LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
|
||||
ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"`
|
||||
ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"`
|
||||
ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"`
|
||||
ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"`
|
||||
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
|
||||
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
type DBResponseServiceName struct {
|
||||
ServiceName string `ch:"serviceName"`
|
||||
Count uint64 `ch:"count"`
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/global"
|
||||
"github.com/SigNoz/signoz/pkg/identn"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
|
||||
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
|
||||
@@ -135,6 +136,9 @@ type Config struct {
|
||||
// Auditor config
|
||||
Auditor auditor.Config `mapstructure:"auditor"`
|
||||
|
||||
// MeterReporter config
|
||||
MeterReporter meterreporter.Config `mapstructure:"meterreporter"`
|
||||
|
||||
// CloudIntegration config
|
||||
CloudIntegration cloudintegration.Config `mapstructure:"cloudintegration"`
|
||||
|
||||
@@ -175,6 +179,7 @@ func NewConfig(ctx context.Context, logger *slog.Logger, resolverConfig config.R
|
||||
identn.NewConfigFactory(),
|
||||
serviceaccount.NewConfigFactory(),
|
||||
auditor.NewConfigFactory(),
|
||||
meterreporter.NewConfigFactory(),
|
||||
cloudintegration.NewConfigFactory(),
|
||||
tracedetail.NewConfigFactory(),
|
||||
authz.NewConfigFactory(),
|
||||
|
||||
@@ -28,6 +28,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/identn/apikeyidentn"
|
||||
"github.com/SigNoz/signoz/pkg/identn/impersonationidentn"
|
||||
"github.com/SigNoz/signoz/pkg/identn/tokenizeridentn"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter/noopmeterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
@@ -318,6 +320,12 @@ func NewAuditorProviderFactories() factory.NamedMap[factory.ProviderFactory[audi
|
||||
)
|
||||
}
|
||||
|
||||
func NewMeterReporterProviderFactories() factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
noopmeterreporter.NewFactory(),
|
||||
)
|
||||
}
|
||||
|
||||
func NewFlaggerProviderFactories(registry featuretypes.Registry) factory.NamedMap[factory.ProviderFactory[flagger.FlaggerProvider, flagger.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
configflagger.NewFactory(registry),
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/identn"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
"github.com/SigNoz/signoz/pkg/licensing"
|
||||
"github.com/SigNoz/signoz/pkg/meterreporter"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
|
||||
"github.com/SigNoz/signoz/pkg/modules/dashboard"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
@@ -84,6 +85,7 @@ type SigNoz struct {
|
||||
Flagger flagger.Flagger
|
||||
Gateway gateway.Gateway
|
||||
Auditor auditor.Auditor
|
||||
MeterReporter meterreporter.Reporter
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -104,6 +106,7 @@ func New(
|
||||
dashboardModuleCallback func(sqlstore.SQLStore, factory.ProviderSettings, analytics.Analytics, organization.Getter, queryparser.QueryParser, querier.Querier, licensing.Licensing) dashboard.Module,
|
||||
gatewayProviderFactory func(licensing.Licensing) factory.ProviderFactory[gateway.Gateway, gateway.Config],
|
||||
auditorProviderFactories func(licensing.Licensing) factory.NamedMap[factory.ProviderFactory[auditor.Auditor, auditor.Config]],
|
||||
meterReporterProviderFactories func(context.Context, flagger.Flagger, licensing.Licensing, telemetrystore.TelemetryStore, sqlstore.SQLStore, organization.Getter, zeus.Zeus) (factory.NamedMap[factory.ProviderFactory[meterreporter.Reporter, meterreporter.Config]], string),
|
||||
querierHandlerCallback func(factory.ProviderSettings, querier.Querier, analytics.Analytics) querier.Handler,
|
||||
cloudIntegrationCallback func(sqlstore.SQLStore, global.Global, zeus.Zeus, gateway.Gateway, licensing.Licensing, serviceaccount.Module, cloudintegration.Config) (cloudintegration.Module, error),
|
||||
rulerProviderFactories func(cache.Cache, alertmanager.Alertmanager, sqlstore.SQLStore, telemetrystore.TelemetryStore, telemetrytypes.MetadataStore, prometheus.Prometheus, organization.Getter, rulestatehistory.Module, querier.Querier, queryparser.QueryParser) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]],
|
||||
@@ -386,6 +389,13 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize meter reporter from the variant-specific provider factories
|
||||
meterReporterFactories, meterReporterProvider := meterReporterProviderFactories(ctx, flagger, licensing, telemetrystore, sqlstore, orgGetter, zeus)
|
||||
meterReporter, err := factory.NewProviderFromNamedMap(ctx, providerSettings, config.MeterReporter, meterReporterFactories, meterReporterProvider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize authns
|
||||
store := sqlauthnstore.NewStore(sqlstore)
|
||||
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)
|
||||
@@ -501,6 +511,7 @@ func New(
|
||||
factory.NewNamedService(factory.MustNewName("authz"), authz),
|
||||
factory.NewNamedService(factory.MustNewName("user"), userService, factory.MustNewName("authz")),
|
||||
factory.NewNamedService(factory.MustNewName("auditor"), auditor),
|
||||
factory.NewNamedService(factory.MustNewName("meterreporter"), meterReporter, factory.MustNewName("licensing")),
|
||||
factory.NewNamedService(factory.MustNewName("ruler"), rulerInstance),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -550,5 +561,6 @@ func New(
|
||||
Flagger: flagger,
|
||||
Gateway: gateway,
|
||||
Auditor: auditor,
|
||||
MeterReporter: meterReporter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
13
pkg/types/metercollectortypes/aggregation.go
Normal file
13
pkg/types/metercollectortypes/aggregation.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package metercollectortypes
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/valuer"
|
||||
|
||||
// Aggregation is a supported Zeus aggregation name.
|
||||
type Aggregation struct {
|
||||
valuer.String
|
||||
}
|
||||
|
||||
var (
|
||||
AggregationSum = Aggregation{valuer.NewString("sum")}
|
||||
AggregationMax = Aggregation{valuer.NewString("max")}
|
||||
)
|
||||
40
pkg/types/metercollectortypes/name.go
Normal file
40
pkg/types/metercollectortypes/name.go
Normal file
@@ -0,0 +1,40 @@
|
||||
// Package metercollectortypes holds billing meter value types.
|
||||
package metercollectortypes
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
var nameRegex = regexp.MustCompile(`^[a-z][a-z0-9_.]+$`)
|
||||
|
||||
// Name is a validated dotted meter name.
|
||||
type Name struct {
|
||||
s string
|
||||
}
|
||||
|
||||
func NewName(s string) (Name, error) {
|
||||
if !nameRegex.MatchString(s) {
|
||||
return Name{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid meter name: %s", s)
|
||||
}
|
||||
|
||||
return Name{s: s}, nil
|
||||
}
|
||||
|
||||
func MustNewName(s string) Name {
|
||||
name, err := NewName(s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return name
|
||||
}
|
||||
|
||||
func (n Name) String() string {
|
||||
return n.s
|
||||
}
|
||||
|
||||
func (n Name) IsZero() bool {
|
||||
return n.s == ""
|
||||
}
|
||||
13
pkg/types/metercollectortypes/unit.go
Normal file
13
pkg/types/metercollectortypes/unit.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package metercollectortypes
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/valuer"
|
||||
|
||||
// Unit is a supported Zeus meter unit.
|
||||
type Unit struct {
|
||||
valuer.String
|
||||
}
|
||||
|
||||
var (
|
||||
UnitCount = Unit{valuer.NewString("count")}
|
||||
UnitBytes = Unit{valuer.NewString("bytes")}
|
||||
)
|
||||
57
pkg/types/meterreportertypes/meter.go
Normal file
57
pkg/types/meterreportertypes/meter.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package meterreportertypes
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/types/metercollectortypes"
|
||||
|
||||
// Meter is one meter value sent to Zeus.
|
||||
type Meter struct {
|
||||
// MeterName is the fully-qualified meter identifier.
|
||||
MeterName string `json:"name"`
|
||||
|
||||
// Value is the aggregated scalar for this meter over the reporting window.
|
||||
Value float64 `json:"value"`
|
||||
|
||||
// Unit is the metric unit for this meter.
|
||||
Unit metercollectortypes.Unit `json:"unit"`
|
||||
|
||||
// Aggregation names the aggregation applied to produce Value.
|
||||
Aggregation metercollectortypes.Aggregation `json:"aggregation"`
|
||||
|
||||
// StartUnixMilli is the inclusive window start in epoch milliseconds.
|
||||
StartUnixMilli int64 `json:"start_unix_milli"`
|
||||
|
||||
// EndUnixMilli is the exclusive window end in epoch milliseconds.
|
||||
EndUnixMilli int64 `json:"end_unix_milli"`
|
||||
|
||||
// IsCompleted is false for the current day's partial value.
|
||||
IsCompleted bool `json:"is_completed"`
|
||||
|
||||
// Dimensions is the per-meter label set.
|
||||
Dimensions map[string]string `json:"dimensions"`
|
||||
}
|
||||
|
||||
// NewMeter builds a meter from typed metadata and a reporting window.
|
||||
func NewMeter(
|
||||
name metercollectortypes.Name,
|
||||
value float64,
|
||||
unit metercollectortypes.Unit,
|
||||
aggregation metercollectortypes.Aggregation,
|
||||
window Window,
|
||||
dimensions map[string]string,
|
||||
) Meter {
|
||||
return Meter{
|
||||
MeterName: name.String(),
|
||||
Value: value,
|
||||
Unit: unit,
|
||||
Aggregation: aggregation,
|
||||
StartUnixMilli: window.StartUnixMilli,
|
||||
EndUnixMilli: window.EndUnixMilli,
|
||||
IsCompleted: window.IsCompleted,
|
||||
Dimensions: dimensions,
|
||||
}
|
||||
}
|
||||
|
||||
// PostableMeters is one day of meters for Zeus.PutMetersV3.
|
||||
type PostableMeters struct {
|
||||
// Meters is the set of meter values being shipped for one day.
|
||||
Meters []Meter `json:"meters"`
|
||||
}
|
||||
21
pkg/types/meterreportertypes/window.go
Normal file
21
pkg/types/meterreportertypes/window.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package meterreportertypes
|
||||
|
||||
import "time"
|
||||
|
||||
// Window is the [Start, End) range a reporter tick collects.
|
||||
type Window struct {
|
||||
StartUnixMilli int64
|
||||
EndUnixMilli int64
|
||||
IsCompleted bool
|
||||
}
|
||||
|
||||
// Day returns the UTC day containing the window start.
|
||||
func (w Window) Day() time.Time {
|
||||
t := time.UnixMilli(w.StartUnixMilli).UTC()
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
|
||||
}
|
||||
|
||||
// IsValid rejects empty, zero, and inverted windows.
|
||||
func (w Window) IsValid() bool {
|
||||
return w.StartUnixMilli > 0 && w.EndUnixMilli > w.StartUnixMilli
|
||||
}
|
||||
@@ -73,19 +73,6 @@ func NewTraitsFromOrganization(org *Organization) map[string]any {
|
||||
}
|
||||
}
|
||||
|
||||
type TTLSetting struct {
|
||||
bun.BaseModel `bun:"table:ttl_setting"`
|
||||
Identifiable
|
||||
TimeAuditable
|
||||
TransactionID string `bun:"transaction_id,type:text,notnull"`
|
||||
TableName string `bun:"table_name,type:text,notnull"`
|
||||
TTL int `bun:"ttl,notnull,default:0"`
|
||||
ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
|
||||
Status string `bun:"status,type:text,notnull"`
|
||||
OrgID string `json:"-" bun:"org_id,notnull"`
|
||||
Condition string `bun:"condition,type:text"`
|
||||
}
|
||||
|
||||
type OrganizationStore interface {
|
||||
Create(context.Context, *Organization) error
|
||||
Get(context.Context, valuer.UUID) (*Organization, error)
|
||||
|
||||
132
pkg/types/retentiontypes/types.go
Normal file
132
pkg/types/retentiontypes/types.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package retentiontypes
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultLogsRetentionDays = 15
|
||||
DefaultMetricsRetentionDays = 30
|
||||
DefaultTracesRetentionDays = 15
|
||||
)
|
||||
|
||||
const (
|
||||
TraceTTL = "traces"
|
||||
MetricsTTL = "metrics"
|
||||
LogsTTL = "logs"
|
||||
)
|
||||
|
||||
const (
|
||||
TTLSettingStatusPending = "pending"
|
||||
TTLSettingStatusFailed = "failed"
|
||||
TTLSettingStatusSuccess = "success"
|
||||
)
|
||||
|
||||
// Slice is a half-open time range using one TTL recipe.
|
||||
type Slice struct {
|
||||
StartMs int64
|
||||
EndMs int64
|
||||
Rules []CustomRetentionRule
|
||||
DefaultDays int
|
||||
}
|
||||
|
||||
type TTLSetting struct {
|
||||
bun.BaseModel `bun:"table:ttl_setting"`
|
||||
ID valuer.UUID `json:"id" bun:"id,pk,type:text" required:"true"`
|
||||
CreatedAt time.Time `bun:"created_at" json:"createdAt"`
|
||||
UpdatedAt time.Time `bun:"updated_at" json:"updatedAt"`
|
||||
|
||||
TransactionID string `bun:"transaction_id,type:text,notnull"`
|
||||
TableName string `bun:"table_name,type:text,notnull"`
|
||||
TTL int `bun:"ttl,notnull,default:0"`
|
||||
ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
|
||||
Status string `bun:"status,type:text,notnull"`
|
||||
OrgID string `json:"-" bun:"org_id,notnull"`
|
||||
Condition string `bun:"condition,type:text"`
|
||||
}
|
||||
|
||||
type TTLParams struct {
|
||||
Type string
|
||||
ColdStorageVolume string
|
||||
ToColdStorageDuration int64
|
||||
DelDuration int64
|
||||
}
|
||||
|
||||
type CustomRetentionTTLParams struct {
|
||||
Type string `json:"type"`
|
||||
DefaultTTLDays int `json:"defaultTTLDays"`
|
||||
TTLConditions []CustomRetentionRule `json:"ttlConditions"`
|
||||
ColdStorageVolume string `json:"coldStorageVolume,omitempty"`
|
||||
ToColdStorageDurationDays int64 `json:"coldStorageDurationDays,omitempty"`
|
||||
}
|
||||
|
||||
type GetTTLParams struct {
|
||||
Type string
|
||||
}
|
||||
|
||||
type GetCustomRetentionTTLResponse struct {
|
||||
Version string `json:"version"`
|
||||
Status string `json:"status"`
|
||||
|
||||
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
|
||||
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
|
||||
|
||||
DefaultTTLDays int `json:"default_ttl_days,omitempty"`
|
||||
TTLConditions []CustomRetentionRule `json:"ttl_conditions,omitempty"`
|
||||
ColdStorageVolume string `json:"cold_storage_volume,omitempty"`
|
||||
ColdStorageTTLDays int `json:"cold_storage_ttl_days,omitempty"`
|
||||
}
|
||||
|
||||
type CustomRetentionTTLResponse struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type TTLStatusItem struct {
|
||||
Id int `json:"id" db:"id"`
|
||||
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
TableName string `json:"table_name" db:"table_name"`
|
||||
TTL int `json:"ttl" db:"ttl"`
|
||||
Status string `json:"status" db:"status"`
|
||||
ColdStorageTtl int `json:"cold_storage_ttl" db:"cold_storage_ttl"`
|
||||
}
|
||||
|
||||
type SetTTLResponseItem struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type DBResponseTTL struct {
|
||||
EngineFull string `ch:"engine_full"`
|
||||
}
|
||||
|
||||
type GetTTLResponseItem struct {
|
||||
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
|
||||
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
|
||||
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
|
||||
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
|
||||
LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
|
||||
LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
|
||||
ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"`
|
||||
ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"`
|
||||
ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"`
|
||||
ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"`
|
||||
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
|
||||
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// CustomRetentionRule is one custom retention rule as stored in ttl_setting.condition.
|
||||
// Rules are evaluated in declaration order; the first matching rule wins.
|
||||
type CustomRetentionRule struct {
|
||||
Filters []FilterCondition `json:"conditions"`
|
||||
TTLDays int `json:"ttlDays"`
|
||||
}
|
||||
|
||||
// FilterCondition is one label-key, allowed-values condition inside a retention rule.
|
||||
type FilterCondition struct {
|
||||
Key string `json:"key"`
|
||||
Values []string `json:"values"`
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package zeustypes
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/tidwall/gjson"
|
||||
@@ -37,6 +38,11 @@ type Host struct {
|
||||
URL string `json:"url" required:"true"`
|
||||
}
|
||||
|
||||
type MeterCheckpoint struct {
|
||||
Name string
|
||||
Checkpoint time.Time
|
||||
}
|
||||
|
||||
func NewGettableHost(data []byte) *GettableHost {
|
||||
parsed := gjson.ParseBytes(data)
|
||||
dns := parsed.Get("cluster.region.dns").String()
|
||||
|
||||
@@ -49,6 +49,14 @@ func (provider *provider) PutMetersV2(_ context.Context, _ string, _ []byte) err
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v2 is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) PutMetersV3(_ context.Context, _ string, _ string, _ []byte) error {
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters v3 is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) GetMeterCheckpoints(_ context.Context, _ string) ([]zeustypes.MeterCheckpoint, error) {
|
||||
return nil, errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "fetching meter checkpoints is not supported")
|
||||
}
|
||||
|
||||
func (provider *provider) PutProfile(_ context.Context, _ string, _ *zeustypes.PostableProfile) error {
|
||||
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting profile is not supported")
|
||||
}
|
||||
|
||||
@@ -35,6 +35,16 @@ type Zeus interface {
|
||||
// Puts the meters for the given license key using Zeus.
|
||||
PutMetersV2(context.Context, string, []byte) error
|
||||
|
||||
// PutMetersV3 ships one day's batch of meter readings to the v2/meters
|
||||
// endpoint. idempotencyKey is propagated as X-Idempotency-Key so Zeus can
|
||||
// UPSERT on retries. The batch is accepted or rejected as a whole.
|
||||
PutMetersV3(ctx context.Context, licenseKey string, idempotencyKey string, body []byte) error
|
||||
|
||||
// GetMeterCheckpoints returns the latest sealed (is_completed=true) UTC day
|
||||
// Zeus has stored for each billing meter name. Missing meter names are
|
||||
// treated by the cron as bootstrap cases.
|
||||
GetMeterCheckpoints(ctx context.Context, licenseKey string) ([]zeustypes.MeterCheckpoint, error)
|
||||
|
||||
// Put profile for the given license key.
|
||||
PutProfile(context.Context, string, *zeustypes.PostableProfile) error
|
||||
|
||||
|
||||
Reference in New Issue
Block a user