Compare commits

...

2 Commits

Author SHA1 Message Date
nityanandagohain
95d100aedf fix: lint 2026-04-30 18:33:21 +05:30
nityanandagohain
2a858adae7 fix: add changes 2026-04-30 18:11:27 +05:30
17 changed files with 806 additions and 9 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
"github.com/gorilla/handlers"
@@ -112,9 +113,11 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
}
// initiate agent config handler
llmCostFeature := impllmpricingrule.NewLLMCostFeature(signoz.Modules.LLMPricingRule)
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController, llmCostFeature},
})
if err != nil {
return nil, err

View File

@@ -0,0 +1,74 @@
package impllmpricingrule
import (
"context"
"encoding/json"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
const LLMCostFeatureType agentConf.AgentFeatureType = "llm_pricing"
// LLMCostFeature implements agentConf.AgentFeature. It reads pricing rules
// from the module and generates the signozllmpricing processor config for
// deployment to OTel collectors via OpAMP.
type LLMCostFeature struct {
module llmpricingrule.Module
}
func NewLLMCostFeature(module llmpricingrule.Module) *LLMCostFeature {
return &LLMCostFeature{module: module}
}
func (f *LLMCostFeature) AgentFeatureType() agentConf.AgentFeatureType {
return LLMCostFeatureType
}
func (f *LLMCostFeature) RecommendAgentConfig(
orgId valuer.UUID,
currentConfYaml []byte,
configVersion *opamptypes.AgentConfigVersion,
) ([]byte, string, error) {
ctx := context.Background()
rules, err := f.getEnabledRules(ctx, orgId)
if err != nil {
return nil, "", err
}
updatedConf, err := generateCollectorConfigWithLLMPricingProcessor(currentConfYaml, rules)
if err != nil {
return nil, "", err
}
serialized, err := json.Marshal(rules)
if err != nil {
return nil, "", err
}
return updatedConf, string(serialized), nil
}
// getEnabledRules fetches all enabled pricing rules for the given org.
func (f *LLMCostFeature) getEnabledRules(ctx context.Context, orgId valuer.UUID) ([]*llmpricingruletypes.LLMPricingRule, error) {
if f.module == nil {
return nil, nil
}
rules, _, err := f.module.List(ctx, orgId, 0, 10000)
if err != nil {
return nil, err
}
enabled := make([]*llmpricingruletypes.LLMPricingRule, 0, len(rules))
for _, r := range rules {
if r.Enabled {
enabled = append(enabled, r)
}
}
return enabled, nil
}

View File

@@ -0,0 +1,95 @@
package impllmpricingrule
import (
"bytes"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"gopkg.in/yaml.v3"
)
const processorName = "signozllmpricing"
// buildProcessorConfig converts pricing rules into the signozllmpricing processor config.
func buildProcessorConfig(rules []*llmpricingruletypes.LLMPricingRule) *llmpricingruletypes.LLMPricingRuleProcessorConfig {
pricingRules := make([]llmpricingruletypes.LLMPricingRuleProcessor, 0, len(rules))
for _, r := range rules {
var cache llmpricingruletypes.LLMPricingRuleProcessorCache
if r.Pricing.Cache != nil {
cache = llmpricingruletypes.LLMPricingRuleProcessorCache{
Mode: r.Pricing.Cache.Mode.StringValue(),
Read: r.Pricing.Cache.Read,
Write: r.Pricing.Cache.Write,
}
}
pricingRules = append(pricingRules, llmpricingruletypes.LLMPricingRuleProcessor{
Name: r.Model,
Pattern: r.ModelPattern,
Cache: cache,
In: r.Pricing.Input,
Out: r.Pricing.Output,
})
}
return &llmpricingruletypes.LLMPricingRuleProcessorConfig{
Attrs: llmpricingruletypes.LLMPricingRuleProcessorAttrs{
Model: "gen_ai.request.model",
In: "gen_ai.usage.input_tokens",
Out: "gen_ai.usage.output_tokens",
CacheRead: "gen_ai.usage.cache_read.input_tokens",
CacheWrite: "gen_ai.usage.cache_creation.input_tokens",
},
DefaultPricing: llmpricingruletypes.LLMPricingRuleProcessorDefaultPricing{
Unit: "per_million_tokens",
Rules: pricingRules,
},
OutputAttrs: llmpricingruletypes.LLMPricingRuleProcessorOutputAttrs{
In: "_signoz.gen_ai.cost_input",
Out: "_signoz.gen_ai.cost_output",
CacheRead: "_signoz.gen_ai.cost_cache_read",
CacheWrite: "_signoz.gen_ai.cost_cache_write",
Total: "_signoz.gen_ai.total_cost",
},
}
}
// generateCollectorConfigWithLLMPricingProcessor injects (or replaces) the signozllmpricing
// processor block in the collector YAML with one built from the given rules.
// Pipeline wiring is handled by the collector's baseline config, not here.
func generateCollectorConfigWithLLMPricingProcessor(
currentConfYaml []byte,
rules []*llmpricingruletypes.LLMPricingRule,
) ([]byte, error) {
// Empty input: nothing to inject into. Pass through unchanged so we don't
// turn it into "null\n" or fail on yaml.v3's EOF.
if len(bytes.TrimSpace(currentConfYaml)) == 0 {
return currentConfYaml, nil
}
var collectorConf map[string]any
if err := yaml.Unmarshal(currentConfYaml, &collectorConf); err != nil {
return nil, errors.Wrapf(err, errors.TypeInvalidInput, llmpricingruletypes.ErrCodeInvalidCollectorConfig, "failed to unmarshal collector config")
}
// rare but don't do anything in this case, also means it's just comments
if collectorConf == nil {
return currentConfYaml, nil
}
processors := map[string]any{}
if existing, ok := collectorConf["processors"]; ok && existing != nil {
p, ok := existing.(map[string]any)
if !ok {
return nil, errors.Newf(errors.TypeInvalidInput, llmpricingruletypes.ErrCodeInvalidCollectorConfig, "collector config 'processors' must be a mapping, got %T", existing)
}
processors = p
}
processors[processorName] = buildProcessorConfig(rules)
collectorConf["processors"] = processors
out, err := yaml.Marshal(collectorConf)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, llmpricingruletypes.ErrCodeBuildPricingProcessorConf, "failed to marshal llm pricing processor config")
}
return out, nil
}

View File

@@ -0,0 +1,92 @@
package impllmpricingrule
import (
"os"
"path/filepath"
"testing"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
// assertYAMLEqualToFile decodes both sides into any and compares structurally,
// so map key ordering is irrelevant.
func assertYAMLEqualToFile(t *testing.T, name string, actual []byte) {
t.Helper()
expected, err := os.ReadFile(filepath.Join("testdata", name))
require.NoError(t, err)
var e, a any
require.NoError(t, yaml.Unmarshal(expected, &e))
require.NoError(t, yaml.Unmarshal(actual, &a))
assert.Equal(t, e, a)
}
func makePricingRule(model string, patterns []string, cacheMode llmpricingruletypes.LLMPricingRuleCacheMode, costIn, costOut, cacheRead, cacheWrite float64) *llmpricingruletypes.LLMPricingRule {
return &llmpricingruletypes.LLMPricingRule{
Model: model,
ModelPattern: llmpricingruletypes.StringSlice(patterns),
Unit: llmpricingruletypes.UnitPerMillionTokens,
Pricing: llmpricingruletypes.LLMRulePricing{
Input: costIn,
Output: costOut,
Cache: &llmpricingruletypes.LLMPricingCacheCosts{
Mode: cacheMode,
Read: cacheRead,
Write: cacheWrite,
},
},
Enabled: true,
}
}
func TestGenerateCollectorConfigWithLLMPricingProcessor(t *testing.T) {
tests := []struct {
name string
rules []*llmpricingruletypes.LLMPricingRule
expectedFile string
}{
{
name: "with_rule",
rules: []*llmpricingruletypes.LLMPricingRule{
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
},
expectedFile: "collector_with_rule.yaml",
},
// We deploy the processor even with zero rules so rules can be added
// later (by a user or by Zeus) without any config-shape change.
// Pipeline wiring is handled by the collector's baseline config.
{
name: "no_rules",
rules: nil,
expectedFile: "collector_no_rules.yaml",
},
}
input, err := os.ReadFile(filepath.Join("testdata", "collector_baseline.yaml"))
require.NoError(t, err)
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
out, err := generateCollectorConfigWithLLMPricingProcessor(input, tc.rules)
require.NoError(t, err)
assertYAMLEqualToFile(t, tc.expectedFile, out)
})
}
}
func TestGenerateCollectorConfig_EmptyInputPassthrough(t *testing.T) {
// yaml.v3 errors on empty/whitespace input; the generator passes such
// input through unchanged instead.
rules := []*llmpricingruletypes.LLMPricingRule{
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
}
for _, in := range [][]byte{nil, []byte(" \n")} {
out, err := generateCollectorConfigWithLLMPricingProcessor(in, rules)
require.NoError(t, err)
assert.Equal(t, in, out)
}
}

View File

@@ -0,0 +1,90 @@
package impllmpricingrule
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type module struct {
store llmpricingruletypes.Store
}
func NewModule(store llmpricingruletypes.Store) llmpricingrule.Module {
return &module{store: store}
}
func (module *module) List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*llmpricingruletypes.LLMPricingRule, int, error) {
return module.store.List(ctx, orgID, offset, limit)
}
func (module *module) Get(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*llmpricingruletypes.LLMPricingRule, error) {
return module.store.Get(ctx, orgID, id)
}
// CreateOrUpdate applies a batch of pricing rule changes:
// - ID set → match by id, overwrite fields.
// - SourceID set → match by source_id; if found overwrite, else insert.
// - neither set → insert a new user-created row (is_override = true).
//
// When UpdatableLLMPricingRule.IsOverride is nil AND the matched row has
// is_override = true, the row is fully preserved — only synced_at is stamped.
func (module *module) CreateOrUpdate(ctx context.Context, orgID valuer.UUID, userEmail string, rules []llmpricingruletypes.UpdatableLLMPricingRule) error {
now := time.Now()
err := module.store.RunInTx(ctx, func(ctx context.Context) error {
for _, u := range rules {
existing, err := module.findExisting(ctx, orgID, u)
if err != nil {
if !errors.Ast(err, errors.TypeNotFound) {
return err
}
if err := module.store.Create(ctx, llmpricingruletypes.NewLLMPricingRuleFromUpdatable(u, orgID, userEmail, now)); err != nil {
return err
}
continue
}
existing.Update(u, userEmail, now)
if err := module.store.Update(ctx, existing); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (module *module) Delete(ctx context.Context, orgID, id valuer.UUID) error {
if err := module.store.Delete(ctx, orgID, id); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
// findExisting returns the row matching the updatable's ID or SourceID.
// Returns a TypeNotFound error when neither matches; the caller treats that
// as "insert new".
func (module *module) findExisting(ctx context.Context, orgID valuer.UUID, u llmpricingruletypes.UpdatableLLMPricingRule) (*llmpricingruletypes.LLMPricingRule, error) {
switch {
case u.ID != nil:
return module.store.Get(ctx, orgID, *u.ID)
case u.SourceID != nil:
return module.store.GetBySourceID(ctx, orgID, *u.SourceID)
default:
return nil, errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "rule has neither id nor sourceId")
}
}

View File

@@ -0,0 +1,131 @@
package impllmpricingrule
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type store struct {
sqlstore sqlstore.SQLStore
}
func NewStore(sqlstore sqlstore.SQLStore) llmpricingruletypes.Store {
return &store{sqlstore: sqlstore}
}
func (store *store) List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*llmpricingruletypes.LLMPricingRule, int, error) {
rules := make([]*llmpricingruletypes.LLMPricingRule, 0)
count, err := store.sqlstore.
BunDBCtx(ctx).
NewSelect().
Model(&rules).
Where("org_id = ?", orgID).
Order("created_at DESC").
Offset(offset).
Limit(limit).
ScanAndCount(ctx)
if err != nil {
return nil, 0, err
}
return rules, count, nil
}
func (store *store) Get(ctx context.Context, orgID, id valuer.UUID) (*llmpricingruletypes.LLMPricingRule, error) {
rule := new(llmpricingruletypes.LLMPricingRule)
err := store.sqlstore.
BunDBCtx(ctx).
NewSelect().
Model(rule).
Where("org_id = ?", orgID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
return nil, store.sqlstore.WrapNotFoundErrf(err, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found", id)
}
return rule, nil
}
func (store *store) GetBySourceID(ctx context.Context, orgID, sourceID valuer.UUID) (*llmpricingruletypes.LLMPricingRule, error) {
rule := new(llmpricingruletypes.LLMPricingRule)
err := store.sqlstore.
BunDBCtx(ctx).
NewSelect().
Model(rule).
Where("org_id = ?", orgID).
Where("source_id = ?", sourceID).
Scan(ctx)
if err != nil {
return nil, store.sqlstore.WrapNotFoundErrf(err, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule with source_id %s not found", sourceID)
}
return rule, nil
}
func (store *store) Create(ctx context.Context, rule *llmpricingruletypes.LLMPricingRule) error {
_, err := store.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(rule).
Exec(ctx)
return err
}
func (store *store) Update(ctx context.Context, rule *llmpricingruletypes.LLMPricingRule) error {
res, err := store.sqlstore.
BunDBCtx(ctx).
NewUpdate().
Model(rule).
Where("org_id = ?", rule.OrgID).
Where("id = ?", rule.ID).
ExcludeColumn("id", "org_id", "created_at", "created_by").
Exec(ctx)
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found", rule.ID)
}
return nil
}
func (store *store) Delete(ctx context.Context, orgID, id valuer.UUID) error {
res, err := store.sqlstore.
BunDBCtx(ctx).
NewDelete().
Model((*llmpricingruletypes.LLMPricingRule)(nil)).
Where("org_id = ?", orgID).
Where("id = ?", id).
Exec(ctx)
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found", id)
}
return nil
}
func (s *store) RunInTx(ctx context.Context, cb func(ctx context.Context) error) error {
return s.sqlstore.RunInTxCtx(ctx, nil, cb)
}

View File

@@ -0,0 +1,31 @@
receivers:
otlp:
protocols:
grpc:
processors:
signozllmpricing:
attrs:
model: gen_ai.request.model
in: gen_ai.usage.input_tokens
out: gen_ai.usage.output_tokens
cache_read: gen_ai.usage.cache_read.input_tokens
cache_write: gen_ai.usage.cache_creation.input_tokens
default_pricing:
unit: per_million_tokens
rules: []
output_attrs:
in: _signoz.gen_ai.cost_input
out: _signoz.gen_ai.cost_output
cache_read: _signoz.gen_ai.cost_cache_read
cache_write: _signoz.gen_ai.cost_cache_write
total: _signoz.gen_ai.total_cost
batch: {}
exporters:
otlp:
endpoint: localhost:4317
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, signozllmpricing]
exporters: [otlp]

View File

@@ -0,0 +1,35 @@
exporters:
otlp:
endpoint: localhost:4317
processors:
batch: {}
signozllmpricing:
attrs:
model: gen_ai.request.model
in: gen_ai.usage.input_tokens
out: gen_ai.usage.output_tokens
cache_read: gen_ai.usage.cache_read.input_tokens
cache_write: gen_ai.usage.cache_creation.input_tokens
default_pricing:
unit: per_million_tokens
rules: []
output_attrs:
in: _signoz.gen_ai.cost_input
out: _signoz.gen_ai.cost_output
cache_read: _signoz.gen_ai.cost_cache_read
cache_write: _signoz.gen_ai.cost_cache_write
total: _signoz.gen_ai.total_cost
receivers:
otlp:
protocols:
grpc: null
service:
pipelines:
traces:
exporters:
- otlp
processors:
- batch
- signozllmpricing
receivers:
- otlp

View File

@@ -0,0 +1,44 @@
exporters:
otlp:
endpoint: localhost:4317
processors:
batch: {}
signozllmpricing:
attrs:
model: gen_ai.request.model
in: gen_ai.usage.input_tokens
out: gen_ai.usage.output_tokens
cache_read: gen_ai.usage.cache_read.input_tokens
cache_write: gen_ai.usage.cache_creation.input_tokens
default_pricing:
unit: per_million_tokens
rules:
- name: gpt-4o
pattern:
- gpt-4o*
cache:
mode: subtract
read: 2.5
write: 0
in: 5
out: 15
output_attrs:
in: _signoz.gen_ai.cost_input
out: _signoz.gen_ai.cost_output
cache_read: _signoz.gen_ai.cost_cache_read
cache_write: _signoz.gen_ai.cost_cache_write
total: _signoz.gen_ai.total_cost
receivers:
otlp:
protocols:
grpc: null
service:
pipelines:
traces:
exporters:
- otlp
processors:
- batch
- signozllmpricing
receivers:
- otlp

View File

@@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/gorilla/handlers"
@@ -130,11 +131,14 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
opAmpModel.Init(signoz.SQLStore, signoz.Instrumentation.Logger(), signoz.Modules.OrgGetter)
llmCostFeature := impllmpricingrule.NewLLMCostFeature(signoz.Modules.LLMPricingRule)
agentConfMgr, err := agentConf.Initiate(
&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
llmCostFeature,
},
},
)

View File

@@ -124,6 +124,6 @@ func NewHandlers(
AlertmanagerHandler: signozalertmanager.NewHandler(alertmanagerService),
TraceDetail: impltracedetail.NewHandler(modules.TraceDetail),
RulerHandler: signozruler.NewHandler(rulerService),
LLMPricingRuleHandler: impllmpricingrule.NewHandler(nil, providerSettings),
LLMPricingRuleHandler: impllmpricingrule.NewHandler(modules.LLMPricingRule, providerSettings),
}
}

View File

@@ -17,6 +17,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring/implinframonitoring"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -80,6 +82,7 @@ type Modules struct {
CloudIntegration cloudintegration.Module
RuleStateHistory rulestatehistory.Module
TraceDetail tracedetail.Module
LLMPricingRule llmpricingrule.Module
}
func NewModules(
@@ -133,5 +136,6 @@ func NewModules(
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger)),
CloudIntegration: cloudIntegrationModule,
TraceDetail: impltracedetail.NewModule(impltracedetail.NewTraceStore(telemetryStore), providerSettings, config.TraceDetail),
LLMPricingRule: impllmpricingrule.NewModule(impllmpricingrule.NewStore(sqlstore)),
}
}

View File

@@ -195,6 +195,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewServiceAccountAuthzactory(sqlstore),
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateAWSAllRegionsFactory(sqlstore),
sqlmigration.NewAddLLMPricingRulesFactory(sqlstore, sqlschema),
)
}

View File

@@ -0,0 +1,96 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type addLLMPricingRules struct {
sqlschema sqlschema.SQLSchema
sqlstore sqlstore.SQLStore
}
func NewAddLLMPricingRulesFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_llm_pricing_rule"), func(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
return &addLLMPricingRules{
sqlschema: sqlschema,
sqlstore: sqlstore,
}, nil
})
}
func (migration *addLLMPricingRules) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addLLMPricingRules) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
sqls := [][]byte{}
tableSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
Name: "llm_pricing_rule",
Columns: []*sqlschema.Column{
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "source_id", DataType: sqlschema.DataTypeText, Nullable: true},
{Name: "model", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "provider", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "model_pattern", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "unit", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "pricing", DataType: sqlschema.DataTypeText, Nullable: false, Default: "'{}'"},
{Name: "is_override", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "false"},
{Name: "synced_at", DataType: sqlschema.DataTypeTimestamp, Nullable: true},
{Name: "enabled", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "true"},
},
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{
ColumnNames: []sqlschema.ColumnName{"id"},
},
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
{
ReferencingColumnName: sqlschema.ColumnName("org_id"),
ReferencedTableName: sqlschema.TableName("organizations"),
ReferencedColumnName: sqlschema.ColumnName("id"),
},
},
})
sqls = append(sqls, tableSQLs...)
for _, sql := range sqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
// Partial unique index: one Zeus-synced rule per (org, source). User-created
// rules carry source_id = NULL and are intentionally excluded from the
// constraint (a single org may have many).
if _, err := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS llm_pricing_rule_org_source_unique ON llm_pricing_rule (org_id, source_id) WHERE source_id IS NOT NULL`); err != nil {
return err
}
return tx.Commit()
}
func (migration *addLLMPricingRules) Down(context.Context, *bun.DB) error {
return nil
}

View File

@@ -12,8 +12,10 @@ import (
)
var (
ErrCodePricingRuleNotFound = errors.MustNewCode("pricing_rule_not_found")
ErrCodePricingRuleInvalidInput = errors.MustNewCode("pricing_rule_invalid_input")
ErrCodePricingRuleNotFound = errors.MustNewCode("pricing_rule_not_found")
ErrCodePricingRuleInvalidInput = errors.MustNewCode("pricing_rule_invalid_input")
ErrCodeInvalidCollectorConfig = errors.MustNewCode("invalid_collector_config")
ErrCodeBuildPricingProcessorConf = errors.MustNewCode("build_pricing_processor_config")
)
type LLMPricingRuleUnit struct {
@@ -183,3 +185,48 @@ func NewGettableLLMPricingRulesFromLLMPricingRules(items []*LLMPricingRule, tota
Limit: limit,
}
}
func NewLLMPricingRuleFromUpdatable(u UpdatableLLMPricingRule, orgID valuer.UUID, userEmail string, now time.Time) *LLMPricingRule {
isOverride := true
if u.IsOverride != nil {
isOverride = *u.IsOverride
} else if u.SourceID != nil {
isOverride = false
}
return &LLMPricingRule{
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
UserAuditable: types.UserAuditable{CreatedBy: userEmail, UpdatedBy: userEmail},
OrgID: orgID,
SourceID: u.SourceID,
Model: u.Model,
Provider: u.Provider,
ModelPattern: StringSlice(u.ModelPattern),
Unit: u.Unit,
Pricing: u.Pricing,
IsOverride: isOverride,
SyncedAt: &now,
Enabled: u.Enabled,
}
}
func (r *LLMPricingRule) Update(u UpdatableLLMPricingRule, userEmail string, now time.Time) {
if u.IsOverride == nil && r.IsOverride {
r.SyncedAt = &now
return
}
r.Model = u.Model
r.Provider = u.Provider
r.ModelPattern = StringSlice(u.ModelPattern)
r.Unit = u.Unit
r.Pricing = u.Pricing
if u.IsOverride != nil {
r.IsOverride = *u.IsOverride
}
r.Enabled = u.Enabled
r.SyncedAt = &now
r.UpdatedAt = now
r.UpdatedBy = userEmail
}

View File

@@ -0,0 +1,49 @@
package llmpricingruletypes
// LLMPricingRuleProcessorConfig is the top-level config for the signozllmpricing
// OTel processor that gets deployed to collectors via OpAMP.
type LLMPricingRuleProcessorConfig struct {
Attrs LLMPricingRuleProcessorAttrs `yaml:"attrs" json:"attrs"`
DefaultPricing LLMPricingRuleProcessorDefaultPricing `yaml:"default_pricing" json:"default_pricing"`
OutputAttrs LLMPricingRuleProcessorOutputAttrs `yaml:"output_attrs" json:"output_attrs"`
}
// LLMPricingRuleProcessorAttrs maps span attribute names to the processor's input fields.
type LLMPricingRuleProcessorAttrs struct {
Model string `yaml:"model" json:"model"`
In string `yaml:"in" json:"in"`
Out string `yaml:"out" json:"out"`
CacheRead string `yaml:"cache_read" json:"cache_read"`
CacheWrite string `yaml:"cache_write" json:"cache_write"`
}
// LLMPricingRuleProcessorDefaultPricing holds the pricing unit and the list of model-specific rules.
type LLMPricingRuleProcessorDefaultPricing struct {
Unit string `yaml:"unit" json:"unit"`
Rules []LLMPricingRuleProcessor `yaml:"rules" json:"rules"`
}
// LLMPricingRuleProcessor is a single pricing rule inside the processor config.
type LLMPricingRuleProcessor struct {
Name string `yaml:"name" json:"name"`
Pattern []string `yaml:"pattern" json:"pattern"`
Cache LLMPricingRuleProcessorCache `yaml:"cache" json:"cache"`
In float64 `yaml:"in" json:"in"`
Out float64 `yaml:"out" json:"out"`
}
// LLMPricingRuleProcessorCache describes how cached tokens are accounted for.
type LLMPricingRuleProcessorCache struct {
Mode string `yaml:"mode" json:"mode"`
Read float64 `yaml:"read" json:"read"`
Write float64 `yaml:"write" json:"write"`
}
// LLMPricingRuleProcessorOutputAttrs maps the processor's computed cost fields to span attribute names.
type LLMPricingRuleProcessorOutputAttrs struct {
In string `yaml:"in" json:"in"`
Out string `yaml:"out" json:"out"`
CacheRead string `yaml:"cache_read" json:"cache_read"`
CacheWrite string `yaml:"cache_write" json:"cache_write"`
Total string `yaml:"total" json:"total"`
}

View File

@@ -7,10 +7,11 @@ import (
)
type Store interface {
List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*StorableLLMPricingRule, int, error)
Get(ctx context.Context, orgID, id valuer.UUID) (*StorableLLMPricingRule, error)
GetBySourceID(ctx context.Context, orgID, sourceID valuer.UUID) (*StorableLLMPricingRule, error)
Create(ctx context.Context, rule *StorableLLMPricingRule) error
Update(ctx context.Context, rule *StorableLLMPricingRule) error
List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*LLMPricingRule, int, error)
Get(ctx context.Context, orgID, id valuer.UUID) (*LLMPricingRule, error)
GetBySourceID(ctx context.Context, orgID, sourceID valuer.UUID) (*LLMPricingRule, error)
Create(ctx context.Context, rule *LLMPricingRule) error
Update(ctx context.Context, rule *LLMPricingRule) error
Delete(ctx context.Context, orgID, id valuer.UUID) error
RunInTx(ctx context.Context, cb func(ctx context.Context) error) error
}