feat: use flagger package to power feature flags (#9925)

Use the new `flagger` package to power the following features flags in the codebase:
- [x] `use_span_metrics`
- [x] `kafka_span_eval`
- [x] `interpolation_enabled`
This commit is contained in:
Karan Balani
2026-01-07 03:03:34 +05:30
committed by GitHub
parent 09e6342a2a
commit f8b3cac191
18 changed files with 427 additions and 117 deletions

View File

@@ -284,6 +284,9 @@ flagger:
# Config are the overrides for the feature flags which come directly from the config file.
config:
boolean:
use_span_metrics: true
interpolation_enabled: false
kafka_span_eval: false
string:
float:
integer:

View File

@@ -0,0 +1,134 @@
# Flagger
Flagger is SigNoz's feature flagging system built on top of the [OpenFeature](https://openfeature.dev/) standard. It provides a unified interface for evaluating feature flags across the application, allowing features to be enabled, disabled, or configured dynamically without code changes.
> 💡 **Note**: OpenFeature is a CNCF project that provides a vendor-agnostic feature flagging API, making it easy to switch providers without changing application code.
## How does it work?
Flagger consists of three main components:
1. **Registry** (`pkg/flagger/registry.go`) - Contains all available feature flags with their metadata and default values
2. **Flagger** (`pkg/flagger/flagger.go`) - The consumer-facing interface for evaluating feature flags
3. **Providers** (`pkg/flagger/<provider>flagger/`) - Implementations that supply feature flag values (e.g., `configflagger` for config-based flags)
The evaluation flow works as follows:
1. The caller requests a feature flag value via the `Flagger` interface
2. Flagger checks the registry to validate the flag exists and get its default value
3. Each registered provider is queried for an override value
4. If a provider returns a value different from the default, that value is returned
5. Otherwise, the default value from the registry is returned
## How to add a new feature flag?
### 1. Register the flag in the registry
Add your feature flag definition in `pkg/flagger/registry.go`:
```go
var (
// Export the feature name for use in evaluations
FeatureMyNewFeature = featuretypes.MustNewName("my_new_feature")
)
func MustNewRegistry() featuretypes.Registry {
registry, err := featuretypes.NewRegistry(
// ...existing features...
&featuretypes.Feature{
Name: FeatureMyNewFeature,
Kind: featuretypes.KindBoolean, // or KindString, KindFloat, KindInt, KindObject
Stage: featuretypes.StageStable, // or StageAlpha, StageBeta
Description: "Controls whether my new feature is enabled",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
)
// ...
}
```
> 💡 **Note**: Feature names must match the regex `^[a-z_]+$` (lowercase letters and underscores only).
### 2. Configure the feature flag value (optional)
To override the default value, add an entry in your configuration file:
```yaml
flagger:
config:
boolean:
my_new_feature: true
```
Supported configuration types:
| Type | Config Key | Go Type |
|------|------------|---------|
| Boolean | `boolean` | `bool` |
| String | `string` | `string` |
| Float | `float` | `float64` |
| Integer | `integer` | `int64` |
| Object | `object` | `any` |
## How to evaluate a feature flag?
Use the `Flagger` interface to evaluate feature flags. The interface provides typed methods for each value type:
```go
import (
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
)
func DoSomething(ctx context.Context, flagger flagger.Flagger) error {
// Create an evaluation context (typically with org ID)
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
// Evaluate with error handling
enabled, err := flagger.Boolean(ctx, flagger.FeatureMyNewFeature, evalCtx)
if err != nil {
return err
}
if enabled {
// Feature is enabled
}
return nil
}
```
### Empty variants
For cases where you want to use a default value on error (and log the error), use the `*OrEmpty` methods:
```go
func DoSomething(ctx context.Context, flagger flagger.Flagger) {
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
// Returns false on error and logs the error
if flagger.BooleanOrEmpty(ctx, flagger.FeatureMyNewFeature, evalCtx) {
// Feature is enabled
}
}
```
### Available evaluation methods
| Method | Return Type | Empty Variant Default |
|--------|-------------|---------------------|
| `Boolean()` | `(bool, error)` | `false` |
| `String()` | `(string, error)` | `""` |
| `Float()` | `(float64, error)` | `0.0` |
| `Int()` | `(int64, error)` | `0` |
| `Object()` | `(any, error)` | `struct{}{}` |
## What should I remember?
- Always define feature flags in the registry (`pkg/flagger/registry.go`) before using them
- Use descriptive feature names that clearly indicate what the flag controls
- Prefer `*OrEmpty` methods for non-critical features to avoid error handling overhead
- Export feature name variables (e.g., `FeatureMyNewFeature`) for type-safe usage across packages
- Consider the feature's lifecycle stage (`Alpha`, `Beta`, `Stable`) when defining it
- Providers are evaluated in order; the first non-default value wins

View File

@@ -9,9 +9,10 @@ import (
"time"
"github.com/SigNoz/signoz/ee/query-service/constants"
pkgError "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
@@ -25,11 +26,7 @@ func (ah *APIHandler) getFeatureFlags(w http.ResponseWriter, r *http.Request) {
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(w, pkgError.Newf(pkgError.TypeInvalidInput, pkgError.CodeInvalidInput, "orgId is invalid"))
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
featureSet, err := ah.Signoz.Licensing.GetFeatureFlags(r.Context(), orgID)
if err != nil {
@@ -59,13 +56,16 @@ func (ah *APIHandler) getFeatureFlags(w http.ResponseWriter, r *http.Request) {
}
}
if constants.IsPreferSpanMetrics {
for idx, feature := range featureSet {
if feature.Name == licensetypes.UseSpanMetrics {
featureSet[idx].Active = true
}
}
}
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
useSpanMetrics := ah.Signoz.Flagger.BooleanOrEmpty(ctx, flagger.FeatureUseSpanMetrics, evalCtx)
featureSet = append(featureSet, &licensetypes.Feature{
Name: valuer.NewString(flagger.FeatureUseSpanMetrics.String()),
Active: useSpanMetrics,
Usage: 0,
UsageLimit: -1,
Route: "",
})
if constants.IsDotMetricsEnabled {
for idx, feature := range featureSet {

View File

@@ -23,14 +23,9 @@ func GetOrDefaultEnv(key string, fallback string) string {
const DotMetricsEnabled = "DOT_METRICS_ENABLED"
var IsDotMetricsEnabled = false
var IsPreferSpanMetrics = false
func init() {
if GetOrDefaultEnv(DotMetricsEnabled, "true") == "true" {
IsDotMetricsEnabled = true
}
if GetOrDefaultEnv("USE_SPAN_METRICS", "false") == "true" {
IsPreferSpanMetrics = true
}
}

View File

@@ -18,11 +18,37 @@ type FlaggerProvider interface {
// This is the consumer facing interface for the Flagger service.
type Flagger interface {
Boolean(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (bool, error)
String(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (string, error)
Float(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (float64, error)
Int(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (int64, error)
Object(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (any, error)
// Returns value for the flag of kind boolean otherwise returns error
Boolean(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (bool, error)
// Returns value for the flag of kind string otherwise returns error
String(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (string, error)
// Returns value for the flag of kind float otherwise returns error
Float(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (float64, error)
// Returns value for the flag of kind int otherwise returns error
Int(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (int64, error)
// Returns value for the flag of kind object otherwise returns error
Object(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (any, error)
// Returns value for the flag of kind boolean otherwise returns empty (default) value
BooleanOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) bool
// Returns value for the flag of kind string otherwise returns empty (default) value
StringOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) string
// Returns value for the flag of kind float otherwise returns empty (default) value
FloatOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) float64
// Returns value for the flag of kind int otherwise returns empty (default) value
IntOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) int64
// Returns value for the flag of kind object otherwise returns empty (default) value
ObjectOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) any
// Returns all the features in the registry
List(ctx context.Context, evalCtx featuretypes.FlaggerEvaluationContext) ([]*featuretypes.GettableFeature, error)
}
@@ -65,9 +91,9 @@ func New(ctx context.Context, ps factory.ProviderSettings, config Config, regist
}, nil
}
func (f *flagger) Boolean(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (bool, error) {
func (f *flagger) Boolean(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (bool, error) {
// check if the feature is present in the default registry
feature, _, err := f.registry.GetByString(flag)
feature, _, err := f.registry.Get(flag)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get feature from default registry", "error", err, "flag", flag)
return false, err
@@ -84,7 +110,7 @@ func (f *flagger) Boolean(ctx context.Context, flag string, evalCtx featuretypes
// * this logic can be optimised based on priority of the clients and short circuiting
// now ask all the available clients for the value
for _, client := range f.clients {
value, err := client.BooleanValue(ctx, flag, defaultValue, evalCtx.Ctx())
value, err := client.BooleanValue(ctx, flag.String(), defaultValue, evalCtx.Ctx())
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get value from client", "error", err, "flag", flag, "client", client.Metadata().Name)
continue
@@ -98,9 +124,9 @@ func (f *flagger) Boolean(ctx context.Context, flag string, evalCtx featuretypes
return defaultValue, nil
}
func (f *flagger) String(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (string, error) {
func (f *flagger) String(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (string, error) {
// check if the feature is present in the default registry
feature, _, err := f.registry.GetByString(flag)
feature, _, err := f.registry.Get(flag)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get feature from default registry", "error", err, "flag", flag)
return "", err
@@ -117,7 +143,7 @@ func (f *flagger) String(ctx context.Context, flag string, evalCtx featuretypes.
// * this logic can be optimised based on priority of the clients and short circuiting
// now ask all the available clients for the value
for _, client := range f.clients {
value, err := client.StringValue(ctx, flag, defaultValue, evalCtx.Ctx())
value, err := client.StringValue(ctx, flag.String(), defaultValue, evalCtx.Ctx())
if err != nil {
f.settings.Logger().WarnContext(ctx, "failed to get value from client", "error", err, "flag", flag, "client", client.Metadata().Name)
continue
@@ -131,9 +157,9 @@ func (f *flagger) String(ctx context.Context, flag string, evalCtx featuretypes.
return defaultValue, nil
}
func (f *flagger) Float(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (float64, error) {
func (f *flagger) Float(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (float64, error) {
// check if the feature is present in the default registry
feature, _, err := f.registry.GetByString(flag)
feature, _, err := f.registry.Get(flag)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get feature from default registry", "error", err, "flag", flag)
return 0, err
@@ -150,7 +176,7 @@ func (f *flagger) Float(ctx context.Context, flag string, evalCtx featuretypes.F
// * this logic can be optimised based on priority of the clients and short circuiting
// now ask all the available clients for the value
for _, client := range f.clients {
value, err := client.FloatValue(ctx, flag, defaultValue, evalCtx.Ctx())
value, err := client.FloatValue(ctx, flag.String(), defaultValue, evalCtx.Ctx())
if err != nil {
f.settings.Logger().WarnContext(ctx, "failed to get value from client", "error", err, "flag", flag, "client", client.Metadata().Name)
continue
@@ -164,9 +190,9 @@ func (f *flagger) Float(ctx context.Context, flag string, evalCtx featuretypes.F
return defaultValue, nil
}
func (f *flagger) Int(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (int64, error) {
func (f *flagger) Int(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (int64, error) {
// check if the feature is present in the default registry
feature, _, err := f.registry.GetByString(flag)
feature, _, err := f.registry.Get(flag)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get feature from default registry", "error", err, "flag", flag)
return 0, err
@@ -183,7 +209,7 @@ func (f *flagger) Int(ctx context.Context, flag string, evalCtx featuretypes.Fla
// * this logic can be optimised based on priority of the clients and short circuiting
// now ask all the available clients for the value
for _, client := range f.clients {
value, err := client.IntValue(ctx, flag, defaultValue, evalCtx.Ctx())
value, err := client.IntValue(ctx, flag.String(), defaultValue, evalCtx.Ctx())
if err != nil {
f.settings.Logger().WarnContext(ctx, "failed to get value from client", "error", err, "flag", flag, "client", client.Metadata().Name)
continue
@@ -197,9 +223,9 @@ func (f *flagger) Int(ctx context.Context, flag string, evalCtx featuretypes.Fla
return defaultValue, nil
}
func (f *flagger) Object(ctx context.Context, flag string, evalCtx featuretypes.FlaggerEvaluationContext) (any, error) {
func (f *flagger) Object(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) (any, error) {
// check if the feature is present in the default registry
feature, _, err := f.registry.GetByString(flag)
feature, _, err := f.registry.Get(flag)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get feature from default registry", "error", err, "flag", flag)
return nil, err
@@ -216,7 +242,7 @@ func (f *flagger) Object(ctx context.Context, flag string, evalCtx featuretypes.
// * this logic can be optimised based on priority of the clients and short circuiting
// now ask all the available clients for the value
for _, client := range f.clients {
value, err := client.ObjectValue(ctx, flag, defaultValue, evalCtx.Ctx())
value, err := client.ObjectValue(ctx, flag.String(), defaultValue, evalCtx.Ctx())
if err != nil {
f.settings.Logger().WarnContext(ctx, "failed to get value from client", "error", err, "flag", flag, "client", client.Metadata().Name)
continue
@@ -232,6 +258,56 @@ func (f *flagger) Object(ctx context.Context, flag string, evalCtx featuretypes.
return defaultValue, nil
}
func (f *flagger) BooleanOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) bool {
defaultValue := false
value, err := f.Boolean(ctx, flag, evalCtx)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get value from flagger service", "error", err, "flag", flag)
return defaultValue
}
return value
}
func (f *flagger) StringOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) string {
defaultValue := ""
value, err := f.String(ctx, flag, evalCtx)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get value from flagger service", "error", err, "flag", flag)
return defaultValue
}
return value
}
func (f *flagger) FloatOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) float64 {
defaultValue := 0.0
value, err := f.Float(ctx, flag, evalCtx)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get value from flagger service", "error", err, "flag", flag)
return defaultValue
}
return value
}
func (f *flagger) IntOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) int64 {
defaultValue := int64(0)
value, err := f.Int(ctx, flag, evalCtx)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get value from flagger service", "error", err, "flag", flag)
return defaultValue
}
return value
}
func (f *flagger) ObjectOrEmpty(ctx context.Context, flag featuretypes.Name, evalCtx featuretypes.FlaggerEvaluationContext) any {
defaultValue := struct{}{}
value, err := f.Object(ctx, flag, evalCtx)
if err != nil {
f.settings.Logger().ErrorContext(ctx, "failed to get value from flagger service", "error", err, "flag", flag)
return defaultValue
}
return value
}
func (f *flagger) List(ctx context.Context, evalCtx featuretypes.FlaggerEvaluationContext) ([]*featuretypes.GettableFeature, error) {
// get all the feature from the default registry
allFeatures := f.registry.List()

View File

@@ -2,8 +2,39 @@ package flagger
import "github.com/SigNoz/signoz/pkg/types/featuretypes"
var (
FeatureUseSpanMetrics = featuretypes.MustNewName("use_span_metrics")
FeatureInterpolationEnabled = featuretypes.MustNewName("interpolation_enabled")
FeatureKafkaSpanEval = featuretypes.MustNewName("kafka_span_eval")
)
func MustNewRegistry() featuretypes.Registry {
registry, err := featuretypes.NewRegistry()
registry, err := featuretypes.NewRegistry(
&featuretypes.Feature{
Name: FeatureUseSpanMetrics,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageStable,
Description: "Controls whether to use span metrics",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureInterpolationEnabled,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageExperimental,
Description: "Controls whether to enable interpolation",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureKafkaSpanEval,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageExperimental,
Description: "Controls whether to enable kafka span eval",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
)
if err != nil {
panic(err)
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/querybuilder"
@@ -22,6 +23,7 @@ func NewFactory(
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
cache cache.Cache,
flagger flagger.Flagger,
) factory.ProviderFactory[querier.Querier, querier.Config] {
return factory.NewProviderFactory(
factory.MustNewName("signoz"),
@@ -30,7 +32,7 @@ func NewFactory(
settings factory.ProviderSettings,
cfg querier.Config,
) (querier.Querier, error) {
return newProvider(ctx, settings, cfg, telemetryStore, prometheus, cache)
return newProvider(ctx, settings, cfg, telemetryStore, prometheus, cache, flagger)
},
)
}
@@ -42,6 +44,7 @@ func newProvider(
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
cache cache.Cache,
flagger flagger.Flagger,
) (querier.Querier, error) {
// Create telemetry metadata store
@@ -140,6 +143,7 @@ func newProvider(
telemetryMetadataStore,
metricFieldMapper,
metricConditionBuilder,
flagger,
)
// Create meter statement builder
@@ -148,6 +152,7 @@ func newProvider(
telemetryMetadataStore,
metricFieldMapper,
metricConditionBuilder,
metricStmtBuilder,
)
// Create bucket cache

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/queryparser"
@@ -63,6 +64,7 @@ import (
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
@@ -2007,13 +2009,25 @@ func (aH *APIHandler) getFeatureFlags(w http.ResponseWriter, r *http.Request) {
return
}
if constants.PreferSpanMetrics {
for idx, feature := range featureSet {
if feature.Name == licensetypes.UseSpanMetrics {
featureSet[idx].Active = true
}
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
aH.HandleError(w, err, http.StatusInternalServerError)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
useSpanMetrics := aH.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureUseSpanMetrics, evalCtx)
featureSet = append(featureSet, &licensetypes.Feature{
Name: valuer.NewString(flagger.FeatureUseSpanMetrics.String()),
Active: useSpanMetrics,
Usage: 0,
UsageLimit: -1,
Route: "",
})
if constants.IsDotMetricsEnabled {
for idx, feature := range featureSet {
if feature.Name == licensetypes.DotMetricsEnabled {
@@ -2630,7 +2644,10 @@ func (aH *APIHandler) getProducerData(w http.ResponseWriter, r *http.Request) {
return
}
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer")
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
kafkaSpanEval := aH.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureKafkaSpanEval, evalCtx)
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@@ -2680,7 +2697,10 @@ func (aH *APIHandler) getConsumerData(w http.ResponseWriter, r *http.Request) {
return
}
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer")
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
kafkaSpanEval := aH.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureKafkaSpanEval, evalCtx)
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@@ -2731,7 +2751,10 @@ func (aH *APIHandler) getPartitionOverviewLatencyData(w http.ResponseWriter, r *
return
}
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-topic-throughput")
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
kafkaSpanEval := aH.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureKafkaSpanEval, evalCtx)
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-topic-throughput", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@@ -2782,7 +2805,10 @@ func (aH *APIHandler) getConsumerPartitionLatencyData(w http.ResponseWriter, r *
return
}
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency")
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
kafkaSpanEval := aH.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureKafkaSpanEval, evalCtx)
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@@ -2947,7 +2973,10 @@ func (aH *APIHandler) getProducerThroughputDetails(w http.ResponseWriter, r *htt
return
}
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-throughput-details")
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
kafkaSpanEval := aH.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureKafkaSpanEval, evalCtx)
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-throughput-details", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@@ -2998,7 +3027,10 @@ func (aH *APIHandler) getConsumerThroughputOverview(w http.ResponseWriter, r *ht
return
}
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview")
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
kafkaSpanEval := aH.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureKafkaSpanEval, evalCtx)
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@@ -3049,7 +3081,10 @@ func (aH *APIHandler) getConsumerThroughputDetails(w http.ResponseWriter, r *htt
return
}
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details")
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
kafkaSpanEval := aH.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureKafkaSpanEval, evalCtx)
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@@ -3103,7 +3138,10 @@ func (aH *APIHandler) getProducerConsumerEval(w http.ResponseWriter, r *http.Req
return
}
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval")
evalCtx := featuretypes.NewFlaggerEvaluationContext(orgID)
kafkaSpanEval := aH.Signoz.Flagger.BooleanOrEmpty(r.Context(), flagger.FeatureKafkaSpanEval, evalCtx)
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)

View File

@@ -2,6 +2,7 @@ package kafka
import (
"fmt"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
@@ -9,9 +10,9 @@ import (
var defaultStepInterval int64 = 60
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) {
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string, kafkaSpanEval bool) (*v3.QueryRangeParamsV3, error) {
if constants.KafkaSpanEval == "false" && queryContext == "producer-consumer-eval" {
if !kafkaSpanEval && queryContext == "producer-consumer-eval" {
return nil, fmt.Errorf("span evaluation feature is disabled and is experimental")
}

View File

@@ -40,8 +40,6 @@ const NormalizedMetricsMapQueryThreads = 10
var NormalizedMetricsMapRegex = regexp.MustCompile(`[^a-zA-Z0-9]`)
var NormalizedMetricsMapQuantileRegex = regexp.MustCompile(`(?i)([._-]?quantile.*)$`)
var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false")
func GetEvalDelay() time.Duration {
evalDelayStr := GetOrDefaultEnv("RULES_EVAL_DELAY", "2m")
evalDelayDuration, err := time.ParseDuration(evalDelayStr)
@@ -680,7 +678,6 @@ var OldToNewTraceFieldsMap = map[string]string{
var StaticFieldsTraces = map[string]v3.AttributeKey{}
var IsDotMetricsEnabled = false
var PreferSpanMetrics = false
var MaxJSONFlatteningDepth = 1
func init() {
@@ -689,9 +686,6 @@ func init() {
if GetOrDefaultEnv(DotMetricsEnabled, "true") == "true" {
IsDotMetricsEnabled = true
}
if GetOrDefaultEnv("USE_SPAN_METRICS", "false") == "true" {
PreferSpanMetrics = true
}
// set max flattening depth
depth, err := strconv.Atoi(GetOrDefaultEnv(maxJSONFlatteningDepth, "1"))

View File

@@ -143,7 +143,7 @@ func (df *DeprecatedFlags) RegisterFlags(cmd *cobra.Command) {
_ = cmd.Flags().MarkDeprecated("flux-interval", "use SIGNOZ_QUERIER_FLUX__INTERVAL instead")
_ = cmd.Flags().MarkDeprecated("flux-interval-for-trace-detail", "use SIGNOZ_QUERIER_FLUX__INTERVAL instead")
_ = cmd.Flags().MarkDeprecated("cluster", "use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER instead")
_ = cmd.Flags().MarkDeprecated("prefer-span-metrics", "use USE_SPAN_METRICS instead")
_ = cmd.Flags().MarkDeprecated("prefer-span-metrics", "use SIGNOZ_FLAGGER_CONFIG_BOOLEAN_USE__SPAN__METRICS instead")
_ = cmd.Flags().MarkDeprecated("gateway-url", "use SIGNOZ_GATEWAY_URL instead")
}
@@ -332,7 +332,19 @@ func mergeAndEnsureBackwardCompatibility(ctx context.Context, logger *slog.Logge
}
if deprecatedFlags.PreferSpanMetrics {
logger.WarnContext(ctx, "[Deprecated] flag --prefer-span-metrics is deprecated and scheduled for removal. Please use USE_SPAN_METRICS instead.")
logger.WarnContext(ctx, "[Deprecated] flag --prefer-span-metrics is deprecated and scheduled for removal. Please use SIGNOZ_FLAGGER_CONFIG_BOOLEAN_USE__SPAN__METRICS instead.")
if config.Flagger.Config.Boolean == nil {
config.Flagger.Config.Boolean = make(map[string]bool)
}
config.Flagger.Config.Boolean[flagger.FeatureUseSpanMetrics.String()] = deprecatedFlags.PreferSpanMetrics
}
if os.Getenv("USE_SPAN_METRICS") != "" {
logger.WarnContext(ctx, "[Deprecated] env USE_SPAN_METRICS is deprecated and scheduled for removal. Please use SIGNOZ_FLAGGER_CONFIG_BOOLEAN_USE__SPAN__METRICS instead.")
if config.Flagger.Config.Boolean == nil {
config.Flagger.Config.Boolean = make(map[string]bool)
}
config.Flagger.Config.Boolean[flagger.FeatureUseSpanMetrics.String()] = os.Getenv("USE_SPAN_METRICS") == "true"
}
if deprecatedFlags.GatewayUrl != "" {
@@ -349,6 +361,22 @@ func mergeAndEnsureBackwardCompatibility(ctx context.Context, logger *slog.Logge
logger.WarnContext(ctx, "[Deprecated] env SIGNOZ_JWT_SECRET is deprecated and scheduled for removal. Please use SIGNOZ_TOKENIZER_JWT_SECRET instead.")
config.Tokenizer.JWT.Secret = os.Getenv("SIGNOZ_JWT_SECRET")
}
if os.Getenv("KAFKA_SPAN_EVAL") != "" {
logger.WarnContext(ctx, "[Deprecated] env KAFKA_SPAN_EVAL is deprecated and scheduled for removal. Please use SIGNOZ_FLAGGER_CONFIG_BOOLEAN_KAFKA__SPAN__EVAL instead.")
if config.Flagger.Config.Boolean == nil {
config.Flagger.Config.Boolean = make(map[string]bool)
}
config.Flagger.Config.Boolean[flagger.FeatureKafkaSpanEval.String()] = os.Getenv("KAFKA_SPAN_EVAL") == "true"
}
if os.Getenv("INTERPOLATION_ENABLED") != "" {
logger.WarnContext(ctx, "[Deprecated] env INTERPOLATION_ENABLED is deprecated and scheduled for removal. Please use SIGNOZ_FLAGGER_CONFIG_BOOLEAN_INTERPOLATION__ENABLED instead.")
if config.Flagger.Config.Boolean == nil {
config.Flagger.Config.Boolean = make(map[string]bool)
}
config.Flagger.Config.Boolean[flagger.FeatureInterpolationEnabled.String()] = os.Getenv("INTERPOLATION_ENABLED") == "true"
}
}
func (config Config) Collect(_ context.Context, _ valuer.UUID) (map[string]any, error) {

View File

@@ -224,9 +224,9 @@ func NewStatsReporterProviderFactories(telemetryStore telemetrystore.TelemetrySt
)
}
func NewQuerierProviderFactories(telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, cache cache.Cache) factory.NamedMap[factory.ProviderFactory[querier.Querier, querier.Config]] {
func NewQuerierProviderFactories(telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, cache cache.Cache, flagger flagger.Flagger) factory.NamedMap[factory.ProviderFactory[querier.Querier, querier.Config]] {
return factory.MustNewNamedMap(
signozquerier.NewFactory(telemetryStore, prometheus, cache),
signozquerier.NewFactory(telemetryStore, prometheus, cache, flagger),
)
}

View File

@@ -149,6 +149,20 @@ func New(
return nil, err
}
// Initialize flagger from the available flagger provider factories
flaggerRegistry := flagger.MustNewRegistry()
flaggerProviderFactories := NewFlaggerProviderFactories(flaggerRegistry)
flagger, err := flagger.New(
ctx,
providerSettings,
config.Flagger,
flaggerRegistry,
flaggerProviderFactories.GetInOrder()...,
)
if err != nil {
return nil, err
}
// Initialize web from the available web provider factories
web, err := factory.NewProviderFromNamedMap(
ctx,
@@ -202,7 +216,7 @@ func New(
ctx,
providerSettings,
config.Querier,
NewQuerierProviderFactories(telemetrystore, prometheus, cache),
NewQuerierProviderFactories(telemetrystore, prometheus, cache, flagger),
config.Querier.Provider(),
)
if err != nil {
@@ -362,20 +376,6 @@ func New(
return nil, err
}
// Initialize flagger from the available flagger provider factories
flaggerRegistry := flagger.MustNewRegistry()
flaggerProviderFactories := NewFlaggerProviderFactories(flaggerRegistry)
flagger, err := flagger.New(
ctx,
providerSettings,
config.Flagger,
flaggerRegistry,
flaggerProviderFactories.GetInOrder()...,
)
if err != nil {
return nil, err
}
// Initialize all modules
roleModule := implrole.NewModule(implrole.NewStore(sqlstore), authz, nil)
dashboardModule := dashboardModuleCallback(sqlstore, providerSettings, analytics, orgGetter, roleModule, queryParser, querier, licensing)

View File

@@ -29,9 +29,9 @@ func NewMeterQueryStatementBuilder(
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
metricsStatementBuilder *telemetrymetrics.MetricQueryStatementBuilder,
) *meterQueryStatementBuilder {
metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymeter")
metricsStatementBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(settings, metadataStore, fieldMapper, conditionBuilder)
return &meterQueryStatementBuilder{
logger: metricsSettings.Logger(),

View File

@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
@@ -165,11 +166,19 @@ func TestStatementBuilder(t *testing.T) {
}
mockMetadataStore.KeysMap = keys
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
if err != nil {
t.Fatalf("failed to create flagger: %v", err)
}
metricStmtBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(instrumentationtest.New().ToProviderSettings(), mockMetadataStore, fm, cb, flagger)
statementBuilder := NewMeterQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
metricStmtBuilder,
)
for _, c := range cases {

View File

@@ -4,13 +4,15 @@ import (
"context"
"fmt"
"log/slog"
"os"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
"golang.org/x/exp/slices"
)
@@ -67,6 +69,7 @@ type MetricQueryStatementBuilder struct {
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
flagger flagger.Flagger
}
var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*MetricQueryStatementBuilder)(nil)
@@ -76,6 +79,7 @@ func NewMetricQueryStatementBuilder(
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
flagger flagger.Flagger,
) *MetricQueryStatementBuilder {
metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetrics")
return &MetricQueryStatementBuilder{
@@ -83,6 +87,7 @@ func NewMetricQueryStatementBuilder(
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
flagger: flagger,
}
}
@@ -450,7 +455,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
}
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
_ context.Context,
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
@@ -485,10 +490,13 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
// ! TODO (balanikaran) Get OrgID via function parameter instead of valuer.GenerateUUID()
interpolationEnabled := b.flagger.BooleanOrEmpty(ctx, flagger.FeatureInterpolationEnabled, featuretypes.NewFlaggerEvaluationContext(valuer.GenerateUUID()))
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(RateWithoutNegative, start, start)
if os.Getenv("INTERPOLATION_ENABLED") == "true" {
if interpolationEnabled {
rateExpr = RateWithInterpolation
}
wrapped := sqlbuilder.NewSelectBuilder()
@@ -503,7 +511,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
case metrictypes.TimeAggregationIncrease:
incExpr := fmt.Sprintf(IncreaseWithoutNegative, start, start)
if os.Getenv("INTERPOLATION_ENABLED") == "true" {
if interpolationEnabled {
incExpr = IncreaseWithInterpolation
}
wrapped := sqlbuilder.NewSelectBuilder()

View File

@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -232,11 +233,17 @@ func TestStatementBuilder(t *testing.T) {
}
}
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
if err != nil {
t.Fatalf("failed to create flagger: %v", err)
}
statementBuilder := NewMetricQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
flagger,
)
for _, c := range cases {

View File

@@ -1,6 +1,8 @@
package licensetypes
import "github.com/SigNoz/signoz/pkg/valuer"
import (
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
// Feature Key
@@ -9,7 +11,7 @@ var (
ChatSupport = valuer.NewString("chat_support")
Gateway = valuer.NewString("gateway")
PremiumSupport = valuer.NewString("premium_support")
UseSpanMetrics = valuer.NewString("use_span_metrics")
AnomalyDetection = valuer.NewString("anomaly_detection")
DotMetricsEnabled = valuer.NewString("dot_metrics_enabled")
@@ -37,13 +39,6 @@ var BasicPlan = []*Feature{
UsageLimit: -1,
Route: "",
},
{
Name: UseSpanMetrics,
Active: false,
Usage: 0,
UsageLimit: -1,
Route: "",
},
{
Name: Gateway,
Active: false,
@@ -82,13 +77,6 @@ var EnterprisePlan = []*Feature{
UsageLimit: -1,
Route: "",
},
{
Name: UseSpanMetrics,
Active: false,
Usage: 0,
UsageLimit: -1,
Route: "",
},
{
Name: Onboarding,
Active: true,
@@ -134,13 +122,6 @@ var EnterprisePlan = []*Feature{
}
var DefaultFeatureSet = []*Feature{
{
Name: UseSpanMetrics,
Active: false,
Usage: 0,
UsageLimit: -1,
Route: "",
},
{
Name: DotMetricsEnabled,
Active: false,