Compare commits

...

1 Commits

Author SHA1 Message Date
nityanandagohain
c1be7daf19 fix: integration with opamp llm pricing 2026-04-24 16:52:59 +05:30
13 changed files with 792 additions and 3 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
"github.com/gorilla/handlers"
@@ -111,9 +112,11 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
}
// initiate agent config handler
llmCostFeature := impllmpricingrule.NewLLMCostFeature(signoz.Modules.LLMPricingRule)
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController, llmCostFeature},
})
if err != nil {
return nil, err

View File

@@ -0,0 +1,74 @@
package impllmpricingrule
import (
"context"
"encoding/json"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
const LLMCostFeatureType agentConf.AgentFeatureType = "llm_pricing"
// LLMCostFeature implements agentConf.AgentFeature. It reads pricing rules
// from the module and generates the signozllmpricing processor config for
// deployment to OTel collectors via OpAMP.
type LLMCostFeature struct {
module llmpricingrule.Module
}
func NewLLMCostFeature(module llmpricingrule.Module) *LLMCostFeature {
return &LLMCostFeature{module: module}
}
func (f *LLMCostFeature) AgentFeatureType() agentConf.AgentFeatureType {
return LLMCostFeatureType
}
func (f *LLMCostFeature) RecommendAgentConfig(
orgId valuer.UUID,
currentConfYaml []byte,
configVersion *opamptypes.AgentConfigVersion,
) ([]byte, string, error) {
ctx := context.Background()
rules, err := f.getEnabledRules(ctx, orgId)
if err != nil {
return nil, "", err
}
updatedConf, err := generateCollectorConfigWithLLMCost(currentConfYaml, rules)
if err != nil {
return nil, "", err
}
serialized, err := json.Marshal(rules)
if err != nil {
return nil, "", err
}
return updatedConf, string(serialized), nil
}
// getEnabledRules fetches all enabled pricing rules for the given org.
func (f *LLMCostFeature) getEnabledRules(ctx context.Context, orgId valuer.UUID) ([]*llmpricingruletypes.LLMPricingRule, error) {
if f.module == nil {
return nil, nil
}
rules, _, err := f.module.List(ctx, orgId, 0, 10000)
if err != nil {
return nil, err
}
enabled := make([]*llmpricingruletypes.LLMPricingRule, 0, len(rules))
for _, r := range rules {
if r.Enabled {
enabled = append(enabled, r)
}
}
return enabled, nil
}

View File

@@ -0,0 +1,94 @@
package impllmpricingrule
import (
"bytes"
"fmt"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"gopkg.in/yaml.v3"
)
const processorName = "signozllmpricing"
// buildProcessorConfig converts pricing rules into the signozllmpricing processor config.
func buildProcessorConfig(rules []*llmpricingruletypes.LLMPricingRule) *llmpricingruletypes.LLMPricingRuleProcessorConfig {
pricingRules := make([]llmpricingruletypes.LLMPricingRuleProcessor, 0, len(rules))
for _, r := range rules {
pricingRules = append(pricingRules, llmpricingruletypes.LLMPricingRuleProcessor{
Name: r.Model,
Pattern: r.ModelPattern,
Cache: llmpricingruletypes.LLMPricingRuleProcessorCache{
Mode: r.CacheMode.StringValue(),
Read: r.CostCacheRead,
Write: r.CostCacheWrite,
},
In: r.CostInput,
Out: r.CostOutput,
})
}
return &llmpricingruletypes.LLMPricingRuleProcessorConfig{
Attrs: llmpricingruletypes.LLMPricingRuleProcessorAttrs{
Model: "gen_ai.request.model",
In: "gen_ai.usage.input_tokens",
Out: "gen_ai.usage.output_tokens",
CacheRead: "gen_ai.usage.input_token_details.cached",
CacheWrite: "gen_ai.usage.input_token_details.cache_creation",
},
DefaultPricing: llmpricingruletypes.LLMPricingRuleProcessorDefaultPricing{
Unit: "per_million_tokens",
Rules: pricingRules,
},
OutputAttrs: llmpricingruletypes.LLMPricingRuleProcessorOutputAttrs{
In: "_signoz.gen_ai.cost_input",
Out: "_signoz.gen_ai.cost_output",
CacheRead: "_signoz.gen_ai.cost_cache_read",
CacheWrite: "_signoz.gen_ai.cost_cache_write",
Total: "_signoz.gen_ai.total_cost",
},
}
}
// generateCollectorConfigWithLLMCost injects (or replaces) the signozllmpricing
// processor block in the collector YAML with one built from the given rules.
// Pipeline wiring is handled by the collector's baseline config, not here.
func generateCollectorConfigWithLLMCost(
currentConfYaml []byte,
rules []*llmpricingruletypes.LLMPricingRule,
) ([]byte, error) {
// Empty input: nothing to inject into. Pass through unchanged so we don't
// turn it into "null\n" or fail on yaml.v3's EOF.
if len(bytes.TrimSpace(currentConfYaml)) == 0 {
return currentConfYaml, nil
}
var collectorConf map[string]any
if err := yaml.Unmarshal(currentConfYaml, &collectorConf); err != nil {
return nil, fmt.Errorf("failed to unmarshal collector config: %w", err)
}
if collectorConf == nil {
collectorConf = map[string]any{}
}
processors := map[string]any{}
if collectorConf["processors"] != nil {
if p, ok := collectorConf["processors"].(map[string]any); ok {
processors = p
}
}
procConfig := buildProcessorConfig(rules)
configBytes, err := yaml.Marshal(procConfig)
if err != nil {
return nil, fmt.Errorf("failed to marshal llm cost processor config: %w", err)
}
var configMap any
if err := yaml.Unmarshal(configBytes, &configMap); err != nil {
return nil, fmt.Errorf("failed to re-unmarshal llm cost processor config: %w", err)
}
processors[processorName] = configMap
collectorConf["processors"] = processors
return yaml.Marshal(collectorConf)
}

View File

@@ -0,0 +1,169 @@
package impllmpricingrule
import (
"testing"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
func makePricingRule(model string, patterns []string, cacheMode llmpricingruletypes.LLMPricingRuleCacheMode, costIn, costOut, cacheRead, cacheWrite float64) *llmpricingruletypes.LLMPricingRule {
return &llmpricingruletypes.LLMPricingRule{
Model: model,
ModelPattern: patterns,
Unit: llmpricingruletypes.UnitPerMillionTokens,
CacheMode: cacheMode,
CostInput: costIn,
CostOutput: costOut,
CostCacheRead: cacheRead,
CostCacheWrite: cacheWrite,
Enabled: true,
}
}
func TestBuildProcessorConfig_EmptyRules(t *testing.T) {
cfg := buildProcessorConfig(nil)
require.NotNil(t, cfg)
assert.Empty(t, cfg.DefaultPricing.Rules)
assert.Equal(t, "per_million_tokens", cfg.DefaultPricing.Unit)
assert.Equal(t, "gen_ai.request.model", cfg.Attrs.Model)
assert.Equal(t, "gen_ai.usage.input_tokens", cfg.Attrs.In)
assert.Equal(t, "gen_ai.usage.output_tokens", cfg.Attrs.Out)
assert.Equal(t, "gen_ai.usage.input_token_details.cached", cfg.Attrs.CacheRead)
assert.Equal(t, "gen_ai.usage.input_token_details.cache_creation", cfg.Attrs.CacheWrite)
assert.Equal(t, "_signoz.gen_ai.cost_input", cfg.OutputAttrs.In)
assert.Equal(t, "_signoz.gen_ai.cost_output", cfg.OutputAttrs.Out)
assert.Equal(t, "_signoz.gen_ai.cost_cache_read", cfg.OutputAttrs.CacheRead)
assert.Equal(t, "_signoz.gen_ai.cost_cache_write", cfg.OutputAttrs.CacheWrite)
assert.Equal(t, "_signoz.gen_ai.total_cost", cfg.OutputAttrs.Total)
}
func TestBuildProcessorConfig_SingleRule(t *testing.T) {
rules := []*llmpricingruletypes.LLMPricingRule{
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
}
cfg := buildProcessorConfig(rules)
require.Len(t, cfg.DefaultPricing.Rules, 1)
r := cfg.DefaultPricing.Rules[0]
assert.Equal(t, "gpt-4o", r.Name)
assert.Equal(t, []string{"gpt-4o*"}, r.Pattern)
assert.Equal(t, 5.0, r.In)
assert.Equal(t, 15.0, r.Out)
assert.Equal(t, "subtract", r.Cache.Mode)
assert.Equal(t, 2.5, r.Cache.Read)
assert.Equal(t, 0.0, r.Cache.Write)
}
func TestBuildProcessorConfig_MultipleRules_PreservesOrder(t *testing.T) {
rules := []*llmpricingruletypes.LLMPricingRule{
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
makePricingRule("claude-sonnet", []string{"claude-sonnet-*", "claude-3-5-*"}, llmpricingruletypes.LLMPricingRuleCacheModeAdditive, 3.0, 15.0, 0.30, 3.75),
makePricingRule("gemini", []string{"gemini-*"}, llmpricingruletypes.LLMPricingRuleCacheModeUnknown, 1.25, 5.0, 0, 0),
}
cfg := buildProcessorConfig(rules)
require.Len(t, cfg.DefaultPricing.Rules, 3)
assert.Equal(t, "gpt-4o", cfg.DefaultPricing.Rules[0].Name)
assert.Equal(t, []string{"gpt-4o*"}, cfg.DefaultPricing.Rules[0].Pattern)
assert.Equal(t, "subtract", cfg.DefaultPricing.Rules[0].Cache.Mode)
assert.Equal(t, "claude-sonnet", cfg.DefaultPricing.Rules[1].Name)
assert.Equal(t, []string{"claude-sonnet-*", "claude-3-5-*"}, cfg.DefaultPricing.Rules[1].Pattern)
assert.Equal(t, "additive", cfg.DefaultPricing.Rules[1].Cache.Mode)
assert.Equal(t, 0.30, cfg.DefaultPricing.Rules[1].Cache.Read)
assert.Equal(t, 3.75, cfg.DefaultPricing.Rules[1].Cache.Write)
assert.Equal(t, "gemini", cfg.DefaultPricing.Rules[2].Name)
assert.Equal(t, []string{"gemini-*"}, cfg.DefaultPricing.Rules[2].Pattern)
assert.Equal(t, "unknown", cfg.DefaultPricing.Rules[2].Cache.Mode)
assert.Equal(t, 1.25, cfg.DefaultPricing.Rules[2].In)
assert.Equal(t, 5.0, cfg.DefaultPricing.Rules[2].Out)
}
func TestBuildProcessorConfig_NilPattern(t *testing.T) {
rules := []*llmpricingruletypes.LLMPricingRule{
makePricingRule("gpt-4o", nil, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
}
cfg := buildProcessorConfig(rules)
require.Len(t, cfg.DefaultPricing.Rules, 1)
assert.Nil(t, cfg.DefaultPricing.Rules[0].Pattern)
}
func TestGenerateCollectorConfig_NoRulesStillInjectsProcessor(t *testing.T) {
// We deploy the processor even with zero rules so rules can be added
// later (by a user or by Zeus) without any config-shape change.
// Pipeline wiring is handled by the collector's baseline config.
in := []byte(`
receivers:
otlp:
protocols:
grpc:
processors:
batch: {}
exporters:
otlp:
endpoint: localhost:4317
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, signozllmpricing]
exporters: [otlp]
`)
out, err := generateCollectorConfigWithLLMCost(in, nil)
require.NoError(t, err)
var conf map[string]any
require.NoError(t, yaml.Unmarshal(out, &conf))
processors := conf["processors"].(map[string]any)
require.Contains(t, processors, processorName, "processor must be present even with zero rules")
procCfg := processors[processorName].(map[string]any)
pricing := procCfg["default_pricing"].(map[string]any)
if rules, ok := pricing["rules"].([]any); ok {
assert.Empty(t, rules, "rules list must be empty when no pricing rules configured")
}
}
func TestGenerateCollectorConfig_EmptyInput(t *testing.T) {
// yaml.v3 returns an EOF error on empty/whitespace input; ensure the
// generator passes it through unchanged instead.
rules := []*llmpricingruletypes.LLMPricingRule{
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
}
for _, in := range [][]byte{nil, {}, []byte(" \n"), []byte("\t\t")} {
out, err := generateCollectorConfigWithLLMCost(in, rules)
require.NoError(t, err)
assert.Equal(t, in, out)
out, err = generateCollectorConfigWithLLMCost(in, nil)
require.NoError(t, err)
assert.Equal(t, in, out)
}
}
func TestBuildProcessorConfig_ZeroCosts(t *testing.T) {
rules := []*llmpricingruletypes.LLMPricingRule{
makePricingRule("free-model", []string{"free-*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 0, 0, 0, 0),
}
cfg := buildProcessorConfig(rules)
require.Len(t, cfg.DefaultPricing.Rules, 1)
r := cfg.DefaultPricing.Rules[0]
assert.Equal(t, 0.0, r.In)
assert.Equal(t, 0.0, r.Out)
assert.Equal(t, 0.0, r.Cache.Read)
assert.Equal(t, 0.0, r.Cache.Write)
}

View File

@@ -0,0 +1,127 @@
package impllmpricingrule
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type module struct {
store llmpricingruletypes.Store
}
func NewModule(store llmpricingruletypes.Store) llmpricingrule.Module {
return &module{store: store}
}
func (m *module) List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*llmpricingruletypes.LLMPricingRule, int, error) {
storables, total, err := m.store.List(ctx, orgID, offset, limit)
if err != nil {
return nil, 0, err
}
rules := make([]*llmpricingruletypes.LLMPricingRule, len(storables))
for i, s := range storables {
rules[i] = llmpricingruletypes.NewLLMPricingRuleFromStorable(s)
}
return rules, total, nil
}
func (m *module) Get(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*llmpricingruletypes.LLMPricingRule, error) {
s, err := m.store.Get(ctx, orgID, id)
if err != nil {
return nil, err
}
return llmpricingruletypes.NewLLMPricingRuleFromStorable(s), nil
}
// Update applies a batch of pricing rule changes:
// - ID set → match by id, overwrite fields.
// - SourceID set → match by source_id; if found overwrite, else insert.
// - neither set → insert a new user-created row (is_override = true).
//
// When UpdatableLLMPricingRule.IsOverride is nil AND the matched row has
// is_override = true, the row is fully preserved — only synced_at is stamped.
func (m *module) Update(ctx context.Context, orgID valuer.UUID, userEmail string, rules []llmpricingruletypes.UpdatableLLMPricingRule) error {
now := time.Now()
for _, r := range rules {
existing, err := m.findExisting(ctx, orgID, r)
if err != nil {
return err
}
if existing == nil {
if err := m.store.Create(ctx, llmpricingruletypes.NewStorablePricingRuleFromUpdatable(orgID, userEmail, now, r)); err != nil {
return err
}
continue
}
if r.IsOverride == nil && existing.IsOverride {
existing.SyncedAt = &now
if err := m.store.Update(ctx, existing); err != nil {
return err
}
continue
}
applyUpdate(existing, userEmail, now, r)
if err := m.store.Update(ctx, existing); err != nil {
return err
}
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (m *module) Delete(ctx context.Context, orgID, id valuer.UUID) error {
if err := m.store.Delete(ctx, orgID, id); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (m *module) findExisting(ctx context.Context, orgID valuer.UUID, r llmpricingruletypes.UpdatableLLMPricingRule) (*llmpricingruletypes.StorableLLMPricingRule, error) {
switch {
case r.ID != nil:
return m.store.Get(ctx, orgID, *r.ID)
case r.SourceID != nil:
s, err := m.store.GetBySourceID(ctx, orgID, *r.SourceID)
if err != nil {
if errors.Ast(err, errors.TypeNotFound) {
return nil, nil
}
return nil, err
}
return s, nil
default:
return nil, nil
}
}
func applyUpdate(existing *llmpricingruletypes.StorableLLMPricingRule, userEmail string, now time.Time, r llmpricingruletypes.UpdatableLLMPricingRule) {
existing.Model = r.Model
existing.ModelPattern = r.ModelPattern
existing.Unit = r.Unit
existing.CacheMode = r.CacheMode
existing.CostInput = r.CostInput
existing.CostOutput = r.CostOutput
existing.CostCacheRead = r.CostCacheRead
existing.CostCacheWrite = r.CostCacheWrite
if r.IsOverride != nil {
existing.IsOverride = *r.IsOverride
}
existing.Enabled = r.Enabled
existing.SyncedAt = &now
existing.UpdatedAt = now
existing.UpdatedBy = userEmail
}

View File

@@ -0,0 +1,137 @@
package impllmpricingrule
import (
"context"
"database/sql"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type store struct {
sqlstore sqlstore.SQLStore
}
func NewStore(sqlstore sqlstore.SQLStore) llmpricingruletypes.Store {
return &store{sqlstore: sqlstore}
}
func (s *store) List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*llmpricingruletypes.StorableLLMPricingRule, int, error) {
rules := make([]*llmpricingruletypes.StorableLLMPricingRule, 0)
count, err := s.sqlstore.
BunDB().
NewSelect().
Model(&rules).
Where("org_id = ?", orgID).
Order("created_at DESC").
Offset(offset).
Limit(limit).
ScanAndCount(ctx)
if err != nil {
return nil, 0, err
}
return rules, count, nil
}
func (s *store) Get(ctx context.Context, orgID, id valuer.UUID) (*llmpricingruletypes.StorableLLMPricingRule, error) {
rule := new(llmpricingruletypes.StorableLLMPricingRule)
err := s.sqlstore.
BunDB().
NewSelect().
Model(rule).
Where("org_id = ?", orgID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return nil, s.sqlstore.WrapNotFoundErrf(err, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found", id)
}
return nil, err
}
return rule, nil
}
func (s *store) GetBySourceID(ctx context.Context, orgID, sourceID valuer.UUID) (*llmpricingruletypes.StorableLLMPricingRule, error) {
rule := new(llmpricingruletypes.StorableLLMPricingRule)
err := s.sqlstore.
BunDB().
NewSelect().
Model(rule).
Where("org_id = ?", orgID).
Where("source_id = ?", sourceID).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return nil, s.sqlstore.WrapNotFoundErrf(err, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule with source_id %s not found", sourceID)
}
return nil, err
}
return rule, nil
}
func (s *store) Create(ctx context.Context, rule *llmpricingruletypes.StorableLLMPricingRule) error {
_, err := s.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(rule).
Exec(ctx)
if err != nil {
return err
}
return nil
}
func (s *store) Update(ctx context.Context, rule *llmpricingruletypes.StorableLLMPricingRule) error {
res, err := s.sqlstore.
BunDBCtx(ctx).
NewUpdate().
Model(rule).
Where("org_id = ?", rule.OrgID).
Where("id = ?", rule.ID).
ExcludeColumn("id", "org_id", "created_at", "created_by").
Exec(ctx)
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found", rule.ID)
}
return nil
}
func (s *store) Delete(ctx context.Context, orgID, id valuer.UUID) error {
res, err := s.sqlstore.
BunDBCtx(ctx).
NewDelete().
Model((*llmpricingruletypes.StorableLLMPricingRule)(nil)).
Where("org_id = ?", orgID).
Where("id = ?", id).
Exec(ctx)
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found", id)
}
return nil
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/gorilla/handlers"
@@ -129,11 +130,14 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
opAmpModel.Init(signoz.SQLStore, signoz.Instrumentation.Logger(), signoz.Modules.OrgGetter)
llmCostFeature := impllmpricingrule.NewLLMCostFeature(signoz.Modules.LLMPricingRule)
agentConfMgr, err := agentConf.Initiate(
&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
llmCostFeature,
},
},
)

View File

@@ -116,6 +116,6 @@ func NewHandlers(
CloudIntegrationHandler: implcloudintegration.NewHandler(modules.CloudIntegration),
AlertmanagerHandler: signozalertmanager.NewHandler(alertmanagerService),
RulerHandler: signozruler.NewHandler(rulerService),
LLMPricingRuleHandler: impllmpricingrule.NewHandler(nil, providerSettings),
LLMPricingRuleHandler: impllmpricingrule.NewHandler(modules.LLMPricingRule, providerSettings),
}
}

View File

@@ -16,6 +16,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring/implinframonitoring"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -76,6 +78,7 @@ type Modules struct {
ServiceAccount serviceaccount.Module
CloudIntegration cloudintegration.Module
RuleStateHistory rulestatehistory.Module
LLMPricingRule llmpricingrule.Module
}
func NewModules(
@@ -127,5 +130,6 @@ func NewModules(
ServiceAccount: serviceAccount,
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger)),
CloudIntegration: cloudIntegrationModule,
LLMPricingRule: impllmpricingrule.NewModule(impllmpricingrule.NewStore(sqlstore)),
}
}

View File

@@ -195,6 +195,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewServiceAccountAuthzactory(sqlstore),
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateAWSAllRegionsFactory(sqlstore),
sqlmigration.NewAddLLMPricingRulesFactory(sqlstore, sqlschema),
)
}

View File

@@ -0,0 +1,99 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type addLLMPricingRules struct {
sqlschema sqlschema.SQLSchema
sqlstore sqlstore.SQLStore
}
func NewAddLLMPricingRulesFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_llm_pricing_rule"), func(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
return &addLLMPricingRules{
sqlschema: sqlschema,
sqlstore: sqlstore,
}, nil
})
}
func (migration *addLLMPricingRules) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addLLMPricingRules) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
sqls := [][]byte{}
tableSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
Name: "llm_pricing_rule",
Columns: []*sqlschema.Column{
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "source_id", DataType: sqlschema.DataTypeText, Nullable: true},
{Name: "model", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "model_pattern", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "unit", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "cache_mode", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "cost_input", DataType: sqlschema.DataTypeNumeric, Nullable: false},
{Name: "cost_output", DataType: sqlschema.DataTypeNumeric, Nullable: false},
{Name: "cost_cache_read", DataType: sqlschema.DataTypeNumeric, Nullable: false},
{Name: "cost_cache_write", DataType: sqlschema.DataTypeNumeric, Nullable: false},
{Name: "is_override", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "false"},
{Name: "synced_at", DataType: sqlschema.DataTypeTimestamp, Nullable: true},
{Name: "enabled", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "true"},
},
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{
ColumnNames: []sqlschema.ColumnName{"id"},
},
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
{
ReferencingColumnName: sqlschema.ColumnName("org_id"),
ReferencedTableName: sqlschema.TableName("organizations"),
ReferencedColumnName: sqlschema.ColumnName("id"),
},
},
})
sqls = append(sqls, tableSQLs...)
indexSQLs := migration.sqlschema.Operator().CreateIndex(
&sqlschema.UniqueIndex{
TableName: "llm_pricing_rule",
ColumnNames: []sqlschema.ColumnName{"org_id", "source_id"},
})
sqls = append(sqls, indexSQLs...)
for _, sql := range sqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
return tx.Commit()
}
func (migration *addLLMPricingRules) Down(context.Context, *bun.DB) error {
return nil
}

View File

@@ -0,0 +1,49 @@
package llmpricingruletypes
// LLMPricingRuleProcessorConfig is the top-level config for the signozllmpricing
// OTel processor that gets deployed to collectors via OpAMP.
type LLMPricingRuleProcessorConfig struct {
Attrs LLMPricingRuleProcessorAttrs `yaml:"attrs" json:"attrs"`
DefaultPricing LLMPricingRuleProcessorDefaultPricing `yaml:"default_pricing" json:"default_pricing"`
OutputAttrs LLMPricingRuleProcessorOutputAttrs `yaml:"output_attrs" json:"output_attrs"`
}
// LLMCostAttrs maps span attribute names to the processor's input fields.
type LLMPricingRuleProcessorAttrs struct {
Model string `yaml:"model" json:"model"`
In string `yaml:"in" json:"in"`
Out string `yaml:"out" json:"out"`
CacheRead string `yaml:"cache_read" json:"cache_read"`
CacheWrite string `yaml:"cache_write" json:"cache_write"`
}
// LLMPricingRuleDefaultPricing holds the pricing unit and the list of model-specific rules.
type LLMPricingRuleProcessorDefaultPricing struct {
Unit string `yaml:"unit" json:"unit"`
Rules []LLMPricingRuleProcessor `yaml:"rules" json:"rules"`
}
// LLMPricingRuleRule is a single pricing rule inside the processor config.
type LLMPricingRuleProcessor struct {
Name string `yaml:"name" json:"name"`
Pattern []string `yaml:"pattern" json:"pattern"`
Cache LLMPricingRuleProcessorCache `yaml:"cache" json:"cache"`
In float64 `yaml:"in" json:"in"`
Out float64 `yaml:"out" json:"out"`
}
// LLMPricingRuleCache describes how cached tokens are accounted for.
type LLMPricingRuleProcessorCache struct {
Mode string `yaml:"mode" json:"mode"`
Read float64 `yaml:"read" json:"read"`
Write float64 `yaml:"write" json:"write"`
}
// LLMPricingRuleOutputAttrs maps the processor's computed cost fields to span attribute names.
type LLMPricingRuleProcessorOutputAttrs struct {
In string `yaml:"in" json:"in"`
Out string `yaml:"out" json:"out"`
CacheRead string `yaml:"cache_read" json:"cache_read"`
CacheWrite string `yaml:"cache_write" json:"cache_write"`
Total string `yaml:"total" json:"total"`
}

View File

@@ -17,7 +17,7 @@ type StringSlice []string
// StorableLLMPricingRule is the bun/DB representation of an LLM pricing rule.
type StorableLLMPricingRule struct {
bun.BaseModel `bun:"table:llm_pricing_rules,alias:llm_pricing_rules"`
bun.BaseModel `bun:"table:llm_pricing_rule,alias:llm_pricing_rule"`
types.Identifiable
types.TimeAuditable
@@ -65,3 +65,31 @@ func (s *StringSlice) Scan(src any) error {
}
return json.Unmarshal(raw, s)
}
func NewStorablePricingRuleFromUpdatable(orgID valuer.UUID, userEmail string, now time.Time, r UpdatableLLMPricingRule) *StorableLLMPricingRule {
isOverride := true
if r.IsOverride != nil {
isOverride = *r.IsOverride
} else if r.SourceID != nil {
isOverride = false
}
return &StorableLLMPricingRule{
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
UserAuditable: types.UserAuditable{CreatedBy: userEmail, UpdatedBy: userEmail},
OrgID: orgID,
SourceID: r.SourceID,
Model: r.Model,
ModelPattern: r.ModelPattern,
Unit: r.Unit,
CacheMode: r.CacheMode,
CostInput: r.CostInput,
CostOutput: r.CostOutput,
CostCacheRead: r.CostCacheRead,
CostCacheWrite: r.CostCacheWrite,
IsOverride: isOverride,
SyncedAt: &now,
Enabled: r.Enabled,
}
}